From d71df901ed9a809083a457f86a1f2d4410030242 Mon Sep 17 00:00:00 2001 From: "Torsten Schulz (local)" Date: Fri, 21 Nov 2025 14:39:16 +0100 Subject: [PATCH] Refactor message sending logic in WebSocket server to improve direct transmission and error handling - Attempt to send messages directly during the RECEIVE callback to avoid mutex issues, enhancing performance. - Implement size checks for messages to prevent overflow, with logging for oversized messages. - Introduce additional error handling and logging for socket write operations, ensuring robust message delivery and queue management. - Maintain thread safety by validating user data and mutex locking before queuing messages when direct sending fails. --- src/websocket_server.cpp | 67 +++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/src/websocket_server.cpp b/src/websocket_server.cpp index 5574975..477c926 100644 --- a/src/websocket_server.cpp +++ b/src/websocket_server.cpp @@ -365,34 +365,59 @@ int WebSocketServer::wsCallback(struct lws *wsi, break; } - // Lege Nachricht in die Queue, ohne sofort lws_callback_on_writable aufzurufen - // Kopiere die Nachricht und verwende sendMessageToConnection + // Versuche, die Nachricht direkt zu senden, ohne die Queue zu verwenden + // Das vermeidet Probleme mit dem Mutex während des Callbacks try { - std::cout << "[RECEIVE] Erstelle messageStr..." << std::endl; std::string messageStr = errorResponse.dump(); - std::cout << "[RECEIVE] messageStr erstellt: " << messageStr.length() << " Bytes" << std::endl; + std::cout << "[RECEIVE] Versuche Nachricht direkt zu senden: " << messageStr.length() << " Bytes" << std::endl; - // Prüfe ob instance, wsi und ud noch gültig sind - if (!instance) { - std::cerr << "[RECEIVE] instance ist nullptr vor sendMessageToConnection" << std::endl; - break; - } - if (!wsi) { - std::cerr << "[RECEIVE] wsi ist nullptr vor sendMessageToConnection" << std::endl; - break; - } - if (!ud) { - std::cerr << "[RECEIVE] ud ist nullptr vor sendMessageToConnection" << std::endl; - break; + // Prüfe ob die Nachricht nicht zu groß ist + if (messageStr.length() > 4096) { + std::cerr << "[RECEIVE] Warnung: Nachricht zu groß (" << messageStr.length() << " Bytes), wird abgeschnitten" << std::endl; + messageStr = messageStr.substr(0, 4096); } - std::cout << "[RECEIVE] Rufe sendMessageToConnection auf..." << std::endl; - instance->sendMessageToConnection(wsi, ud, messageStr); - std::cout << "[RECEIVE] sendMessageToConnection erfolgreich aufgerufen" << std::endl; + // Versuche, die Nachricht direkt zu senden + // In libwebsockets können wir lws_write während eines RECEIVE-Callbacks aufrufen, + // aber nur wenn der Socket schreibbar ist. Wenn nicht, müssen wir lws_callback_on_writable aufrufen. + unsigned char buf[LWS_PRE + messageStr.length()]; + memcpy(buf + LWS_PRE, messageStr.c_str(), messageStr.length()); + int ret = lws_write(wsi, buf + LWS_PRE, messageStr.length(), LWS_WRITE_TEXT); + + if (ret < 0) { + // Socket ist nicht schreibbar, verwende lws_callback_on_writable + // und speichere die Nachricht in einer temporären Variable + std::cout << "[RECEIVE] Socket nicht schreibbar (ret=" << ret << "), verwende callback_on_writable" << std::endl; + + // Versuche, die Nachricht in die Queue zu legen, aber mit zusätzlichen Prüfungen + if (instance && wsi && ud) { + // Prüfe, ob ud und der Mutex gültig sind + try { + // Test-Zugriff auf ud, um sicherzustellen, dass es gültig ist + volatile bool test = ud->pongReceived; + (void)test; + + // Versuche, den Mutex zu locken + // Verwende try_lock, um zu prüfen, ob der Mutex verfügbar ist + std::unique_lock lock(ud->messageQueueMutex, std::try_to_lock); + if (lock.owns_lock()) { + ud->messageQueue.push(messageStr); + std::cout << "[RECEIVE] Nachricht zur Queue hinzugefügt" << std::endl; + lws_callback_on_writable(wsi); + } else { + std::cerr << "[RECEIVE] Mutex konnte nicht gelockt werden, Nachricht wird verworfen" << std::endl; + } + } catch (...) { + std::cerr << "[RECEIVE] Fehler beim Zugriff auf Queue, Nachricht wird verworfen" << std::endl; + } + } + } else { + std::cout << "[RECEIVE] Nachricht direkt gesendet (" << ret << " Bytes)" << std::endl; + } } catch (const std::exception &e) { - std::cerr << "[RECEIVE] Exception beim Aufruf von sendMessageToConnection: " << e.what() << std::endl; + std::cerr << "[RECEIVE] Exception beim Senden der Nachricht: " << e.what() << std::endl; } catch (...) { - std::cerr << "[RECEIVE] Unbekannte Exception beim Aufruf von sendMessageToConnection" << std::endl; + std::cerr << "[RECEIVE] Unbekannte Exception beim Senden der Nachricht" << std::endl; } // Verwende lws_cancel_service, um den Service zu benachrichtigen