From bd961a03d41ef2a159a9f0673af6ab47cc79bd22 Mon Sep 17 00:00:00 2001 From: "Torsten Schulz (local)" Date: Tue, 28 Oct 2025 08:06:45 +0100 Subject: [PATCH] Aktualisiere WebSocket-Server und Daemon-Konfiguration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Ändere die Pfade für SSL-Zertifikate in der Konfigurationsdatei. - Verbessere die Fehlerbehandlung beim Entfernen alter vorbereiteter Anweisungen in HouseWorker. - Füge Debug-Ausgaben zur Nachverfolgung von Verbindungen und Nachrichten im WebSocket-Server hinzu. - Implementiere Timeout-Logik für das Stoppen von Worker- und Watchdog-Threads. - Optimiere die Signalverarbeitung und Shutdown-Logik in main.cpp für bessere Responsivität. --- daemon.conf | 4 +- daemon.log | 5 + src/houseworker.cpp | 17 +++- src/main.cpp | 39 +++++++- src/message_broker.cpp | 2 + src/produce_worker.cpp | 19 ++-- src/websocket_server.cpp | 193 ++++++++++++++++++++++++++++++++++----- src/websocket_server.h | 7 ++ src/worker.h | 34 ++++++- ssl-certs/server.crt | 33 +++++++ ssl-certs/server.key | 52 +++++++++++ 11 files changed, 364 insertions(+), 41 deletions(-) create mode 100644 daemon.log create mode 100644 ssl-certs/server.crt create mode 100644 ssl-certs/server.key diff --git a/daemon.conf b/daemon.conf index 524b501..d60fdbc 100644 --- a/daemon.conf +++ b/daemon.conf @@ -6,5 +6,5 @@ DB_PASSWORD=hitomisan THREAD_COUNT=4 WEBSOCKET_PORT=4551 WEBSOCKET_SSL_ENABLED=false -WEBSOCKET_SSL_CERT_PATH=/etc/yourpart/server.crt -WEBSOCKET_SSL_KEY_PATH=/etc/yourpart/server.key \ No newline at end of file +WEBSOCKET_SSL_CERT_PATH=/home/torsten/Programs/yourpart-daemon/ssl-certs/server.crt +WEBSOCKET_SSL_KEY_PATH=/home/torsten/Programs/yourpart-daemon/ssl-certs/server.key \ No newline at end of file diff --git a/daemon.log b/daemon.log new file mode 100644 index 0000000..8c2de3d --- /dev/null +++ b/daemon.log @@ -0,0 +1,5 @@ +WebSocket Server starting on port 4551 (no SSL) +[2025/09/29 08:50:10:6854] N: lws_create_context: LWS: 4.3.5-unknown, NET CLI SRV H1 H2 WS ConMon IPv6-absent +[2025/09/29 08:50:10:6874] N: __lws_lc_tag: ++ [wsi|0|pipe] (1) +[2025/09/29 08:50:10:6874] N: __lws_lc_tag: ++ [vh|0|netlink] (1) +WebSocket-Server erfolgreich gestartet auf Port 4551 diff --git a/src/houseworker.cpp b/src/houseworker.cpp index 029e9bf..3226347 100644 --- a/src/houseworker.cpp +++ b/src/houseworker.cpp @@ -53,10 +53,23 @@ void HouseWorker::performHouseStateChange() { try { ConnectionGuard connGuard(pool); auto &db = connGuard.get(); - db.remove("QUERY_UPDATE_BUYABLE_HOUSE_STATE"); - db.remove("QUERY_UPDATE_USER_HOUSE_STATE"); + // Entferne alte vorbereitete Anweisungen falls sie existieren + try { + db.remove("QUERY_UPDATE_BUYABLE_HOUSE_STATE"); + } catch (...) { + // Ignoriere Fehler beim Entfernen + } + try { + db.remove("QUERY_UPDATE_USER_HOUSE_STATE"); + } catch (...) { + // Ignoriere Fehler beim Entfernen + } + + // Bereite neue Anweisungen vor db.prepare("QUERY_UPDATE_BUYABLE_HOUSE_STATE", QUERY_UPDATE_BUYABLE_HOUSE_STATE); db.prepare("QUERY_UPDATE_USER_HOUSE_STATE", QUERY_UPDATE_USER_HOUSE_STATE); + + // Führe die Anweisungen aus db.execute("QUERY_UPDATE_BUYABLE_HOUSE_STATE"); db.execute("QUERY_UPDATE_USER_HOUSE_STATE"); } catch(const std::exception &e) { diff --git a/src/main.cpp b/src/main.cpp index 74085d2..9b1a30e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include std::atomic keepRunning(true); @@ -26,8 +27,10 @@ void handleSignal(int signal) { std::cerr << "Signal erhalten: " << signal << ". Beende Anwendung..." << std::endl; if (signal == SIGINT || signal == SIGTERM) { + std::cerr << "Setze Shutdown-Flags..." << std::endl; keepRunning.store(false, std::memory_order_relaxed); shutdownRequested.store(true, std::memory_order_relaxed); + std::cerr << "Shutdown-Flags gesetzt. keepRunning=" << keepRunning.load() << ", shutdownRequested=" << shutdownRequested.load() << std::endl; } } @@ -36,7 +39,7 @@ int main() { std::signal(SIGTERM, handleSignal); try { - Config config("/etc/yourpart/daemon.conf"); + Config config("../daemon.conf"); ConnectionPool pool( config.get("DB_HOST"), config.get("DB_PORT"), @@ -78,24 +81,52 @@ int main() { sd_notify(0, "READY=1"); // Hauptschleife mit besserer Signal-Behandlung - while (keepRunning && !shutdownRequested) { + std::cerr << "Hauptschleife gestartet. keepRunning=" << keepRunning.load() << ", shutdownRequested=" << shutdownRequested.load() << std::endl; + while (keepRunning.load() && !shutdownRequested.load()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } + std::cerr << "Hauptschleife beendet. keepRunning=" << keepRunning.load() << ", shutdownRequested=" << shutdownRequested.load() << std::endl; std::cerr << "Starte sauberes Herunterfahren..." << std::endl; - // Stoppe alle Worker + auto shutdownStart = std::chrono::steady_clock::now(); + const auto maxShutdownTime = std::chrono::seconds(10); + + // Stoppe alle Worker-Threads + std::cerr << "Stoppe Worker-Threads..." << std::endl; for (auto &worker : workers) { worker->stopWorkerThread(); + if (std::chrono::steady_clock::now() - shutdownStart > maxShutdownTime) { + std::cerr << "Shutdown-Timeout erreicht, erzwinge Beendigung..." << std::endl; + break; + } + } + + // Stoppe Watchdog-Threads + std::cerr << "Stoppe Watchdog-Threads..." << std::endl; + for (auto &worker : workers) { + worker->stopWatchdogThread(); + if (std::chrono::steady_clock::now() - shutdownStart > maxShutdownTime) { + std::cerr << "Shutdown-Timeout erreicht, erzwinge Beendigung..." << std::endl; + break; + } } // Stoppe WebSocket Server + std::cerr << "Stoppe WebSocket-Server..." << std::endl; websocketServer.stop(); // Stoppe Message Broker + std::cerr << "Stoppe Message Broker..." << std::endl; broker.stop(); - std::cerr << "Anwendung erfolgreich beendet." << std::endl; + auto shutdownDuration = std::chrono::duration_cast( + std::chrono::steady_clock::now() - shutdownStart); + std::cerr << "Anwendung erfolgreich beendet in " << shutdownDuration.count() << "ms." << std::endl; + + // Erzwinge sofortiges Exit nach Shutdown + std::cerr << "Erzwinge Prozess-Beendigung..." << std::endl; + std::_Exit(0); } catch (const std::exception &e) { std::cerr << "Fehler: " << e.what() << std::endl; return 1; diff --git a/src/message_broker.cpp b/src/message_broker.cpp index 7809aa4..83c1765 100644 --- a/src/message_broker.cpp +++ b/src/message_broker.cpp @@ -3,6 +3,7 @@ void MessageBroker::publish(const std::string &message) { std::lock_guard lock(mutex); + std::cout << "[MessageBroker] Nachricht gepubliziert: " << message << std::endl; messageQueue.push(message); cv.notify_all(); } @@ -34,6 +35,7 @@ void MessageBroker::processMessages() { std::string message = std::move(messageQueue.front()); messageQueue.pop(); lock.unlock(); + std::cout << "[MessageBroker] Sende Nachricht an " << subscribers.size() << " Subscribers: " << message << std::endl; for (const auto &callback : subscribers) { callback(message); } diff --git a/src/produce_worker.cpp b/src/produce_worker.cpp index 3b611f3..dfaeae2 100644 --- a/src/produce_worker.cpp +++ b/src/produce_worker.cpp @@ -13,20 +13,25 @@ ProduceWorker::~ProduceWorker() { void ProduceWorker::run() { auto lastIterationTime = std::chrono::steady_clock::now(); - while (runningWorker) { + while (runningWorker.load()) { setCurrentStep("Check runningWorker Variable"); - { - std::lock_guard lock(activityMutex); - if (!runningWorker) { - break; - } + if (!runningWorker.load()) { + break; } + setCurrentStep("Calculate elapsed time"); auto now = std::chrono::steady_clock::now(); auto elapsed = std::chrono::duration_cast(now - lastIterationTime); if (elapsed < std::chrono::milliseconds(200)) { - std::this_thread::sleep_for(std::chrono::milliseconds(200) - elapsed); + // Kürzere Sleep-Intervalle für bessere Shutdown-Responsivität + auto sleepTime = std::chrono::milliseconds(200) - elapsed; + for (int i = 0; i < sleepTime.count() && runningWorker.load(); i += 10) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } } + + if (!runningWorker.load()) break; + lastIterationTime = std::chrono::steady_clock::now(); setCurrentStep("Process Productions"); processProductions(); diff --git a/src/websocket_server.cpp b/src/websocket_server.cpp index c8b7471..deb510b 100644 --- a/src/websocket_server.cpp +++ b/src/websocket_server.cpp @@ -4,11 +4,18 @@ #include #include #include +#include using json = nlohmann::json; // Protocols array definition struct lws_protocols WebSocketServer::protocols[] = { + { + "", // Leeres Protokoll für Standard-WebSocket-Verbindungen + WebSocketServer::wsCallback, + sizeof(WebSocketUserData), + 4096 + }, { "yourpart-protocol", WebSocketServer::wsCallback, @@ -44,14 +51,40 @@ void WebSocketServer::run() { serverThread = std::thread([this](){ startServer(); }); messageThread = std::thread([this](){ processMessageQueue(); }); pingThread = std::thread([this](){ pingClients(); }); + + // Warte kurz bis alle Threads gestartet sind + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } void WebSocketServer::stop() { running = false; if (context) lws_cancel_service(context); - if (serverThread.joinable()) serverThread.join(); - if (messageThread.joinable()) messageThread.join(); - if (pingThread.joinable()) pingThread.join(); + + // Stoppe Threads mit Timeout + std::vector> futures; + + if (serverThread.joinable()) { + futures.push_back(std::async(std::launch::async, [this]() { serverThread.join(); })); + } + if (messageThread.joinable()) { + futures.push_back(std::async(std::launch::async, [this]() { messageThread.join(); })); + } + if (pingThread.joinable()) { + futures.push_back(std::async(std::launch::async, [this]() { pingThread.join(); })); + } + + // Warte auf alle Threads mit Timeout + for (auto& future : futures) { + if (future.wait_for(std::chrono::milliseconds(1000)) == std::future_status::timeout) { + std::cerr << "WebSocket-Thread beendet sich nicht, erzwinge Beendigung..." << std::endl; + } + } + + // Force detach alle Threads + if (serverThread.joinable()) serverThread.detach(); + if (messageThread.joinable()) messageThread.detach(); + if (pingThread.joinable()) pingThread.detach(); + if (context) { lws_context_destroy(context); context = nullptr; @@ -64,12 +97,15 @@ void WebSocketServer::startServer() { info.port = port; info.protocols = protocols; + // Vereinfachte Server-Optionen für bessere Kompatibilität + info.options = LWS_SERVER_OPTION_VALIDATE_UTF8; + // SSL/TLS Konfiguration if (useSSL) { if (certPath.empty() || keyPath.empty()) { throw std::runtime_error("SSL enabled but certificate or key path not provided"); } - info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; info.ssl_cert_filepath = certPath.c_str(); info.ssl_private_key_filepath = keyPath.c_str(); std::cout << "WebSocket SSL Server starting on port " << port << " with certificates: " @@ -78,23 +114,36 @@ void WebSocketServer::startServer() { std::cout << "WebSocket Server starting on port " << port << " (no SSL)" << std::endl; } - // Reduziere Log-Level um weniger Debug-Ausgaben zu haben - setenv("LWS_LOG_LEVEL", "0", 1); // 0 = nur Fehler + // Erhöhe Log-Level für besseres Debugging + setenv("LWS_LOG_LEVEL", "7", 1); // 7 = alle Logs context = lws_create_context(&info); if (!context) { throw std::runtime_error("Failed to create LWS context"); } + + std::cout << "WebSocket-Server erfolgreich gestartet auf Port " << port << std::endl; + while (running) { - lws_service(context, 50); + int ret = lws_service(context, 50); + if (ret < 0) { + std::cerr << "WebSocket-Server Fehler: lws_service returned " << ret << std::endl; + break; + } + // Kurze Pause für bessere Shutdown-Responsivität + if (running) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } } + + std::cout << "WebSocket-Server wird beendet..." << std::endl; } void WebSocketServer::processMessageQueue() { while (running) { std::unique_lock lock(queueMutex); - queueCV.wait(lock, [this](){ return !messageQueue.empty() || !running; }); - while (!messageQueue.empty()) { + queueCV.wait_for(lock, std::chrono::milliseconds(100), [this](){ return !messageQueue.empty() || !running; }); + while (!messageQueue.empty() && running) { std::string msg = std::move(messageQueue.front()); messageQueue.pop(); lock.unlock(); @@ -106,8 +155,52 @@ void WebSocketServer::processMessageQueue() { void WebSocketServer::pingClients() { while (running) { - std::this_thread::sleep_for(std::chrono::seconds(30)); - lws_callback_on_writable_all_protocol(context, &protocols[0]); + // Kürzere Sleep-Intervalle für bessere Shutdown-Responsivität + for (int i = 0; i < WebSocketUserData::PING_INTERVAL_SECONDS * 10 && running; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + if (!running || !context) continue; + + auto now = std::chrono::steady_clock::now(); + std::vector toDisconnect; + + // Prüfe alle Verbindungen auf Timeouts + { + std::shared_lock lock(connectionsMutex); + for (auto& pair : connections) { + auto* ud = reinterpret_cast(lws_wsi_user(pair.second)); + if (!ud) continue; + + // Prüfe ob Pong-Timeout erreicht wurde + auto timeSincePing = std::chrono::duration_cast(now - ud->lastPingTime).count(); + auto timeSincePong = std::chrono::duration_cast(now - ud->lastPongTime).count(); + + if (!ud->pongReceived && timeSincePing > WebSocketUserData::PONG_TIMEOUT_SECONDS) { + ud->pingTimeoutCount++; + std::cout << "Ping-Timeout für User " << ud->userId << " (Versuch " << ud->pingTimeoutCount << "/" << WebSocketUserData::MAX_PING_TIMEOUTS << ")" << std::endl; + + if (ud->pingTimeoutCount >= WebSocketUserData::MAX_PING_TIMEOUTS) { + std::cout << "Verbindung wird getrennt: Zu viele Ping-Timeouts für User " << ud->userId << std::endl; + toDisconnect.push_back(pair.second); + } else { + // Reset für nächsten Versuch + ud->pongReceived = true; + ud->lastPongTime = now; + } + } + } + } + + // Trenne problematische Verbindungen + for (auto* wsi : toDisconnect) { + lws_close_reason(wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, (unsigned char*)"Ping timeout", 12); + } + + // Sende Pings an alle aktiven Verbindungen + if (running) { + lws_callback_on_writable_all_protocol(context, &protocols[0]); + } } } @@ -118,27 +211,51 @@ int WebSocketServer::wsCallback(struct lws *wsi, auto *ud = reinterpret_cast(user); switch (reason) { - case LWS_CALLBACK_ESTABLISHED: + case LWS_CALLBACK_ESTABLISHED: { ud->pongReceived = true; - std::cout << "WebSocket-Verbindung hergestellt" << std::endl; + ud->lastPingTime = std::chrono::steady_clock::now(); + ud->lastPongTime = std::chrono::steady_clock::now(); + ud->pingTimeoutCount = 0; + const char* protocolName = lws_get_protocol(wsi)->name; + std::cout << "WebSocket-Verbindung hergestellt (Protokoll: " << (protocolName ? protocolName : "Standard") << ")" << std::endl; + char client_addr[128]; + lws_get_peer_simple(wsi, client_addr, sizeof(client_addr)); + std::cout << "Client-Adresse: " << client_addr << std::endl; break; + } case LWS_CALLBACK_RECEIVE: { std::string msg(reinterpret_cast(in), len); std::cout << "WebSocket-Nachricht empfangen: " << msg << std::endl; + // Pong-Antwort behandeln + if (msg == "pong") { + ud->pongReceived = true; + ud->lastPongTime = std::chrono::steady_clock::now(); + ud->pingTimeoutCount = 0; + std::cout << "Pong von Client empfangen" << std::endl; + break; + } + try { json parsed = json::parse(msg); + std::cout << "[RECEIVE] Nachricht empfangen: " << msg << std::endl; + if (parsed.contains("event") && parsed["event"] == "setUserId") { if (parsed.contains("data") && parsed["data"].contains("userId")) { ud->userId = parsed["data"]["userId"].get(); - std::cout << "User-ID gesetzt: " << ud->userId << std::endl; + std::cout << "[RECEIVE] User-ID gesetzt: " << ud->userId << std::endl; // Verbindung in der Map speichern instance->addConnection(ud->userId, wsi); + std::cout << "[RECEIVE] Verbindung gespeichert" << std::endl; + } else { + std::cerr << "[RECEIVE] setUserId-Event ohne data.userId-Feld" << std::endl; } + } else { + std::cout << "[RECEIVE] Ignoriere Nachricht (kein setUserId-Event)" << std::endl; } } catch (const std::exception &e) { - std::cerr << "Fehler beim Parsen der WebSocket-Nachricht: " << e.what() << std::endl; + std::cerr << "[RECEIVE] Fehler beim Parsen der WebSocket-Nachricht: " << e.what() << std::endl; } break; } @@ -146,15 +263,20 @@ int WebSocketServer::wsCallback(struct lws *wsi, // Prüfe ob es eine Nachricht zum Senden gibt if (ud->pendingMessage.empty()) { // Ping senden + ud->lastPingTime = std::chrono::steady_clock::now(); + ud->pongReceived = false; unsigned char buf[LWS_PRE + 4]; memcpy(buf + LWS_PRE, "ping", 4); lws_write(wsi, buf + LWS_PRE, 4, LWS_WRITE_TEXT); + // std::cout << "Ping an Client gesendet" << std::endl; } else { // Nachricht senden + std::cout << "[WRITEABLE] Sende Nachricht: " << ud->pendingMessage << std::endl; unsigned char buf[LWS_PRE + ud->pendingMessage.length()]; memcpy(buf + LWS_PRE, ud->pendingMessage.c_str(), ud->pendingMessage.length()); lws_write(wsi, buf + LWS_PRE, ud->pendingMessage.length(), LWS_WRITE_TEXT); ud->pendingMessage.clear(); + std::cout << "[WRITEABLE] Nachricht erfolgreich gesendet" << std::endl; } break; } @@ -162,8 +284,18 @@ int WebSocketServer::wsCallback(struct lws *wsi, // Verbindung aus der Map entfernen if (!ud->userId.empty()) { instance->removeConnection(ud->userId); + std::cout << "WebSocket-Verbindung geschlossen für User: " << ud->userId << std::endl; } break; + case LWS_CALLBACK_HTTP: + // HTTP-Anfragen ablehnen (nur WebSocket erlaubt) + return -1; + case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: + // Protokoll-Filter für bessere Kompatibilität + return 0; + case LWS_CALLBACK_RAW_CONNECTED: + // Raw-Verbindungen behandeln + return 0; default: break; } @@ -172,30 +304,47 @@ int WebSocketServer::wsCallback(struct lws *wsi, void WebSocketServer::handleBrokerMessage(const std::string &message) { try { + std::cout << "[handleBrokerMessage] Nachricht empfangen: " << message << std::endl; json parsed = json::parse(message); if (parsed.contains("user_id")) { - int fid = parsed["user_id"].get(); + int fid; + if (parsed["user_id"].is_string()) { + fid = std::stoi(parsed["user_id"].get()); + } else { + fid = parsed["user_id"].get(); + } auto userId = getUserIdFromFalukantUserId(fid); - std::cout << "Broker-Nachricht für Falukant-User " << fid << " -> User-ID " << userId << std::endl; + std::cout << "[handleBrokerMessage] Broker-Nachricht für Falukant-User " << fid << " -> User-ID " << userId << std::endl; std::shared_lock lock(connectionsMutex); + std::cout << "[handleBrokerMessage] Aktive Verbindungen: " << connections.size() << std::endl; + auto it = connections.find(userId); if (it != connections.end()) { - std::cout << "Sende Nachricht an User " << userId << ": " << message << std::endl; + std::cout << "[handleBrokerMessage] Sende Nachricht an User " << userId << ": " << message << std::endl; // Nachricht in der UserData speichern auto *ud = reinterpret_cast(lws_wsi_user(it->second)); if (ud) { ud->pendingMessage = message; + std::cout << "[handleBrokerMessage] Nachricht in pendingMessage gespeichert" << std::endl; + } else { + std::cerr << "[handleBrokerMessage] FEHLER: ud ist nullptr!" << std::endl; } lws_callback_on_writable(it->second); } else { - std::cout << "Keine aktive Verbindung für User " << userId << " gefunden" << std::endl; + std::cout << "[handleBrokerMessage] Keine aktive Verbindung für User " << userId << " gefunden" << std::endl; + std::cout << "[handleBrokerMessage] Verfügbare User-IDs in connections:" << std::endl; + for (const auto& pair : connections) { + std::cout << " - " << pair.first << std::endl; + } } + } else { + std::cout << "[handleBrokerMessage] Nachricht enthält kein user_id-Feld!" << std::endl; } } catch (const std::exception &e) { - std::cerr << "Error processing broker message: " << e.what() << std::endl; + std::cerr << "[handleBrokerMessage] Error processing broker message: " << e.what() << std::endl; } } @@ -224,11 +373,11 @@ void WebSocketServer::setWorkers(const std::vector> &wor void WebSocketServer::addConnection(const std::string &userId, struct lws *wsi) { std::unique_lock lock(connectionsMutex); connections[userId] = wsi; - std::cout << "Verbindung für User " << userId << " gespeichert" << std::endl; + std::cout << "[addConnection] Verbindung für User " << userId << " gespeichert (Insgesamt: " << connections.size() << " Verbindungen)" << std::endl; } void WebSocketServer::removeConnection(const std::string &userId) { std::unique_lock lock(connectionsMutex); connections.erase(userId); - std::cout << "Verbindung für User " << userId << " entfernt" << std::endl; + std::cout << "[removeConnection] Verbindung für User " << userId << " entfernt (Insgesamt: " << connections.size() << " Verbindungen)" << std::endl; } diff --git a/src/websocket_server.h b/src/websocket_server.h index 2238257..83f073b 100644 --- a/src/websocket_server.h +++ b/src/websocket_server.h @@ -15,11 +15,18 @@ #include #include #include +#include struct WebSocketUserData { std::string userId; bool pongReceived = true; std::string pendingMessage; + std::chrono::steady_clock::time_point lastPingTime; + std::chrono::steady_clock::time_point lastPongTime; + int pingTimeoutCount = 0; + static constexpr int MAX_PING_TIMEOUTS = 3; + static constexpr int PING_INTERVAL_SECONDS = 30; + static constexpr int PONG_TIMEOUT_SECONDS = 10; }; class Worker; // forward diff --git a/src/worker.h b/src/worker.h index f09a880..35a5d7c 100644 --- a/src/worker.h +++ b/src/worker.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "connection_pool.h" @@ -39,7 +40,16 @@ public: void stopWorkerThread() { runningWorker.store(false); if (workerThread.joinable()) { - workerThread.join(); + // Timeout für Thread-Beendigung + auto future = std::async(std::launch::async, [this]() { + workerThread.join(); + }); + + if (future.wait_for(std::chrono::milliseconds(500)) == std::future_status::timeout) { + std::cerr << "[" << workerName << "] Worker-Thread beendet sich nicht, erzwinge Beendigung..." << std::endl; + // Thread wird beim Destruktor automatisch detached + workerThread.detach(); + } } } @@ -55,7 +65,15 @@ public: void stopWatchdogThread() { runningWatchdog.store(false); if (watchdogThread.joinable()) { - watchdogThread.join(); + // Timeout für Watchdog-Thread-Beendigung + auto future = std::async(std::launch::async, [this]() { + watchdogThread.join(); + }); + + if (future.wait_for(std::chrono::milliseconds(200)) == std::future_status::timeout) { + std::cerr << "[" << workerName << "] Watchdog-Thread beendet sich nicht, erzwinge Beendigung..." << std::endl; + watchdogThread.detach(); + } } } @@ -75,7 +93,13 @@ protected: void watchdog() { try { while (runningWatchdog.load()) { - std::this_thread::sleep_for(watchdogInterval); + // Kürzere Sleep-Intervalle für bessere Shutdown-Responsivität + for (int i = 0; i < 10 && runningWatchdog.load(); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + if (!runningWatchdog.load()) break; + bool isActive = false; { std::lock_guard lock(activityMutex); @@ -86,7 +110,9 @@ protected: std::cerr << "[" << workerName << "] Watchdog: Keine Aktivität! Starte Worker neu...\n"; std::cerr << "[" << workerName << "] Letzte Aktivität: " << getCurrentStep() << "\n"; stopWorkerThread(); - startWorkerThread(); + if (runningWatchdog.load()) { // Nur neu starten wenn nicht shutdown + startWorkerThread(); + } } } } catch (const std::exception &e) { diff --git a/ssl-certs/server.crt b/ssl-certs/server.crt new file mode 100644 index 0000000..38288b3 --- /dev/null +++ b/ssl-certs/server.crt @@ -0,0 +1,33 @@ +-----BEGIN CERTIFICATE----- +MIIFnTCCA4WgAwIBAgIUf7ObINpycsL8eoD5HWWZlQWXjJ0wDQYJKoZIhvcNAQEL +BQAwXjELMAkGA1UEBhMCREUxEDAOBgNVBAgMB0dlcm1hbnkxDzANBgNVBAcMBkJl +cmxpbjERMA8GA1UECgwIWW91clBhcnQxGTAXBgNVBAMMEHd3dy55b3VyLXBhcnQu +ZGUwHhcNMjUwOTI5MTEyNTM0WhcNMjYwOTI5MTEyNTM0WjBeMQswCQYDVQQGEwJE +RTEQMA4GA1UECAwHR2VybWFueTEPMA0GA1UEBwwGQmVybGluMREwDwYDVQQKDAhZ +b3VyUGFydDEZMBcGA1UEAwwQd3d3LnlvdXItcGFydC5kZTCCAiIwDQYJKoZIhvcN +AQEBBQADggIPADCCAgoCggIBAJt0zR/ez1S7uidVTITbeoKAfHfYzTt0/73Iqmn5 +28zT160/2Q/Cf2I6VJ6O50GY7p3M2vMO13vJwcZJ/KZn4371Tm9jwu10OMYBld4t +ZXZ8kv1n9kLyOMAoLvrT8r4qDlsl43bE2vh509aisvjEph8OETquwiWFy0Rx46vy +ilNLgwzQJcdAyR3SsYyHGbwTqyN5PdkJ6ok7gG5ZbCMD0ZYbI2KoSHoQIHZLbnLg +VB/YUK6LHvSrgAHl9c0e4dJaEpssRGZaCUPZ+zwqwPvEeCvkO244ErSXYSGkTn3Y +WDeg7cFoCn8MVp8OEBel0mHPCNlnEYoWtYr+rx8C8FdFcIU4Dx5n5GX53a+ePN3B +Tu0cEZ4HL7IcVPsAOl2/xZl2efRBsZpp+Sp+MstXQKbNp2ylYquSFm9ZAbqdN+hZ +CAmm6Cqg9fKoFSQL9ljb5traS9HeLm/rCtnQpacpzmTcTi8grNa3ydLoF6OgxUba +RlcRAI4vvJgj5c1Q65Wlu7k1ttiFZXxMuW2QiZW03/5M0msr5JO2TVTBZtVd1Xll +ON42SEhwyeq6PgfJz4gCRIFQqD8os2cVZV6DfZcSupXgpfWpQl5Z5wWNrPLeBJWm +iCveM5wXpauook3bBJDVHKhNX4XIVjpy0ZDI/INxAGxfNfTFoVuPbYvWVvf8y4Bu +0orxAgMBAAGjUzBRMB0GA1UdDgQWBBTDF/IEVy993K4Tbo+vt3y0nFaexTAfBgNV +HSMEGDAWgBTDF/IEVy993K4Tbo+vt3y0nFaexTAPBgNVHRMBAf8EBTADAQH/MA0G +CSqGSIb3DQEBCwUAA4ICAQA0EoB748+ssgldnLNqB6f0HRyrX8YP7lLc34LEp7Mj +FGB1aWTGSXVeZIz96fFKkOR3h9SgLGtiyI3L3QsADXdUmntiVun1+7ejj9/7BPQE +LiMYuln+erRJOiYDqNHjlIMIIW5mA9yO8Pup4W0pD7wGTRQbBU9jnndYViex4TFc +mpPTtFkD+sAAuh7LFIA05X4jI3eAzGK3qUDvq6z1ojcmXBeZEijuhaaClJTRPwoO +HNjxYSM17zd5DHbAPW8xEZLkf7mh+SwYO/SjMKwXs6yiTmSmo4/cjkvX/OrZX67U +oNPovGvAgfSVT2RfY2sagr5Vv8uH8np8aH6a4BbjPUI4vC5Gs23iM//YILgWOoQr ++k0CfOyO+WVTc2capgN1xJ2IcnOrN9SMOtMdaLjbk1TPfZBlHnamcholXbcor8Fp +M1Si9uCO160Lkk96VpE55AFYldxrV0a5HwjK1zCdzS4XO8GP83Qqy1ZJk8WrD/Qm +HK3q+eAWpEnVKCOPjRKJD4gJgR2/SEnBNfm4SI+v58oIF56Uq+RY+1UTR0pQS0GF +D29Es18R5toNX7j93ccyi+j2igpV9yKouKEDq78NI1KU7t8MI0Pt8gBlJQI/eBJS +L7RGWEMdjxUsm+u+gniIizGCU4gtCNRkcR+XAeKUW22qZx0otjJ4DThEeXzlsJ2y +ag== +-----END CERTIFICATE----- diff --git a/ssl-certs/server.key b/ssl-certs/server.key new file mode 100644 index 0000000..586432a --- /dev/null +++ b/ssl-certs/server.key @@ -0,0 +1,52 @@ +-----BEGIN PRIVATE KEY----- +MIIJQwIBADANBgkqhkiG9w0BAQEFAASCCS0wggkpAgEAAoICAQCbdM0f3s9Uu7on +VUyE23qCgHx32M07dP+9yKpp+dvM09etP9kPwn9iOlSejudBmO6dzNrzDtd7ycHG +SfymZ+N+9U5vY8LtdDjGAZXeLWV2fJL9Z/ZC8jjAKC760/K+Kg5bJeN2xNr4edPW +orL4xKYfDhE6rsIlhctEceOr8opTS4MM0CXHQMkd0rGMhxm8E6sjeT3ZCeqJO4Bu +WWwjA9GWGyNiqEh6ECB2S25y4FQf2FCuix70q4AB5fXNHuHSWhKbLERmWglD2fs8 +KsD7xHgr5DtuOBK0l2EhpE592Fg3oO3BaAp/DFafDhAXpdJhzwjZZxGKFrWK/q8f +AvBXRXCFOA8eZ+Rl+d2vnjzdwU7tHBGeBy+yHFT7ADpdv8WZdnn0QbGaafkqfjLL +V0CmzadspWKrkhZvWQG6nTfoWQgJpugqoPXyqBUkC/ZY2+ba2kvR3i5v6wrZ0KWn +Kc5k3E4vIKzWt8nS6BejoMVG2kZXEQCOL7yYI+XNUOuVpbu5NbbYhWV8TLltkImV +tN/+TNJrK+STtk1UwWbVXdV5ZTjeNkhIcMnquj4Hyc+IAkSBUKg/KLNnFWVeg32X +ErqV4KX1qUJeWecFjazy3gSVpogr3jOcF6WrqKJN2wSQ1RyoTV+FyFY6ctGQyPyD +cQBsXzX0xaFbj22L1lb3/MuAbtKK8QIDAQABAoICAAd5JyyKXP4cP3npN8pOBQrh +p4IpLu4WHP6EF12sfl6fmz9j2bDwUyh/KH7eHLPOiN+XB9pODwm/WHB6cXH0Pfd2 +Ll7sXURGLV3G+Rv/A5D9coFKQnhjzbq+n8oM/v8ZdVrYRKHquyJddHOtuwP6q6gD +6IwBN1n/j2bXIQhcyr2v/FEFD2Dfnl9/t8t7Oe9sxGIaX7DXsUHHRZCAfeJlyklA +nRwOvhu4m1/mds0A1+h3QSMv8tU1KqxksEMr8jQXIox5RYFEYCxF7hYNkd0UnAiT +onAFM/CAs8Ge1Qtnl2+WreFZqaIDj0U6k0dYwFc1gU3Wvq0MVA5GWbe4X+KZJuxc +W1/IIO5+rQn9vYwVrDhWcfL8PFsX4P9bWSc8Hpg/uf2UFKgxO4ydPOepy9+i4xVS +Bun2XcWh6GlyG1OEtvu6CVmAcvQ4s+K53r+2W/la9tmqLObLVCJqB2vrz8ISwy7N +glXJj55Kc5A9Mhjnct9Ap7Mv9hoVG3cZp3jaTlPbhPVlCJb+gePQ26ao6zpjOQpy +WWXBzhFAELiC8FKCNDzPwsU1mP0Z+Kkn5XT/GiCG0KabvQ1ZB2bMsZ6UzpwpEVVt +V+PjZ8GWv4qm2BxgdttVTum7/EJQDdZ6N2SpKZ0TRSdW0rmCrcs7poFx4AuopJBc +emHF9YzpgHidQIC/Yy5BAoIBAQDWZfZqdrSj6WwxXLsiQCsFnJ7hTizalkfNPmQo +D5sHJ67TFAFSss1T/vNHbnK4j89VU2NBntoqe2RWhZtwhIvyzq78yJNIbkAJke8w +SB6mrn0/Q+mbeGf5HXFNiqzy8gECAOJbC+ep6/bdE/6r9Iv81ojY6xKegjcLQdZy +J+bcMom7zGC7IfRx/uGj2k5OyqpGijrv8rUoTRpthqnvvvOFrC7J0466kIkTqphU +9muScWJYiaOR9gLb6JJxk7NfJANgZpldysmP4Hu5+3eIa5fhu3g0s1b3TTAOJlzN +m3mUBrChK+zR5Be9Wx86u+0TfwO007LGqfgCZmVT66F9+kbBAoIBAQC5nvKIrDGE +eFcq/qFfNj9Vn/3RT+eRHSUutRffpVjkQkEeqVucI17c9SX2154S85yXwOmecre5 +3SeuBq3SfPw2HK3mHAAiflW14dlcvyIunF7SQQIYQ2Yp1jWbnIEZHMFMWfQ35nZO +QXn4DKpwtpBig2y35m+pXv/hDY2iKVQPlEqk5Gn0/I3LO35Dw98DPdSUK9MVTDOB +7L58WYUiLf9jypsnSLIKjgYJoUp+zTvm9agC4PoyXhw7bskQrfrAUYYUKK8ospPW +lRfKtafRqW92uSvksbLOgEaSIDUxXUdfx6qKob8yJEgZRFtEsj1u4+ai2vRhPRry +OM9CDTTPqwAxAoIBAFRMMt8ZXV01YjzmYQ3OfRvvwOiFfE1V2VVxup+lwybFC5Ai +xYJmmyTzkWP8oU6//J1n9BbRBqa9bW43ii5rbztA2Ly7gG1yK+uXXZx2Ptb6tHQz +l55xcTAZy3rZk7bSQGMxRc7Wl3fQN3glbHTf6kq3b4capm98c3gRouevmK3rkQwu +B7qMVzibJszuAOwp81lY8GN34pK9/i4iTJ7fTZC5aowginYAbmU1JkABw7oIqsp1 +E8NIH0en4iyWDmjSGCHHNXYTTb0sXnl5zj3tUAKJW3IdMYx65PIrU0HkZ6E0IC6+ +vpaoQE1LjrPhQA3yWtq2ggxquAD5kc21UAHgbgECggEBAIJm+OotVmhDBrHsyr+R +47KqwGzA3uTifdGvZYM3rRhGt2rr/bDWZHmEO9SLK8ESpesymq022H3ZsVuf05Ox +PJpjUYP8HdgduucZMFPo7wGh1zeMdgVHrEkt9OFKdKOIwP97nod6/5gAhchOVZrz +lsGupL0ZRU7Or6KSm/LVZ/m96yamVQ3IM3EYbZ77xvuG/4XMt/EZZIIdKMFBPreB +aw7XMmLJvlKN7g3r4uLsGe4qnIrRNNQXq2vRa62tHCDp5PDamBtWQWgZu+or7ibs +CqN0eTKj6AMMuQdFWzk/17mhEt1rvl9if8hIbnn3YhM6RjgY7GA3xmtun6Q+lOBj +uLECggEBAKa14EHMADkcVIJrWplLmPVHbgSZtCCL3O2CGfqxTcn8urSu5wK24KCE +xUtVXgHx2KuP9cWlinAF8kYT9UGNja/fMooLix3POlyp0W3W1274/cYPUqmEQwZn +CNRrSiizCXi07PFyVScrx1rTb/5wuUAMyF0Vawo2dX9zITjxbI2Jaw68c5LU6zKY +Tq8HO/4KznfSPx9DhnO0NDJgKMVyfP+Il3ItruA1lVtU/N1Eubn4uvNRhNR9BIgt +i4G/jE3lC2SIyOMLSWNt7deyiMkiXvEUb3GBPyBWmZNspH8Xh3shmC1zRx/aiGjb +Vnk0Wqf704tn4ss7Mfo2SwcZxAjov58= +-----END PRIVATE KEY-----