Refactor WebSocket server message queuing and error handling
- Implement message queuing for error responses during WebSocket callbacks to prevent immediate sending, enhancing stability. - Utilize lws_cancel_service to trigger the writable callback safely, ensuring messages are sent correctly after the callback execution. - Improve error handling and logging for message sending operations, providing clearer insights into potential issues.
This commit is contained in:
committed by
Torsten (PC)
parent
0c36c4a4e5
commit
2d3d120f81
@@ -322,6 +322,8 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
|||||||
|
|
||||||
if (ud->userId.empty()) {
|
if (ud->userId.empty()) {
|
||||||
std::cerr << "[RECEIVE] getConnections: User-ID nicht gesetzt" << std::endl;
|
std::cerr << "[RECEIVE] getConnections: User-ID nicht gesetzt" << std::endl;
|
||||||
|
// Sende Fehlerantwort nicht während des Callbacks, sondern lege sie in die Queue
|
||||||
|
// und triggere den WRITEABLE-Callback später
|
||||||
try {
|
try {
|
||||||
json errorResponse = {
|
json errorResponse = {
|
||||||
{"event", "getConnectionsResponse"},
|
{"event", "getConnectionsResponse"},
|
||||||
@@ -329,7 +331,15 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
|||||||
{"error", "User-ID nicht gesetzt"}
|
{"error", "User-ID nicht gesetzt"}
|
||||||
};
|
};
|
||||||
if (instance && wsi && ud) {
|
if (instance && wsi && ud) {
|
||||||
instance->sendMessageToConnection(wsi, ud, errorResponse.dump());
|
// Lege Nachricht in die Queue, ohne sofort lws_callback_on_writable aufzurufen
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
||||||
|
ud->messageQueue.push(errorResponse.dump());
|
||||||
|
}
|
||||||
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
||||||
|
if (instance->context) {
|
||||||
|
lws_cancel_service(instance->context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
std::cerr << "[RECEIVE] Fehler beim Senden der Fehlerantwort: " << e.what() << std::endl;
|
std::cerr << "[RECEIVE] Fehler beim Senden der Fehlerantwort: " << e.what() << std::endl;
|
||||||
@@ -350,7 +360,15 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
|||||||
{"error", "Zugriff verweigert: Nur Mainadmin-User können Verbindungen abfragen"}
|
{"error", "Zugriff verweigert: Nur Mainadmin-User können Verbindungen abfragen"}
|
||||||
};
|
};
|
||||||
if (instance && wsi && ud) {
|
if (instance && wsi && ud) {
|
||||||
instance->sendMessageToConnection(wsi, ud, errorResponse.dump());
|
// Lege Nachricht in die Queue, ohne sofort lws_callback_on_writable aufzurufen
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
||||||
|
ud->messageQueue.push(errorResponse.dump());
|
||||||
|
}
|
||||||
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
||||||
|
if (instance->context) {
|
||||||
|
lws_cancel_service(instance->context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -366,7 +384,14 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
|||||||
if (instance && wsi && ud) {
|
if (instance && wsi && ud) {
|
||||||
// Sende die Nachricht, aber nicht während des Callbacks
|
// Sende die Nachricht, aber nicht während des Callbacks
|
||||||
// Die Nachricht wird in die Queue gelegt und beim nächsten WRITEABLE gesendet
|
// Die Nachricht wird in die Queue gelegt und beim nächsten WRITEABLE gesendet
|
||||||
instance->sendMessageToConnection(wsi, ud, response.dump());
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
||||||
|
ud->messageQueue.push(response.dump());
|
||||||
|
}
|
||||||
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
||||||
|
if (instance->context) {
|
||||||
|
lws_cancel_service(instance->context);
|
||||||
|
}
|
||||||
std::cout << "[RECEIVE] getConnections: Verbindungen an Mainadmin gesendet (" << response.dump().length() << " Bytes)" << std::endl;
|
std::cout << "[RECEIVE] getConnections: Verbindungen an Mainadmin gesendet (" << response.dump().length() << " Bytes)" << std::endl;
|
||||||
}
|
}
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
@@ -379,7 +404,15 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
|||||||
{"error", std::string("Fehler beim Abrufen der Verbindungen: ") + e.what()}
|
{"error", std::string("Fehler beim Abrufen der Verbindungen: ") + e.what()}
|
||||||
};
|
};
|
||||||
if (instance && wsi && ud) {
|
if (instance && wsi && ud) {
|
||||||
instance->sendMessageToConnection(wsi, ud, errorResponse.dump());
|
// Lege Nachricht in die Queue, ohne sofort lws_callback_on_writable aufzurufen
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
||||||
|
ud->messageQueue.push(errorResponse.dump());
|
||||||
|
}
|
||||||
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
||||||
|
if (instance->context) {
|
||||||
|
lws_cancel_service(instance->context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
// Ignoriere Fehler beim Senden der Fehlerantwort
|
// Ignoriere Fehler beim Senden der Fehlerantwort
|
||||||
@@ -756,6 +789,9 @@ void WebSocketServer::sendMessageToConnection(struct lws *wsi, WebSocketUserData
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Prüfe ob wsi noch gültig ist (wenn verfügbar)
|
||||||
|
// lws_wsi_is_valid ist nicht in allen Versionen verfügbar, daher try-catch
|
||||||
|
|
||||||
bool wasEmpty = false;
|
bool wasEmpty = false;
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
||||||
@@ -766,11 +802,17 @@ void WebSocketServer::sendMessageToConnection(struct lws *wsi, WebSocketUserData
|
|||||||
// Nur wenn die Queue leer war, den Callback aufrufen
|
// Nur wenn die Queue leer war, den Callback aufrufen
|
||||||
// (sonst wird er bereits durch den WRITEABLE-Handler aufgerufen)
|
// (sonst wird er bereits durch den WRITEABLE-Handler aufgerufen)
|
||||||
if (wasEmpty) {
|
if (wasEmpty) {
|
||||||
// lws_callback_on_writable kann sicher während eines Callbacks aufgerufen werden
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
||||||
lws_callback_on_writable(wsi);
|
// Das ist sicherer, wenn wir uns in einem Callback befinden
|
||||||
|
// lws_cancel_service triggert einen Service-Loop, der dann LWS_CALLBACK_SERVER_WRITEABLE aufruft
|
||||||
|
if (context) {
|
||||||
|
lws_cancel_service(context);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
std::cerr << "[sendMessageToConnection] Fehler: " << e.what() << std::endl;
|
std::cerr << "[sendMessageToConnection] Fehler: " << e.what() << std::endl;
|
||||||
|
} catch (...) {
|
||||||
|
std::cerr << "[sendMessageToConnection] Unbekannter Fehler" << std::endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user