From 00a5f47caec074c7c919dd89a3ea4e7bcbc90538 Mon Sep 17 00:00:00 2001 From: "Torsten Schulz (local)" Date: Tue, 4 Nov 2025 15:36:43 +0100 Subject: [PATCH] Refactor WebSocket server connection management and message handling - Update WebSocketUserData to use a message queue for handling outgoing messages, improving concurrency and message delivery. - Modify pingClients method to handle multiple connections per user and implement timeout logic for ping responses. - Enhance addConnection and removeConnection methods to manage multiple connections for each user, including detailed logging of connection states. - Update handleBrokerMessage to send messages to all active connections for a user, ensuring proper queue management and callback invocation. --- src/websocket_server.cpp | 149 +++++++++++++++++++++++++++------------ src/websocket_server.h | 7 +- 2 files changed, 108 insertions(+), 48 deletions(-) diff --git a/src/websocket_server.cpp b/src/websocket_server.cpp index deb510b..47c44c7 100644 --- a/src/websocket_server.cpp +++ b/src/websocket_server.cpp @@ -5,6 +5,7 @@ #include #include #include +#include using json = nlohmann::json; @@ -169,24 +170,26 @@ void WebSocketServer::pingClients() { { std::shared_lock lock(connectionsMutex); for (auto& pair : connections) { - auto* ud = reinterpret_cast(lws_wsi_user(pair.second)); - if (!ud) continue; - - // Prüfe ob Pong-Timeout erreicht wurde - auto timeSincePing = std::chrono::duration_cast(now - ud->lastPingTime).count(); - auto timeSincePong = std::chrono::duration_cast(now - ud->lastPongTime).count(); - - if (!ud->pongReceived && timeSincePing > WebSocketUserData::PONG_TIMEOUT_SECONDS) { - ud->pingTimeoutCount++; - std::cout << "Ping-Timeout für User " << ud->userId << " (Versuch " << ud->pingTimeoutCount << "/" << WebSocketUserData::MAX_PING_TIMEOUTS << ")" << std::endl; + for (auto* wsi : pair.second) { + auto* ud = reinterpret_cast(lws_wsi_user(wsi)); + if (!ud) continue; - if (ud->pingTimeoutCount >= WebSocketUserData::MAX_PING_TIMEOUTS) { - std::cout << "Verbindung wird getrennt: Zu viele Ping-Timeouts für User " << ud->userId << std::endl; - toDisconnect.push_back(pair.second); - } else { - // Reset für nächsten Versuch - ud->pongReceived = true; - ud->lastPongTime = now; + // Prüfe ob Pong-Timeout erreicht wurde + auto timeSincePing = std::chrono::duration_cast(now - ud->lastPingTime).count(); + auto timeSincePong = std::chrono::duration_cast(now - ud->lastPongTime).count(); + + if (!ud->pongReceived && timeSincePing > WebSocketUserData::PONG_TIMEOUT_SECONDS) { + ud->pingTimeoutCount++; + std::cout << "Ping-Timeout für User " << ud->userId << " (Versuch " << ud->pingTimeoutCount << "/" << WebSocketUserData::MAX_PING_TIMEOUTS << ")" << std::endl; + + if (ud->pingTimeoutCount >= WebSocketUserData::MAX_PING_TIMEOUTS) { + std::cout << "Verbindung wird getrennt: Zu viele Ping-Timeouts für User " << ud->userId << std::endl; + toDisconnect.push_back(wsi); + } else { + // Reset für nächsten Versuch + ud->pongReceived = true; + ud->lastPongTime = now; + } } } } @@ -261,7 +264,31 @@ int WebSocketServer::wsCallback(struct lws *wsi, } case LWS_CALLBACK_SERVER_WRITEABLE: { // Prüfe ob es eine Nachricht zum Senden gibt - if (ud->pendingMessage.empty()) { + std::string messageToSend; + { + std::lock_guard lock(ud->messageQueueMutex); + if (!ud->messageQueue.empty()) { + messageToSend = std::move(ud->messageQueue.front()); + ud->messageQueue.pop(); + } + } + + if (!messageToSend.empty()) { + // Nachricht senden + std::cout << "[WRITEABLE] Sende Nachricht: " << messageToSend << std::endl; + unsigned char buf[LWS_PRE + messageToSend.length()]; + memcpy(buf + LWS_PRE, messageToSend.c_str(), messageToSend.length()); + lws_write(wsi, buf + LWS_PRE, messageToSend.length(), LWS_WRITE_TEXT); + std::cout << "[WRITEABLE] Nachricht erfolgreich gesendet" << std::endl; + + // Wenn noch weitere Nachrichten in der Queue sind, wieder schreibbereit machen + { + std::lock_guard lock(ud->messageQueueMutex); + if (!ud->messageQueue.empty()) { + lws_callback_on_writable(wsi); + } + } + } else { // Ping senden ud->lastPingTime = std::chrono::steady_clock::now(); ud->pongReceived = false; @@ -269,21 +296,13 @@ int WebSocketServer::wsCallback(struct lws *wsi, memcpy(buf + LWS_PRE, "ping", 4); lws_write(wsi, buf + LWS_PRE, 4, LWS_WRITE_TEXT); // std::cout << "Ping an Client gesendet" << std::endl; - } else { - // Nachricht senden - std::cout << "[WRITEABLE] Sende Nachricht: " << ud->pendingMessage << std::endl; - unsigned char buf[LWS_PRE + ud->pendingMessage.length()]; - memcpy(buf + LWS_PRE, ud->pendingMessage.c_str(), ud->pendingMessage.length()); - lws_write(wsi, buf + LWS_PRE, ud->pendingMessage.length(), LWS_WRITE_TEXT); - ud->pendingMessage.clear(); - std::cout << "[WRITEABLE] Nachricht erfolgreich gesendet" << std::endl; } break; } case LWS_CALLBACK_CLOSED: // Verbindung aus der Map entfernen if (!ud->userId.empty()) { - instance->removeConnection(ud->userId); + instance->removeConnection(ud->userId, wsi); std::cout << "WebSocket-Verbindung geschlossen für User: " << ud->userId << std::endl; } break; @@ -316,28 +335,44 @@ void WebSocketServer::handleBrokerMessage(const std::string &message) { auto userId = getUserIdFromFalukantUserId(fid); std::cout << "[handleBrokerMessage] Broker-Nachricht für Falukant-User " << fid << " -> User-ID " << userId << std::endl; + // Prüfe ob User-ID gefunden wurde + if (userId.empty()) { + std::cerr << "[handleBrokerMessage] WARNUNG: User-ID für Falukant-User " << fid << " nicht gefunden! Nachricht wird nicht gesendet." << std::endl; + return; + } + std::shared_lock lock(connectionsMutex); - std::cout << "[handleBrokerMessage] Aktive Verbindungen: " << connections.size() << std::endl; + std::cout << "[handleBrokerMessage] Aktive User-Verbindungen: " << connections.size() << std::endl; auto it = connections.find(userId); - if (it != connections.end()) { - std::cout << "[handleBrokerMessage] Sende Nachricht an User " << userId << ": " << message << std::endl; + if (it != connections.end() && !it->second.empty()) { + std::cout << "[handleBrokerMessage] Sende Nachricht an User " << userId << " (" << it->second.size() << " Verbindungen): " << message << std::endl; - // Nachricht in der UserData speichern - auto *ud = reinterpret_cast(lws_wsi_user(it->second)); - if (ud) { - ud->pendingMessage = message; - std::cout << "[handleBrokerMessage] Nachricht in pendingMessage gespeichert" << std::endl; - } else { - std::cerr << "[handleBrokerMessage] FEHLER: ud ist nullptr!" << std::endl; + // Nachricht an alle Verbindungen des Users senden + for (auto* wsi : it->second) { + auto *ud = reinterpret_cast(lws_wsi_user(wsi)); + if (ud) { + bool wasEmpty = false; + { + std::lock_guard lock(ud->messageQueueMutex); + wasEmpty = ud->messageQueue.empty(); + ud->messageQueue.push(message); + std::cout << "[handleBrokerMessage] Nachricht zur Queue hinzugefügt (Queue-Größe: " << ud->messageQueue.size() << ")" << std::endl; + } + // Nur wenn die Queue leer war, den Callback aufrufen + // (sonst wird er bereits durch den WRITEABLE-Handler aufgerufen) + if (wasEmpty) { + lws_callback_on_writable(wsi); + } + } else { + std::cerr << "[handleBrokerMessage] FEHLER: ud ist nullptr für eine Verbindung!" << std::endl; + } } - - lws_callback_on_writable(it->second); } else { std::cout << "[handleBrokerMessage] Keine aktive Verbindung für User " << userId << " gefunden" << std::endl; std::cout << "[handleBrokerMessage] Verfügbare User-IDs in connections:" << std::endl; for (const auto& pair : connections) { - std::cout << " - " << pair.first << std::endl; + std::cout << " - " << pair.first << " (" << pair.second.size() << " Verbindungen)" << std::endl; } } } else { @@ -372,12 +407,36 @@ void WebSocketServer::setWorkers(const std::vector> &wor void WebSocketServer::addConnection(const std::string &userId, struct lws *wsi) { std::unique_lock lock(connectionsMutex); - connections[userId] = wsi; - std::cout << "[addConnection] Verbindung für User " << userId << " gespeichert (Insgesamt: " << connections.size() << " Verbindungen)" << std::endl; + connections[userId].push_back(wsi); + size_t totalConnections = 0; + for (const auto& pair : connections) { + totalConnections += pair.second.size(); + } + std::cout << "[addConnection] Verbindung für User " << userId << " gespeichert (User hat " << connections[userId].size() << " Verbindung(en), insgesamt: " << totalConnections << " Verbindungen)" << std::endl; } -void WebSocketServer::removeConnection(const std::string &userId) { +void WebSocketServer::removeConnection(const std::string &userId, struct lws *wsi) { std::unique_lock lock(connectionsMutex); - connections.erase(userId); - std::cout << "[removeConnection] Verbindung für User " << userId << " entfernt (Insgesamt: " << connections.size() << " Verbindungen)" << std::endl; + auto it = connections.find(userId); + if (it != connections.end()) { + // Entferne die spezifische Verbindung aus dem Vektor + auto& connList = it->second; + connList.erase(std::remove(connList.begin(), connList.end(), wsi), connList.end()); + + // Speichere die verbleibende Anzahl vor dem möglichen Löschen + size_t remainingConnections = connList.size(); + + // Wenn keine Verbindungen mehr vorhanden sind, entferne den Eintrag + if (connList.empty()) { + connections.erase(it); + } + + size_t totalConnections = 0; + for (const auto& pair : connections) { + totalConnections += pair.second.size(); + } + std::cout << "[removeConnection] Verbindung für User " << userId << " entfernt (User hat noch " << remainingConnections << " Verbindung(en), insgesamt: " << totalConnections << " Verbindungen)" << std::endl; + } else { + std::cout << "[removeConnection] Warnung: Keine Verbindungen für User " << userId << " gefunden" << std::endl; + } } diff --git a/src/websocket_server.h b/src/websocket_server.h index 83f073b..bb053c4 100644 --- a/src/websocket_server.h +++ b/src/websocket_server.h @@ -20,7 +20,8 @@ struct WebSocketUserData { std::string userId; bool pongReceived = true; - std::string pendingMessage; + std::queue messageQueue; + std::mutex messageQueueMutex; std::chrono::steady_clock::time_point lastPingTime; std::chrono::steady_clock::time_point lastPongTime; int pingTimeoutCount = 0; @@ -48,7 +49,7 @@ private: void handleBrokerMessage(const std::string &message); std::string getUserIdFromFalukantUserId(int falukantUserId); void addConnection(const std::string &userId, struct lws *wsi); - void removeConnection(const std::string &userId); + void removeConnection(const std::string &userId, struct lws *wsi); static int wsCallback(struct lws *wsi, enum lws_callback_reasons reason, @@ -72,7 +73,7 @@ private: std::queue messageQueue; std::shared_mutex connectionsMutex; - std::unordered_map connections; + std::unordered_map> connections; std::vector workers;