Refactor message sending logic in WebSocket server to improve direct transmission and error handling
- Attempt to send messages directly during the RECEIVE callback to avoid mutex issues, enhancing performance. - Implement size checks for messages to prevent overflow, with logging for oversized messages. - Introduce additional error handling and logging for socket write operations, ensuring robust message delivery and queue management. - Maintain thread safety by validating user data and mutex locking before queuing messages when direct sending fails.
This commit is contained in:
committed by
Torsten (PC)
parent
1af4b6c2e4
commit
d71df901ed
@@ -365,34 +365,59 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lege Nachricht in die Queue, ohne sofort lws_callback_on_writable aufzurufen
|
// Versuche, die Nachricht direkt zu senden, ohne die Queue zu verwenden
|
||||||
// Kopiere die Nachricht und verwende sendMessageToConnection
|
// Das vermeidet Probleme mit dem Mutex während des Callbacks
|
||||||
try {
|
try {
|
||||||
std::cout << "[RECEIVE] Erstelle messageStr..." << std::endl;
|
|
||||||
std::string messageStr = errorResponse.dump();
|
std::string messageStr = errorResponse.dump();
|
||||||
std::cout << "[RECEIVE] messageStr erstellt: " << messageStr.length() << " Bytes" << std::endl;
|
std::cout << "[RECEIVE] Versuche Nachricht direkt zu senden: " << messageStr.length() << " Bytes" << std::endl;
|
||||||
|
|
||||||
// Prüfe ob instance, wsi und ud noch gültig sind
|
// Prüfe ob die Nachricht nicht zu groß ist
|
||||||
if (!instance) {
|
if (messageStr.length() > 4096) {
|
||||||
std::cerr << "[RECEIVE] instance ist nullptr vor sendMessageToConnection" << std::endl;
|
std::cerr << "[RECEIVE] Warnung: Nachricht zu groß (" << messageStr.length() << " Bytes), wird abgeschnitten" << std::endl;
|
||||||
break;
|
messageStr = messageStr.substr(0, 4096);
|
||||||
}
|
|
||||||
if (!wsi) {
|
|
||||||
std::cerr << "[RECEIVE] wsi ist nullptr vor sendMessageToConnection" << std::endl;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (!ud) {
|
|
||||||
std::cerr << "[RECEIVE] ud ist nullptr vor sendMessageToConnection" << std::endl;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "[RECEIVE] Rufe sendMessageToConnection auf..." << std::endl;
|
// Versuche, die Nachricht direkt zu senden
|
||||||
instance->sendMessageToConnection(wsi, ud, messageStr);
|
// In libwebsockets können wir lws_write während eines RECEIVE-Callbacks aufrufen,
|
||||||
std::cout << "[RECEIVE] sendMessageToConnection erfolgreich aufgerufen" << std::endl;
|
// aber nur wenn der Socket schreibbar ist. Wenn nicht, müssen wir lws_callback_on_writable aufrufen.
|
||||||
} catch (const std::exception &e) {
|
unsigned char buf[LWS_PRE + messageStr.length()];
|
||||||
std::cerr << "[RECEIVE] Exception beim Aufruf von sendMessageToConnection: " << e.what() << std::endl;
|
memcpy(buf + LWS_PRE, messageStr.c_str(), messageStr.length());
|
||||||
|
int ret = lws_write(wsi, buf + LWS_PRE, messageStr.length(), LWS_WRITE_TEXT);
|
||||||
|
|
||||||
|
if (ret < 0) {
|
||||||
|
// Socket ist nicht schreibbar, verwende lws_callback_on_writable
|
||||||
|
// und speichere die Nachricht in einer temporären Variable
|
||||||
|
std::cout << "[RECEIVE] Socket nicht schreibbar (ret=" << ret << "), verwende callback_on_writable" << std::endl;
|
||||||
|
|
||||||
|
// Versuche, die Nachricht in die Queue zu legen, aber mit zusätzlichen Prüfungen
|
||||||
|
if (instance && wsi && ud) {
|
||||||
|
// Prüfe, ob ud und der Mutex gültig sind
|
||||||
|
try {
|
||||||
|
// Test-Zugriff auf ud, um sicherzustellen, dass es gültig ist
|
||||||
|
volatile bool test = ud->pongReceived;
|
||||||
|
(void)test;
|
||||||
|
|
||||||
|
// Versuche, den Mutex zu locken
|
||||||
|
// Verwende try_lock, um zu prüfen, ob der Mutex verfügbar ist
|
||||||
|
std::unique_lock<std::mutex> lock(ud->messageQueueMutex, std::try_to_lock);
|
||||||
|
if (lock.owns_lock()) {
|
||||||
|
ud->messageQueue.push(messageStr);
|
||||||
|
std::cout << "[RECEIVE] Nachricht zur Queue hinzugefügt" << std::endl;
|
||||||
|
lws_callback_on_writable(wsi);
|
||||||
|
} else {
|
||||||
|
std::cerr << "[RECEIVE] Mutex konnte nicht gelockt werden, Nachricht wird verworfen" << std::endl;
|
||||||
|
}
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
std::cerr << "[RECEIVE] Unbekannte Exception beim Aufruf von sendMessageToConnection" << std::endl;
|
std::cerr << "[RECEIVE] Fehler beim Zugriff auf Queue, Nachricht wird verworfen" << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
std::cout << "[RECEIVE] Nachricht direkt gesendet (" << ret << " Bytes)" << std::endl;
|
||||||
|
}
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
std::cerr << "[RECEIVE] Exception beim Senden der Nachricht: " << e.what() << std::endl;
|
||||||
|
} catch (...) {
|
||||||
|
std::cerr << "[RECEIVE] Unbekannte Exception beim Senden der Nachricht" << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
||||||
|
|||||||
Reference in New Issue
Block a user