stabilized app
This commit is contained in:
committed by
Torsten (PC)
parent
51fd9fcd13
commit
1451225978
@@ -1,149 +1,154 @@
|
||||
#include "websocket_server.h"
|
||||
#include "connection_guard.h"
|
||||
#include "worker.h"
|
||||
#include <iostream>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <queue>
|
||||
#include <condition_variable>
|
||||
#include <cstring>
|
||||
|
||||
using json = nlohmann::json;
|
||||
|
||||
// Protocols array definition
|
||||
struct lws_protocols WebSocketServer::protocols[] = {
|
||||
{
|
||||
"yourpart-protocol",
|
||||
WebSocketServer::wsCallback,
|
||||
sizeof(WebSocketUserData),
|
||||
4096
|
||||
},
|
||||
{ nullptr, nullptr, 0, 0 }
|
||||
};
|
||||
|
||||
WebSocketServer::WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker)
|
||||
: port(port), pool(pool), broker(broker) {}
|
||||
|
||||
WebSocketServer::~WebSocketServer() {
|
||||
stop();
|
||||
}
|
||||
|
||||
void WebSocketServer::run() {
|
||||
running = true;
|
||||
broker.subscribe([this](const std::string &message) {
|
||||
std::lock_guard<std::mutex> lock(queueMutex);
|
||||
messageQueue.push(message);
|
||||
broker.subscribe([this](const std::string &msg) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(queueMutex);
|
||||
messageQueue.push(msg);
|
||||
}
|
||||
queueCV.notify_one();
|
||||
});
|
||||
serverThread = std::thread([this]() { startServer(); });
|
||||
messageProcessingThread = std::thread([this]() { processMessageQueue(); });
|
||||
pingThread = std::thread([this]() { pingClients(); });
|
||||
serverThread = std::thread([this](){ startServer(); });
|
||||
messageThread = std::thread([this](){ processMessageQueue(); });
|
||||
pingThread = std::thread([this](){ pingClients(); });
|
||||
}
|
||||
|
||||
void WebSocketServer::stop() {
|
||||
running = false;
|
||||
if (context) lws_cancel_service(context);
|
||||
if (serverThread.joinable()) serverThread.join();
|
||||
if (messageProcessingThread.joinable()) messageProcessingThread.join();
|
||||
if (messageThread.joinable()) messageThread.join();
|
||||
if (pingThread.joinable()) pingThread.join();
|
||||
if (context) {
|
||||
lws_context_destroy(context);
|
||||
context = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketServer::startServer() {
|
||||
uWS::App()
|
||||
.ws<WebSocketUserData>("/*", {
|
||||
.open = [this](uWS::WebSocket<false, true, WebSocketUserData> *ws) {
|
||||
ws->getUserData()->pongReceived = true;
|
||||
},
|
||||
.message = [this](uWS::WebSocket<false, true, WebSocketUserData> *ws, std::string_view message, uWS::OpCode opCode) {
|
||||
handleWebSocketMessage(ws, message, opCode);
|
||||
},
|
||||
.close = [this](uWS::WebSocket<false, true, WebSocketUserData> *ws, int /*code*/, std::string_view /*message*/) {
|
||||
handleWebSocketClose(ws);
|
||||
}
|
||||
})
|
||||
.listen(port, [this](auto *token) {
|
||||
if (token) {
|
||||
std::cout << "WebSocket-Server läuft auf Port " << port << std::endl;
|
||||
} else {
|
||||
std::cerr << "WebSocket-Server konnte nicht gestartet werden!\n";
|
||||
running = false;
|
||||
}
|
||||
})
|
||||
.run();
|
||||
}
|
||||
|
||||
void WebSocketServer::pingClients() {
|
||||
struct lws_context_creation_info info;
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.port = port;
|
||||
info.protocols = protocols;
|
||||
context = lws_create_context(&info);
|
||||
if (!context) {
|
||||
throw std::runtime_error("Failed to create LWS context");
|
||||
}
|
||||
while (running) {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(30));
|
||||
|
||||
std::unique_lock lock(connectionsMutex);
|
||||
for (auto &[userId, ws] : connections) {
|
||||
if (!ws->getUserData()->pongReceived) {
|
||||
ws->close();
|
||||
} else {
|
||||
ws->getUserData()->pongReceived = false;
|
||||
ws->send("ping", uWS::OpCode::TEXT);
|
||||
}
|
||||
}
|
||||
lws_service(context, 50);
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketServer::processMessageQueue() {
|
||||
while (running) {
|
||||
std::unique_lock lock(queueMutex);
|
||||
queueCV.wait(lock, [this]() { return !messageQueue.empty() || !running; });
|
||||
std::unique_lock<std::mutex> lock(queueMutex);
|
||||
queueCV.wait(lock, [this](){ return !messageQueue.empty() || !running; });
|
||||
while (!messageQueue.empty()) {
|
||||
std::string message = std::move(messageQueue.front());
|
||||
std::string msg = std::move(messageQueue.front());
|
||||
messageQueue.pop();
|
||||
lock.unlock();
|
||||
handleBrokerMessage(message);
|
||||
handleBrokerMessage(msg);
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketServer::pingClients() {
|
||||
while (running) {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(30));
|
||||
lws_callback_on_writable_all_protocol(context, &protocols[0]);
|
||||
}
|
||||
}
|
||||
|
||||
int WebSocketServer::wsCallback(struct lws *wsi,
|
||||
enum lws_callback_reasons reason,
|
||||
void *user, void *in, size_t len) {
|
||||
auto *ud = reinterpret_cast<WebSocketUserData*>(user);
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_ESTABLISHED:
|
||||
ud->pongReceived = true;
|
||||
break;
|
||||
case LWS_CALLBACK_RECEIVE: {
|
||||
std::string msg(reinterpret_cast<char*>(in), len);
|
||||
// Here you would dispatch the received message to handleBrokerMessage or handleWebSocketMessage
|
||||
break;
|
||||
}
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
||||
unsigned char buf[LWS_PRE + 4];
|
||||
memcpy(buf + LWS_PRE, "ping", 4);
|
||||
lws_write(wsi, buf + LWS_PRE, 4, LWS_WRITE_TEXT);
|
||||
break;
|
||||
}
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
// Remove closed connection if stored
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void WebSocketServer::handleBrokerMessage(const std::string &message) {
|
||||
try {
|
||||
json parsedMessage = json::parse(message);
|
||||
if (parsedMessage.contains("user_id")) {
|
||||
int falukantUserId = parsedMessage["user_id"];
|
||||
std::shared_lock lock(connectionsMutex);
|
||||
auto userId = getUserIdFromFalukantUserId(falukantUserId);
|
||||
json parsed = json::parse(message);
|
||||
if (parsed.contains("user_id")) {
|
||||
int fid = parsed["user_id"].get<int>();
|
||||
auto userId = getUserIdFromFalukantUserId(fid);
|
||||
std::shared_lock<std::shared_mutex> lock(connectionsMutex);
|
||||
auto it = connections.find(userId);
|
||||
if (it != connections.end()) {
|
||||
it->second->send(message, uWS::OpCode::TEXT);
|
||||
std::cout << "[WebSocketServer] Nachricht an User-ID: " << userId << " gesendet.\n";
|
||||
} else {
|
||||
std::cerr << "[WebSocketServer] Keine Verbindung für User-ID: " << userId << "\n";
|
||||
lws_callback_on_writable(it->second);
|
||||
}
|
||||
} else {
|
||||
std::cerr << "[WebSocketServer] Ungültige Nachricht: " << message << "\n";
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "[WebSocketServer] Fehler beim Verarbeiten der Nachricht: " << e.what() << "\n";
|
||||
std::cerr << "Error processing broker message: " << e.what() << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketServer::handleWebSocketMessage(uWS::WebSocket<false, true, WebSocketUserData> *ws, std::string_view message, uWS::OpCode opCode) {
|
||||
if (message == "pong") {
|
||||
ws->getUserData()->pongReceived = true;
|
||||
return;
|
||||
}
|
||||
|
||||
json parsedMessage = json::parse(message);
|
||||
if (parsedMessage.contains("event") && parsedMessage["event"] == "setUserId") {
|
||||
std::string userId = parsedMessage["data"]["userId"];
|
||||
std::unique_lock lock(connectionsMutex);
|
||||
connections[userId] = ws;
|
||||
ws->getUserData()->userId = userId;
|
||||
}
|
||||
}
|
||||
|
||||
void WebSocketServer::handleWebSocketClose(uWS::WebSocket<false, true, WebSocketUserData> *ws) {
|
||||
std::unique_lock lock(connectionsMutex);
|
||||
auto userId = ws->getUserData()->userId;
|
||||
if (!userId.empty()) {
|
||||
connections.erase(userId);
|
||||
}
|
||||
}
|
||||
|
||||
std::string WebSocketServer::getUserIdFromFalukantUserId(int & userId) {
|
||||
std::string WebSocketServer::getUserIdFromFalukantUserId(int userId) {
|
||||
ConnectionGuard guard(pool);
|
||||
auto &db = guard.get();
|
||||
std::string query = R"(
|
||||
std::string sql = R"(
|
||||
SELECT u.hashed_id
|
||||
FROM community.user u
|
||||
JOIN falukant_data.falukant_user fu ON u.id = fu.user_id
|
||||
WHERE fu.id = $1
|
||||
)";
|
||||
db.prepare("get_user_id", query);
|
||||
auto users = db.execute("get_user_id", {std::to_string(userId)});
|
||||
if (!users.empty()) {
|
||||
return users[0]["hashed_id"];
|
||||
} else {
|
||||
return "";
|
||||
db.prepare("get_user_id", sql);
|
||||
auto res = db.execute("get_user_id", {std::to_string(userId)});
|
||||
return (!res.empty()) ? res[0]["hashed_id"] : std::string();
|
||||
}
|
||||
|
||||
void WebSocketServer::setWorkers(const std::vector<std::unique_ptr<Worker>> &workerList) {
|
||||
workers.clear();
|
||||
workers.reserve(workerList.size());
|
||||
for (const auto &wptr : workerList) {
|
||||
workers.push_back(wptr.get());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user