- Introduce detailed logging for the message sending process, including checks for user data validity and message queue status. - Implement additional null checks for user data before and after locking the mutex to ensure thread safety. - Ensure proper message copying to maintain validity during queuing, improving overall stability and error visibility.
988 lines
46 KiB
C++
988 lines
46 KiB
C++
#include "websocket_server.h"
|
|
#include "connection_guard.h"
|
|
#include "worker.h"
|
|
#include <iostream>
|
|
#include <chrono>
|
|
#include <cstring>
|
|
#include <future>
|
|
#include <algorithm>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <netinet/tcp.h>
|
|
|
|
using json = nlohmann::json;
|
|
|
|
// Protocols array definition
|
|
struct lws_protocols WebSocketServer::protocols[] = {
|
|
{
|
|
"", // Leeres Protokoll für Standard-WebSocket-Verbindungen
|
|
WebSocketServer::wsCallback,
|
|
sizeof(WebSocketUserData),
|
|
4096
|
|
},
|
|
{
|
|
"yourpart-protocol",
|
|
WebSocketServer::wsCallback,
|
|
sizeof(WebSocketUserData),
|
|
4096
|
|
},
|
|
{ nullptr, nullptr, 0, 0 }
|
|
};
|
|
|
|
// Static instance pointer
|
|
WebSocketServer* WebSocketServer::instance = nullptr;
|
|
|
|
WebSocketServer::WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker,
|
|
bool useSSL, const std::string& certPath, const std::string& keyPath)
|
|
: port(port), pool(pool), broker(broker), useSSL(useSSL), certPath(certPath), keyPath(keyPath) {
|
|
instance = this;
|
|
}
|
|
|
|
WebSocketServer::~WebSocketServer() {
|
|
stop();
|
|
instance = nullptr;
|
|
}
|
|
|
|
void WebSocketServer::run() {
|
|
running = true;
|
|
broker.subscribe([this](const std::string &msg) {
|
|
{
|
|
std::lock_guard<std::mutex> lock(queueMutex);
|
|
messageQueue.push(msg);
|
|
}
|
|
queueCV.notify_one();
|
|
});
|
|
serverThread = std::thread([this](){ startServer(); });
|
|
messageThread = std::thread([this](){ processMessageQueue(); });
|
|
pingThread = std::thread([this](){ pingClients(); });
|
|
|
|
// Warte kurz bis alle Threads gestartet sind
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
}
|
|
|
|
void WebSocketServer::stop() {
|
|
running = false;
|
|
if (context) lws_cancel_service(context);
|
|
|
|
// Stoppe Threads mit Timeout
|
|
std::vector<std::future<void>> futures;
|
|
|
|
if (serverThread.joinable()) {
|
|
futures.push_back(std::async(std::launch::async, [this]() { serverThread.join(); }));
|
|
}
|
|
if (messageThread.joinable()) {
|
|
futures.push_back(std::async(std::launch::async, [this]() { messageThread.join(); }));
|
|
}
|
|
if (pingThread.joinable()) {
|
|
futures.push_back(std::async(std::launch::async, [this]() { pingThread.join(); }));
|
|
}
|
|
|
|
// Warte auf alle Threads mit Timeout
|
|
for (auto& future : futures) {
|
|
if (future.wait_for(std::chrono::milliseconds(1000)) == std::future_status::timeout) {
|
|
std::cerr << "WebSocket-Thread beendet sich nicht, erzwinge Beendigung..." << std::endl;
|
|
}
|
|
}
|
|
|
|
// Force detach alle Threads
|
|
if (serverThread.joinable()) serverThread.detach();
|
|
if (messageThread.joinable()) messageThread.detach();
|
|
if (pingThread.joinable()) pingThread.detach();
|
|
|
|
if (context) {
|
|
lws_context_destroy(context);
|
|
context = nullptr;
|
|
}
|
|
}
|
|
|
|
void WebSocketServer::startServer() {
|
|
// Kurze Wartezeit, falls ein vorheriger Prozess den Port noch freigibt
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
struct lws_context_creation_info info;
|
|
memset(&info, 0, sizeof(info));
|
|
info.port = port;
|
|
info.protocols = protocols;
|
|
|
|
// Setze Socket-Optionen-Callback für SO_REUSEADDR
|
|
// Hinweis: In älteren libwebsockets-Versionen muss SO_REUSEADDR manuell gesetzt werden
|
|
// Wir versuchen es über einen Callback, falls verfügbar
|
|
|
|
// Server-Optionen für mehrere gleichzeitige Verbindungen
|
|
info.options = LWS_SERVER_OPTION_VALIDATE_UTF8 |
|
|
LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE |
|
|
LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME;
|
|
|
|
// Erlaube mehrere Verbindungen pro IP
|
|
info.ka_time = 60;
|
|
info.ka_probes = 10;
|
|
info.ka_interval = 10;
|
|
|
|
// SSL/TLS Konfiguration
|
|
if (useSSL) {
|
|
if (certPath.empty() || keyPath.empty()) {
|
|
throw std::runtime_error("SSL enabled but certificate or key path not provided");
|
|
}
|
|
info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
|
|
info.ssl_cert_filepath = certPath.c_str();
|
|
info.ssl_private_key_filepath = keyPath.c_str();
|
|
std::cout << "WebSocket SSL Server starting on port " << port << " with certificates: "
|
|
<< certPath << " / " << keyPath << std::endl;
|
|
} else {
|
|
std::cout << "WebSocket Server starting on port " << port << " (no SSL)" << std::endl;
|
|
}
|
|
|
|
// Erhöhe Log-Level für besseres Debugging
|
|
setenv("LWS_LOG_LEVEL", "7", 1); // 7 = alle Logs
|
|
|
|
context = lws_create_context(&info);
|
|
if (!context) {
|
|
std::string errorMsg = "Failed to create LWS context on port " + std::to_string(port);
|
|
errorMsg += ". Port may be in use or insufficient permissions.";
|
|
std::cerr << errorMsg << std::endl;
|
|
throw std::runtime_error(errorMsg);
|
|
}
|
|
|
|
std::cout << "WebSocket-Server erfolgreich gestartet auf Port " << port << std::endl;
|
|
|
|
while (running) {
|
|
int ret = lws_service(context, 50);
|
|
if (ret < 0) {
|
|
std::cerr << "WebSocket-Server Fehler: lws_service returned " << ret << std::endl;
|
|
// Bei kritischen Fehlern beenden, sonst weiterlaufen
|
|
if (ret == -1) {
|
|
std::cerr << "Kritischer Fehler im WebSocket-Server, beende..." << std::endl;
|
|
break;
|
|
}
|
|
}
|
|
// Kurze Pause für bessere Shutdown-Responsivität
|
|
if (running) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
|
}
|
|
}
|
|
|
|
std::cout << "WebSocket-Server wird beendet..." << std::endl;
|
|
}
|
|
|
|
void WebSocketServer::processMessageQueue() {
|
|
while (running) {
|
|
std::unique_lock<std::mutex> lock(queueMutex);
|
|
queueCV.wait_for(lock, std::chrono::milliseconds(100), [this](){ return !messageQueue.empty() || !running; });
|
|
while (!messageQueue.empty() && running) {
|
|
std::string msg = std::move(messageQueue.front());
|
|
messageQueue.pop();
|
|
lock.unlock();
|
|
handleBrokerMessage(msg);
|
|
lock.lock();
|
|
}
|
|
}
|
|
}
|
|
|
|
void WebSocketServer::pingClients() {
|
|
while (running) {
|
|
// Kürzere Sleep-Intervalle für bessere Shutdown-Responsivität
|
|
for (int i = 0; i < WebSocketUserData::PING_INTERVAL_SECONDS * 10 && running; ++i) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
}
|
|
|
|
if (!running || !context) continue;
|
|
|
|
auto now = std::chrono::steady_clock::now();
|
|
std::vector<struct lws*> toDisconnect;
|
|
|
|
// Prüfe alle Verbindungen auf Timeouts
|
|
{
|
|
std::shared_lock<std::shared_mutex> lock(connectionsMutex);
|
|
for (auto& pair : connections) {
|
|
for (auto* wsi : pair.second) {
|
|
auto* ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
|
if (!ud) continue;
|
|
|
|
// Prüfe ob Pong-Timeout erreicht wurde
|
|
auto timeSincePing = std::chrono::duration_cast<std::chrono::seconds>(now - ud->lastPingTime).count();
|
|
auto timeSincePong = std::chrono::duration_cast<std::chrono::seconds>(now - ud->lastPongTime).count();
|
|
|
|
if (!ud->pongReceived && timeSincePing > WebSocketUserData::PONG_TIMEOUT_SECONDS) {
|
|
ud->pingTimeoutCount++;
|
|
std::cout << "Ping-Timeout für User " << ud->userId << " (Versuch " << ud->pingTimeoutCount << "/" << WebSocketUserData::MAX_PING_TIMEOUTS << ")" << std::endl;
|
|
|
|
if (ud->pingTimeoutCount >= WebSocketUserData::MAX_PING_TIMEOUTS) {
|
|
std::cout << "Verbindung wird getrennt: Zu viele Ping-Timeouts für User " << ud->userId << std::endl;
|
|
toDisconnect.push_back(wsi);
|
|
} else {
|
|
// Reset für nächsten Versuch
|
|
ud->pongReceived = true;
|
|
ud->lastPongTime = now;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Trenne problematische Verbindungen
|
|
for (auto* wsi : toDisconnect) {
|
|
lws_close_reason(wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, (unsigned char*)"Ping timeout", 12);
|
|
}
|
|
|
|
// Sende Pings an alle aktiven Verbindungen
|
|
if (running) {
|
|
lws_callback_on_writable_all_protocol(context, &protocols[0]);
|
|
}
|
|
}
|
|
}
|
|
|
|
int WebSocketServer::wsCallback(struct lws *wsi,
|
|
enum lws_callback_reasons reason,
|
|
void *user, void *in, size_t len) {
|
|
if (!instance) return 0;
|
|
|
|
auto *ud = reinterpret_cast<WebSocketUserData*>(user);
|
|
|
|
switch (reason) {
|
|
case LWS_CALLBACK_ESTABLISHED: {
|
|
if (!ud) {
|
|
std::cerr << "[ESTABLISHED] ud ist nullptr" << std::endl;
|
|
return 0;
|
|
}
|
|
ud->pongReceived = true;
|
|
ud->connectionTime = std::chrono::steady_clock::now();
|
|
ud->lastPingTime = std::chrono::steady_clock::now();
|
|
ud->lastPongTime = std::chrono::steady_clock::now();
|
|
ud->pingTimeoutCount = 0;
|
|
|
|
// Füge Verbindung zur Liste aller Verbindungen hinzu
|
|
{
|
|
std::unique_lock<std::shared_mutex> lock(instance->connectionsMutex);
|
|
instance->allConnections.push_back(wsi);
|
|
}
|
|
|
|
const char* protocolName = lws_get_protocol(wsi)->name;
|
|
std::cout << "WebSocket-Verbindung hergestellt (Protokoll: " << (protocolName ? protocolName : "Standard") << ")" << std::endl;
|
|
char client_addr[128];
|
|
lws_get_peer_simple(wsi, client_addr, sizeof(client_addr));
|
|
std::cout << "Client-Adresse: " << client_addr << std::endl;
|
|
break;
|
|
}
|
|
case LWS_CALLBACK_RECEIVE_PONG:
|
|
// WebSocket Pong-Frame empfangen (automatische Antwort auf Ping)
|
|
if (!ud) {
|
|
std::cerr << "[RECEIVE_PONG] ud ist nullptr" << std::endl;
|
|
return 0;
|
|
}
|
|
ud->pongReceived = true;
|
|
ud->lastPongTime = std::chrono::steady_clock::now();
|
|
ud->pingTimeoutCount = 0;
|
|
// std::cout << "Pong-Frame von Client empfangen" << std::endl;
|
|
return 0;
|
|
case LWS_CALLBACK_RECEIVE: {
|
|
if (!ud) {
|
|
std::cerr << "[RECEIVE] ud ist nullptr" << std::endl;
|
|
return 0;
|
|
}
|
|
|
|
std::string msg(reinterpret_cast<char*>(in), len);
|
|
std::cout << "WebSocket-Nachricht empfangen: " << msg << std::endl;
|
|
|
|
// Fallback: Pong als Text-Nachricht (für Kompatibilität)
|
|
if (msg == "pong") {
|
|
ud->pongReceived = true;
|
|
ud->lastPongTime = std::chrono::steady_clock::now();
|
|
ud->pingTimeoutCount = 0;
|
|
std::cout << "Pong (Text) von Client empfangen" << std::endl;
|
|
break;
|
|
}
|
|
|
|
try {
|
|
json parsed = json::parse(msg);
|
|
std::cout << "[RECEIVE] Nachricht empfangen: " << msg << std::endl;
|
|
|
|
if (parsed.contains("event")) {
|
|
std::string event = parsed["event"].get<std::string>();
|
|
|
|
if (event == "setUserId") {
|
|
if (parsed.contains("data") && parsed["data"].contains("userId")) {
|
|
std::string newUserId = parsed["data"]["userId"].get<std::string>();
|
|
|
|
// Wenn die Verbindung bereits unter einer anderen userId registriert ist, entferne die alte Registrierung
|
|
if (!ud->userId.empty() && ud->userId != newUserId) {
|
|
std::cout << "[RECEIVE] User-ID ändert sich von " << ud->userId << " zu " << newUserId << ", entferne alte Registrierung" << std::endl;
|
|
instance->removeConnection(ud->userId, wsi);
|
|
}
|
|
|
|
ud->userId = newUserId;
|
|
std::cout << "[RECEIVE] User-ID gesetzt: " << ud->userId << std::endl;
|
|
|
|
// Verbindung in der Map speichern
|
|
instance->addConnection(ud->userId, wsi);
|
|
std::cout << "[RECEIVE] Verbindung gespeichert" << std::endl;
|
|
} else {
|
|
std::cerr << "[RECEIVE] setUserId-Event ohne data.userId-Feld" << std::endl;
|
|
}
|
|
} else if (event == "getConnections") {
|
|
// Admin-Funktion: Liste aller aktiven Verbindungen
|
|
std::cout << "[RECEIVE] getConnections: Start" << std::endl;
|
|
if (!ud) {
|
|
std::cerr << "[RECEIVE] getConnections: ud ist nullptr" << std::endl;
|
|
break;
|
|
}
|
|
std::cout << "[RECEIVE] getConnections: ud ist gültig" << std::endl;
|
|
|
|
// Prüfe ob ud noch gültig ist, bevor wir darauf zugreifen
|
|
try {
|
|
volatile bool test = ud->pongReceived;
|
|
(void)test;
|
|
std::cout << "[RECEIVE] getConnections: ud-Zugriff erfolgreich" << std::endl;
|
|
} catch (...) {
|
|
std::cerr << "[RECEIVE] getConnections: ud ist ungültig (Exception beim Zugriff)" << std::endl;
|
|
break;
|
|
}
|
|
|
|
if (ud->userId.empty()) {
|
|
std::cerr << "[RECEIVE] getConnections: User-ID nicht gesetzt" << std::endl;
|
|
// Sende Fehlerantwort nicht während des Callbacks, sondern lege sie in die Queue
|
|
// und triggere den WRITEABLE-Callback später
|
|
try {
|
|
std::cout << "[RECEIVE] getConnections: make response" << std::endl;
|
|
json errorResponse = {
|
|
{"event", "getConnectionsResponse"},
|
|
{"success", false},
|
|
{"error", "User-ID nicht gesetzt"}
|
|
};
|
|
std::cout << "errorResponse: " << errorResponse.dump() << std::endl;
|
|
if (instance && wsi && ud) {
|
|
std::cout << "instance: " << instance << std::endl;
|
|
std::cout << "wsi: " << wsi << std::endl;
|
|
std::cout << "ud: " << ud << std::endl;
|
|
|
|
// Prüfe ob ud noch gültig ist, indem wir versuchen, auf ein einfaches Feld zuzugreifen
|
|
try {
|
|
// Test-Zugriff auf ud, um zu prüfen ob es gültig ist
|
|
volatile bool test = ud->pongReceived;
|
|
(void)test; // Unterdrücke Warnung
|
|
std::cout << "ud ist gültig, pongReceived: " << ud->pongReceived << std::endl;
|
|
} catch (...) {
|
|
std::cerr << "[RECEIVE] ud ist ungültig (Exception beim Zugriff)" << std::endl;
|
|
break;
|
|
}
|
|
|
|
// Lege Nachricht in die Queue, ohne sofort lws_callback_on_writable aufzurufen
|
|
// Kopiere die Nachricht und verwende sendMessageToConnection
|
|
try {
|
|
std::cout << "[RECEIVE] Erstelle messageStr..." << std::endl;
|
|
std::string messageStr = errorResponse.dump();
|
|
std::cout << "[RECEIVE] messageStr erstellt: " << messageStr.length() << " Bytes" << std::endl;
|
|
|
|
// Prüfe ob instance, wsi und ud noch gültig sind
|
|
if (!instance) {
|
|
std::cerr << "[RECEIVE] instance ist nullptr vor sendMessageToConnection" << std::endl;
|
|
break;
|
|
}
|
|
if (!wsi) {
|
|
std::cerr << "[RECEIVE] wsi ist nullptr vor sendMessageToConnection" << std::endl;
|
|
break;
|
|
}
|
|
if (!ud) {
|
|
std::cerr << "[RECEIVE] ud ist nullptr vor sendMessageToConnection" << std::endl;
|
|
break;
|
|
}
|
|
|
|
std::cout << "[RECEIVE] Rufe sendMessageToConnection auf..." << std::endl;
|
|
instance->sendMessageToConnection(wsi, ud, messageStr);
|
|
std::cout << "[RECEIVE] sendMessageToConnection erfolgreich aufgerufen" << std::endl;
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[RECEIVE] Exception beim Aufruf von sendMessageToConnection: " << e.what() << std::endl;
|
|
} catch (...) {
|
|
std::cerr << "[RECEIVE] Unbekannte Exception beim Aufruf von sendMessageToConnection" << std::endl;
|
|
}
|
|
|
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
|
if (instance->context) {
|
|
std::cout << "Rufe lws_cancel_service auf..." << std::endl;
|
|
lws_cancel_service(instance->context);
|
|
std::cout << "lws_cancel_service(instance->context) done" << std::endl;
|
|
} else {
|
|
std::cerr << "[RECEIVE] instance->context ist nullptr" << std::endl;
|
|
}
|
|
} else {
|
|
std::cerr << "[RECEIVE] instance, wsi oder ud ist nullptr" << std::endl;
|
|
}
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[RECEIVE] Fehler beim Senden der Fehlerantwort: " << e.what() << std::endl;
|
|
} catch (...) {
|
|
std::cerr << "[RECEIVE] Unbekannter Fehler beim Senden der Fehlerantwort" << std::endl;
|
|
}
|
|
break;
|
|
}
|
|
|
|
// Prüfe Mainadmin-Rechte und sende Antwort asynchron
|
|
// (nicht während des Callbacks, um Verbindungsprobleme zu vermeiden)
|
|
try {
|
|
if (!instance || !instance->isMainAdmin(ud->userId)) {
|
|
std::cerr << "[RECEIVE] getConnections: Zugriff verweigert für User " << ud->userId << std::endl;
|
|
json errorResponse = {
|
|
{"event", "getConnectionsResponse"},
|
|
{"success", false},
|
|
{"error", "Zugriff verweigert: Nur Mainadmin-User können Verbindungen abfragen"}
|
|
};
|
|
if (instance && wsi && ud) {
|
|
// Lege Nachricht in die Queue, ohne sofort lws_callback_on_writable aufzurufen
|
|
{
|
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
|
ud->messageQueue.push(errorResponse.dump());
|
|
}
|
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
|
if (instance->context) {
|
|
lws_cancel_service(instance->context);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
// Hole aktive Verbindungen und sende Antwort
|
|
// Wichtig: getActiveConnections() sollte schnell sein und keine langen Operationen durchführen
|
|
json connections = instance->getActiveConnections();
|
|
json response = {
|
|
{"event", "getConnectionsResponse"},
|
|
{"success", true},
|
|
{"data", connections}
|
|
};
|
|
if (instance && wsi && ud) {
|
|
// Verwende sendMessageToConnection, das bereits alle notwendigen Prüfungen hat
|
|
instance->sendMessageToConnection(wsi, ud, response.dump());
|
|
std::cout << "[RECEIVE] getConnections: Verbindungen an Mainadmin gesendet (" << response.dump().length() << " Bytes)" << std::endl;
|
|
}
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[RECEIVE] Fehler bei getConnections: " << e.what() << std::endl;
|
|
// Sende Fehlerantwort
|
|
try {
|
|
json errorResponse = {
|
|
{"event", "getConnectionsResponse"},
|
|
{"success", false},
|
|
{"error", std::string("Fehler beim Abrufen der Verbindungen: ") + e.what()}
|
|
};
|
|
if (instance && wsi && ud) {
|
|
// Lege Nachricht in die Queue, ohne sofort lws_callback_on_writable aufzurufen
|
|
{
|
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
|
ud->messageQueue.push(errorResponse.dump());
|
|
}
|
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
|
if (instance->context) {
|
|
lws_cancel_service(instance->context);
|
|
}
|
|
}
|
|
} catch (...) {
|
|
// Ignoriere Fehler beim Senden der Fehlerantwort
|
|
}
|
|
}
|
|
} else {
|
|
std::cout << "[RECEIVE] Unbekanntes Event: " << event << std::endl;
|
|
}
|
|
} else {
|
|
std::cout << "[RECEIVE] Nachricht ohne event-Feld" << std::endl;
|
|
}
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[RECEIVE] Fehler beim Parsen der WebSocket-Nachricht: " << e.what() << std::endl;
|
|
}
|
|
break;
|
|
}
|
|
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
|
if (!ud) {
|
|
std::cerr << "[WRITEABLE] ud ist nullptr" << std::endl;
|
|
return 0;
|
|
}
|
|
// Prüfe ob es eine Nachricht zum Senden gibt
|
|
std::string messageToSend;
|
|
{
|
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
|
if (!ud->messageQueue.empty()) {
|
|
messageToSend = std::move(ud->messageQueue.front());
|
|
ud->messageQueue.pop();
|
|
}
|
|
}
|
|
|
|
if (!messageToSend.empty()) {
|
|
// Prüfe ob Nachricht zu groß ist (max 4096 Bytes)
|
|
if (messageToSend.length() > 4096) {
|
|
std::cerr << "[WRITEABLE] Warnung: Nachricht zu groß (" << messageToSend.length() << " Bytes), wird abgeschnitten" << std::endl;
|
|
messageToSend = messageToSend.substr(0, 4096);
|
|
}
|
|
|
|
// Nachricht senden
|
|
std::cout << "[WRITEABLE] Sende Nachricht (" << messageToSend.length() << " Bytes): " << (messageToSend.length() > 100 ? messageToSend.substr(0, 100) + "..." : messageToSend) << std::endl;
|
|
unsigned char buf[LWS_PRE + messageToSend.length()];
|
|
memcpy(buf + LWS_PRE, messageToSend.c_str(), messageToSend.length());
|
|
int ret = lws_write(wsi, buf + LWS_PRE, messageToSend.length(), LWS_WRITE_TEXT);
|
|
if (ret < 0) {
|
|
std::cerr << "[WRITEABLE] Fehler beim Senden: lws_write returned " << ret << " - Verbindung wird möglicherweise geschlossen" << std::endl;
|
|
// Bei Fehler: Verbindung wird wahrscheinlich geschlossen, entferne aus Queue
|
|
{
|
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
|
// Leere die Queue, da die Verbindung nicht mehr funktioniert
|
|
while (!ud->messageQueue.empty()) {
|
|
ud->messageQueue.pop();
|
|
}
|
|
}
|
|
// Keine weitere Aktion - die Verbindung wird durch libwebsockets geschlossen
|
|
return -1; // Signalisiert libwebsockets, dass die Verbindung geschlossen werden soll
|
|
} else if (ret != static_cast<int>(messageToSend.length())) {
|
|
std::cerr << "[WRITEABLE] Warnung: Nur " << ret << " von " << messageToSend.length() << " Bytes gesendet" << std::endl;
|
|
} else {
|
|
std::cout << "[WRITEABLE] Nachricht erfolgreich gesendet (" << ret << " Bytes)" << std::endl;
|
|
}
|
|
|
|
// Wenn noch weitere Nachrichten in der Queue sind, wieder schreibbereit machen
|
|
{
|
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
|
if (!ud->messageQueue.empty()) {
|
|
lws_callback_on_writable(wsi);
|
|
}
|
|
}
|
|
} else {
|
|
// WebSocket Ping-Frame senden (nicht Text-Nachricht!)
|
|
ud->lastPingTime = std::chrono::steady_clock::now();
|
|
ud->pongReceived = false;
|
|
// Leeres Ping-Frame senden (Browser antworten automatisch mit Pong)
|
|
unsigned char buf[LWS_PRE + 0];
|
|
lws_write(wsi, buf + LWS_PRE, 0, LWS_WRITE_PING);
|
|
// std::cout << "Ping-Frame an Client gesendet" << std::endl;
|
|
}
|
|
break;
|
|
}
|
|
case LWS_CALLBACK_CLOSED:
|
|
// Verbindung aus der Map entfernen
|
|
if (ud) {
|
|
if (!ud->userId.empty()) {
|
|
instance->removeConnection(ud->userId, wsi);
|
|
std::cout << "WebSocket-Verbindung geschlossen für User: " << ud->userId << std::endl;
|
|
} else {
|
|
// Falls keine userId gesetzt ist, entferne die Verbindung aus allen möglichen Einträgen
|
|
// (Fallback für den Fall, dass setUserId nie aufgerufen wurde)
|
|
instance->removeConnectionByWsi(wsi);
|
|
std::cout << "WebSocket-Verbindung geschlossen (ohne User-ID, entferne aus allen Einträgen)" << std::endl;
|
|
}
|
|
} else {
|
|
std::cout << "WebSocket-Verbindung geschlossen (ud ist nullptr)" << std::endl;
|
|
}
|
|
|
|
// Entferne aus allConnections
|
|
{
|
|
std::unique_lock<std::shared_mutex> lock(instance->connectionsMutex);
|
|
instance->allConnections.erase(
|
|
std::remove(instance->allConnections.begin(), instance->allConnections.end(), wsi),
|
|
instance->allConnections.end()
|
|
);
|
|
}
|
|
break;
|
|
case LWS_CALLBACK_WSI_DESTROY:
|
|
// Verbindung wird zerstört - aufräumen falls nötig
|
|
if (ud) {
|
|
if (!ud->userId.empty()) {
|
|
instance->removeConnection(ud->userId, wsi);
|
|
} else {
|
|
instance->removeConnectionByWsi(wsi);
|
|
}
|
|
}
|
|
break;
|
|
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
|
// Client-Verbindungsfehler (falls wir als Client fungieren)
|
|
std::cerr << "WebSocket Client-Verbindungsfehler" << std::endl;
|
|
break;
|
|
case LWS_CALLBACK_HTTP:
|
|
// Erlaube WebSocket-Upgrade-Anfragen, lehne andere HTTP-Anfragen ab
|
|
// libwebsockets behandelt WebSocket-Upgrades automatisch, daher 0 zurückgeben
|
|
return 0;
|
|
case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
|
|
// Protokoll-Filter für bessere Kompatibilität
|
|
return 0;
|
|
case LWS_CALLBACK_RAW_CONNECTED:
|
|
// Raw-Verbindungen behandeln
|
|
return 0;
|
|
case LWS_CALLBACK_RAW_ADOPT_FILE:
|
|
case LWS_CALLBACK_RAW_ADOPT:
|
|
// Setze SO_REUSEADDR für den Socket (falls noch nicht gesetzt)
|
|
// Hinweis: Diese Callbacks werden möglicherweise nicht für Listen-Sockets aufgerufen
|
|
{
|
|
int fd = lws_get_socket_fd(wsi);
|
|
if (fd >= 0) {
|
|
int reuse = 1;
|
|
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
|
|
}
|
|
}
|
|
return 0;
|
|
default:
|
|
break;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
void WebSocketServer::handleBrokerMessage(const std::string &message) {
|
|
try {
|
|
std::cout << "[handleBrokerMessage] Nachricht empfangen: " << message << std::endl;
|
|
json parsed = json::parse(message);
|
|
if (parsed.contains("user_id")) {
|
|
int fid;
|
|
if (parsed["user_id"].is_string()) {
|
|
fid = std::stoi(parsed["user_id"].get<std::string>());
|
|
} else {
|
|
fid = parsed["user_id"].get<int>();
|
|
}
|
|
auto userId = getUserIdFromFalukantUserId(fid);
|
|
std::cout << "[handleBrokerMessage] Broker-Nachricht für Falukant-User " << fid << " -> User-ID " << userId << std::endl;
|
|
|
|
// Prüfe ob User-ID gefunden wurde
|
|
if (userId.empty()) {
|
|
std::cerr << "[handleBrokerMessage] WARNUNG: User-ID für Falukant-User " << fid << " nicht gefunden! Nachricht wird nicht gesendet." << std::endl;
|
|
return;
|
|
}
|
|
|
|
std::shared_lock<std::shared_mutex> lock(connectionsMutex);
|
|
std::cout << "[handleBrokerMessage] Aktive User-Verbindungen: " << connections.size() << std::endl;
|
|
|
|
auto it = connections.find(userId);
|
|
if (it != connections.end() && !it->second.empty()) {
|
|
std::cout << "[handleBrokerMessage] Sende Nachricht an User " << userId << " (" << it->second.size() << " Verbindungen): " << message << std::endl;
|
|
|
|
// Nachricht an alle Verbindungen des Users senden
|
|
for (auto* wsi : it->second) {
|
|
auto *ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
|
if (ud) {
|
|
bool wasEmpty = false;
|
|
{
|
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
|
wasEmpty = ud->messageQueue.empty();
|
|
ud->messageQueue.push(message);
|
|
std::cout << "[handleBrokerMessage] Nachricht zur Queue hinzugefügt (Queue-Größe: " << ud->messageQueue.size() << ")" << std::endl;
|
|
}
|
|
// Nur wenn die Queue leer war, den Callback aufrufen
|
|
// (sonst wird er bereits durch den WRITEABLE-Handler aufgerufen)
|
|
if (wasEmpty) {
|
|
lws_callback_on_writable(wsi);
|
|
}
|
|
} else {
|
|
std::cerr << "[handleBrokerMessage] FEHLER: ud ist nullptr für eine Verbindung!" << std::endl;
|
|
}
|
|
}
|
|
} else {
|
|
std::cout << "[handleBrokerMessage] Keine aktive Verbindung für User " << userId << " gefunden" << std::endl;
|
|
std::cout << "[handleBrokerMessage] Verfügbare User-IDs in connections:" << std::endl;
|
|
for (const auto& pair : connections) {
|
|
std::cout << " - " << pair.first << " (" << pair.second.size() << " Verbindungen)" << std::endl;
|
|
}
|
|
}
|
|
} else {
|
|
std::cout << "[handleBrokerMessage] Nachricht enthält kein user_id-Feld!" << std::endl;
|
|
}
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[handleBrokerMessage] Error processing broker message: " << e.what() << std::endl;
|
|
}
|
|
}
|
|
|
|
std::string WebSocketServer::getUserIdFromFalukantUserId(int userId) {
|
|
ConnectionGuard guard(pool);
|
|
auto &db = guard.get();
|
|
std::string sql = R"(
|
|
SELECT u.hashed_id
|
|
FROM community.user u
|
|
JOIN falukant_data.falukant_user fu ON u.id = fu.user_id
|
|
WHERE fu.id = $1
|
|
)";
|
|
db.prepare("get_user_id", sql);
|
|
auto res = db.execute("get_user_id", {std::to_string(userId)});
|
|
return (!res.empty()) ? res[0]["hashed_id"] : std::string();
|
|
}
|
|
|
|
bool WebSocketServer::isMainAdmin(const std::string &hashedUserId) {
|
|
ConnectionGuard guard(pool);
|
|
auto &db = guard.get();
|
|
std::string sql = R"(
|
|
SELECT COUNT(*) as count
|
|
FROM community.user u
|
|
JOIN community.user_right ur ON u.id = ur.user_id
|
|
JOIN "type".user_right tr ON ur.right_type_id = tr.id
|
|
WHERE u.hashed_id = $1
|
|
AND tr.title = 'mainadmin'
|
|
)";
|
|
db.prepare("check_mainadmin", sql);
|
|
auto res = db.execute("check_mainadmin", {hashedUserId});
|
|
if (res.empty()) {
|
|
return false;
|
|
}
|
|
int count = std::stoi(res[0]["count"].c_str());
|
|
return count > 0;
|
|
}
|
|
|
|
nlohmann::json WebSocketServer::getActiveConnections() {
|
|
json result = json::array();
|
|
|
|
std::shared_lock<std::shared_mutex> lock(connectionsMutex);
|
|
|
|
// Zähle Verbindungen ohne userId
|
|
size_t unauthenticatedCount = 0;
|
|
for (auto* wsi : allConnections) {
|
|
auto* ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
|
if (ud && ud->userId.empty()) {
|
|
unauthenticatedCount++;
|
|
}
|
|
}
|
|
|
|
// Iteriere über die Member-Variable this->connections (nicht die lokale Variable)
|
|
for (const auto& pair : this->connections) {
|
|
const std::string& userId = pair.first;
|
|
const auto& connList = pair.second;
|
|
|
|
json userConnections = {
|
|
{"userId", userId},
|
|
{"connectionCount", connList.size()},
|
|
{"connections", json::array()}
|
|
};
|
|
|
|
for (auto* wsi : connList) {
|
|
if (!wsi) continue;
|
|
auto* ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
|
if (!ud) continue;
|
|
|
|
try {
|
|
// Berechne Verbindungsdauer seit ESTABLISHED
|
|
// Verwende lastPongTime als Fallback, falls connectionTime nicht gesetzt ist
|
|
auto now = std::chrono::steady_clock::now();
|
|
auto connectionTime = ud->connectionTime.time_since_epoch().count() != 0
|
|
? ud->connectionTime
|
|
: ud->lastPongTime;
|
|
auto connectionDuration = std::chrono::duration_cast<std::chrono::seconds>(
|
|
now - connectionTime).count();
|
|
|
|
// Berechne Zeit seit letztem Pong
|
|
auto timeSinceLastPong = std::chrono::duration_cast<std::chrono::seconds>(
|
|
now - ud->lastPongTime).count();
|
|
|
|
json connInfo = {
|
|
{"connectionDurationSeconds", connectionDuration},
|
|
{"timeSinceLastPongSeconds", timeSinceLastPong},
|
|
{"pingTimeoutCount", ud->pingTimeoutCount},
|
|
{"pongReceived", ud->pongReceived}
|
|
};
|
|
userConnections["connections"].push_back(connInfo);
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[getActiveConnections] Fehler beim Verarbeiten einer Verbindung: " << e.what() << std::endl;
|
|
}
|
|
}
|
|
|
|
result.push_back(userConnections);
|
|
}
|
|
|
|
// Füge unauthentifizierte Verbindungen hinzu
|
|
if (unauthenticatedCount > 0) {
|
|
json unauthenticatedConnections = {
|
|
{"userId", ""},
|
|
{"connectionCount", unauthenticatedCount},
|
|
{"connections", json::array()}
|
|
};
|
|
|
|
for (auto* wsi : allConnections) {
|
|
if (!wsi) continue;
|
|
auto* ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
|
if (!ud || !ud->userId.empty()) continue;
|
|
|
|
try {
|
|
auto now = std::chrono::steady_clock::now();
|
|
// Verwende lastPongTime als Fallback, falls connectionTime nicht gesetzt ist
|
|
auto connectionTime = ud->connectionTime.time_since_epoch().count() != 0
|
|
? ud->connectionTime
|
|
: ud->lastPongTime;
|
|
auto connectionDuration = std::chrono::duration_cast<std::chrono::seconds>(
|
|
now - connectionTime).count();
|
|
auto timeSinceLastPong = std::chrono::duration_cast<std::chrono::seconds>(
|
|
now - ud->lastPongTime).count();
|
|
|
|
json connInfo = {
|
|
{"connectionDurationSeconds", connectionDuration},
|
|
{"timeSinceLastPongSeconds", timeSinceLastPong},
|
|
{"pingTimeoutCount", ud->pingTimeoutCount},
|
|
{"pongReceived", ud->pongReceived},
|
|
{"status", "unauthenticated"}
|
|
};
|
|
unauthenticatedConnections["connections"].push_back(connInfo);
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[getActiveConnections] Fehler beim Verarbeiten einer unauthentifizierten Verbindung: " << e.what() << std::endl;
|
|
}
|
|
}
|
|
|
|
result.push_back(unauthenticatedConnections);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
void WebSocketServer::sendMessageToConnection(struct lws *wsi, const std::string &message) {
|
|
if (!wsi) {
|
|
std::cerr << "[sendMessageToConnection] wsi ist nullptr" << std::endl;
|
|
return;
|
|
}
|
|
|
|
auto* ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
|
if (!ud) {
|
|
std::cerr << "[sendMessageToConnection] ud ist nullptr" << std::endl;
|
|
return;
|
|
}
|
|
|
|
sendMessageToConnection(wsi, ud, message);
|
|
}
|
|
|
|
void WebSocketServer::sendMessageToConnection(struct lws *wsi, WebSocketUserData *ud, const std::string &message) {
|
|
if (!wsi) {
|
|
std::cerr << "[sendMessageToConnection] wsi ist nullptr" << std::endl;
|
|
return;
|
|
}
|
|
|
|
if (!ud) {
|
|
std::cerr << "[sendMessageToConnection] ud ist nullptr" << std::endl;
|
|
return;
|
|
}
|
|
|
|
if (!context) {
|
|
std::cerr << "[sendMessageToConnection] context ist nullptr" << std::endl;
|
|
return;
|
|
}
|
|
|
|
std::cout << "[sendMessageToConnection] Start, wsi=" << wsi << ", ud=" << ud << ", message.length()=" << message.length() << std::endl;
|
|
|
|
try {
|
|
// Prüfe ob ud noch gültig ist, bevor wir den Mutex locken
|
|
std::cout << "[sendMessageToConnection] Prüfe ud-Gültigkeit..." << std::endl;
|
|
volatile bool test = ud->pongReceived;
|
|
(void)test;
|
|
std::cout << "[sendMessageToConnection] ud ist gültig" << std::endl;
|
|
|
|
// Kopiere die Nachricht, um sicherzustellen, dass sie gültig bleibt
|
|
std::string messageCopy = message;
|
|
std::cout << "[sendMessageToConnection] Nachricht kopiert: " << messageCopy.length() << " Bytes" << std::endl;
|
|
|
|
bool wasEmpty = false;
|
|
{
|
|
std::cout << "[sendMessageToConnection] Versuche Mutex zu locken..." << std::endl;
|
|
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
|
std::cout << "[sendMessageToConnection] Mutex gelockt" << std::endl;
|
|
|
|
// Prüfe ob ud noch gültig ist, nachdem wir den Mutex gelockt haben
|
|
if (!ud) {
|
|
std::cerr << "[sendMessageToConnection] ud wurde während des Lockens ungültig" << std::endl;
|
|
return;
|
|
}
|
|
|
|
std::cout << "[sendMessageToConnection] Prüfe Queue-Größe..." << std::endl;
|
|
wasEmpty = ud->messageQueue.empty();
|
|
std::cout << "[sendMessageToConnection] Queue war leer: " << wasEmpty << std::endl;
|
|
|
|
std::cout << "[sendMessageToConnection] Füge Nachricht zur Queue hinzu..." << std::endl;
|
|
ud->messageQueue.push(messageCopy);
|
|
std::cout << "[sendMessageToConnection] Nachricht zur Queue hinzugefügt, neue Größe: " << ud->messageQueue.size() << std::endl;
|
|
}
|
|
|
|
// Nur wenn die Queue leer war, den Callback aufrufen
|
|
// (sonst wird er bereits durch den WRITEABLE-Handler aufgerufen)
|
|
if (wasEmpty) {
|
|
std::cout << "[sendMessageToConnection] Queue war leer, rufe lws_cancel_service auf..." << std::endl;
|
|
// Verwende lws_cancel_service, um den Service zu benachrichtigen
|
|
// Das ist sicherer, wenn wir uns in einem Callback befinden
|
|
// lws_cancel_service triggert einen Service-Loop, der dann LWS_CALLBACK_SERVER_WRITEABLE aufruft
|
|
if (context) {
|
|
lws_cancel_service(context);
|
|
std::cout << "[sendMessageToConnection] lws_cancel_service aufgerufen" << std::endl;
|
|
}
|
|
}
|
|
std::cout << "[sendMessageToConnection] Erfolgreich abgeschlossen" << std::endl;
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[sendMessageToConnection] Fehler: " << e.what() << std::endl;
|
|
} catch (...) {
|
|
std::cerr << "[sendMessageToConnection] Unbekannter Fehler" << std::endl;
|
|
} catch (...) {
|
|
std::cerr << "[sendMessageToConnection] Unbekannter Fehler" << std::endl;
|
|
}
|
|
}
|
|
|
|
void WebSocketServer::setWorkers(const std::vector<std::unique_ptr<Worker>> &workerList) {
|
|
workers.clear();
|
|
workers.reserve(workerList.size());
|
|
for (const auto &wptr : workerList) {
|
|
workers.push_back(wptr.get());
|
|
}
|
|
}
|
|
|
|
void WebSocketServer::addConnection(const std::string &userId, struct lws *wsi) {
|
|
std::unique_lock<std::shared_mutex> lock(connectionsMutex);
|
|
connections[userId].push_back(wsi);
|
|
size_t totalConnections = 0;
|
|
for (const auto& pair : connections) {
|
|
totalConnections += pair.second.size();
|
|
}
|
|
std::cout << "[addConnection] Verbindung für User " << userId << " gespeichert (User hat " << connections[userId].size() << " Verbindung(en), insgesamt: " << totalConnections << " Verbindungen)" << std::endl;
|
|
}
|
|
|
|
void WebSocketServer::removeConnection(const std::string &userId, struct lws *wsi) {
|
|
std::unique_lock<std::shared_mutex> lock(connectionsMutex);
|
|
auto it = connections.find(userId);
|
|
if (it != connections.end()) {
|
|
// Entferne die spezifische Verbindung aus dem Vektor
|
|
auto& connList = it->second;
|
|
connList.erase(std::remove(connList.begin(), connList.end(), wsi), connList.end());
|
|
|
|
// Speichere die verbleibende Anzahl vor dem möglichen Löschen
|
|
size_t remainingConnections = connList.size();
|
|
|
|
// Wenn keine Verbindungen mehr vorhanden sind, entferne den Eintrag
|
|
if (connList.empty()) {
|
|
connections.erase(it);
|
|
}
|
|
|
|
size_t totalConnections = 0;
|
|
for (const auto& pair : connections) {
|
|
totalConnections += pair.second.size();
|
|
}
|
|
std::cout << "[removeConnection] Verbindung für User " << userId << " entfernt (User hat noch " << remainingConnections << " Verbindung(en), insgesamt: " << totalConnections << " Verbindungen)" << std::endl;
|
|
} else {
|
|
std::cout << "[removeConnection] Warnung: Keine Verbindungen für User " << userId << " gefunden" << std::endl;
|
|
}
|
|
|
|
// Entferne auch aus allConnections
|
|
allConnections.erase(
|
|
std::remove(allConnections.begin(), allConnections.end(), wsi),
|
|
allConnections.end()
|
|
);
|
|
}
|
|
|
|
void WebSocketServer::removeConnectionByWsi(struct lws *wsi) {
|
|
// Entfernt eine Verbindung aus allen Einträgen in der connections-Map
|
|
// Wird verwendet, wenn die userId nicht bekannt ist (z.B. bei vorzeitigem Schließen)
|
|
std::unique_lock<std::shared_mutex> lock(connectionsMutex);
|
|
|
|
std::vector<std::string> usersToRemove;
|
|
|
|
for (auto it = connections.begin(); it != connections.end(); ++it) {
|
|
auto& connList = it->second;
|
|
auto wsiIt = std::find(connList.begin(), connList.end(), wsi);
|
|
|
|
if (wsiIt != connList.end()) {
|
|
connList.erase(wsiIt);
|
|
std::cout << "[removeConnectionByWsi] Verbindung entfernt von User " << it->first << std::endl;
|
|
|
|
// Wenn keine Verbindungen mehr vorhanden sind, markiere für Entfernung
|
|
if (connList.empty()) {
|
|
usersToRemove.push_back(it->first);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Entferne leere Einträge
|
|
for (const auto& userId : usersToRemove) {
|
|
connections.erase(userId);
|
|
std::cout << "[removeConnectionByWsi] Leeren Eintrag für User " << userId << " entfernt" << std::endl;
|
|
}
|
|
|
|
// Entferne auch aus allConnections
|
|
allConnections.erase(
|
|
std::remove(allConnections.begin(), allConnections.end(), wsi),
|
|
allConnections.end()
|
|
);
|
|
}
|