#pragma once #include #include #include #include #include #include #include #include "connection_pool.h" #include "message_broker.h" #include "database.h" #include "connection_guard.h" class Worker { public: Worker(ConnectionPool &pool, MessageBroker &broker, std::string name) : pool(pool), broker(broker), workerName(std::move(name)), runningWorker(false), runningWatchdog(false) {} virtual ~Worker() { stopWorkerThread(); stopWatchdogThread(); } void startWorkerThread() { if (runningWorker.load()) { std::cerr << "[" << workerName << "] Worker thread already running, skipping start.\n"; return; } runningWorker.store(true); workerThread = std::thread([this]() { run(); }); } void stopWorkerThread() { runningWorker.store(false); if (workerThread.joinable()) { // Timeout für Thread-Beendigung auto future = std::async(std::launch::async, [this]() { workerThread.join(); }); if (future.wait_for(std::chrono::milliseconds(500)) == std::future_status::timeout) { std::cerr << "[" << workerName << "] Worker-Thread beendet sich nicht, erzwinge Beendigung..." << std::endl; // Thread wird beim Destruktor automatisch detached workerThread.detach(); } } } void enableWatchdog() { if (runningWatchdog.load()) { std::cerr << "[" << workerName << "] Watchdog already enabled, skipping.\n"; return; } runningWatchdog.store(true); watchdogThread = std::thread([this]() { watchdog(); }); } void stopWatchdogThread() { runningWatchdog.store(false); if (watchdogThread.joinable()) { // Timeout für Watchdog-Thread-Beendigung auto future = std::async(std::launch::async, [this]() { watchdogThread.join(); }); if (future.wait_for(std::chrono::milliseconds(200)) == std::future_status::timeout) { std::cerr << "[" << workerName << "] Watchdog-Thread beendet sich nicht, erzwinge Beendigung..." << std::endl; watchdogThread.detach(); } } } std::string getCurrentStep() { std::lock_guard lock(stepMutex); return currentStep; } std::string getStatus() { std::lock_guard lock(stepMutex); return "{\"worker\":\"" + workerName + "\", \"currentStep\":\"" + currentStep + "\"}"; } protected: virtual void run() = 0; void watchdog() { try { while (runningWatchdog.load()) { // Kürzere Sleep-Intervalle für bessere Shutdown-Responsivität for (int i = 0; i < 10 && runningWatchdog.load(); ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } if (!runningWatchdog.load()) break; bool isActive = false; { std::lock_guard lock(activityMutex); isActive = active; active = false; } if (!isActive) { std::cerr << "[" << workerName << "] Watchdog: Keine Aktivität! Starte Worker neu...\n"; std::cerr << "[" << workerName << "] Letzte Aktivität: " << getCurrentStep() << "\n"; stopWorkerThread(); if (runningWatchdog.load()) { // Nur neu starten wenn nicht shutdown startWorkerThread(); } } } } catch (const std::exception &e) { std::cerr << "[" << workerName << "] Watchdog: Ausnahme gefangen: " << e.what() << "\n"; } catch (...) { std::cerr << "[" << workerName << "] Watchdog: Unbekannte Ausnahme gefangen.\n"; } } void signalActivity() { std::lock_guard lock(activityMutex); active = true; } void setCurrentStep(const std::string &step) { std::lock_guard lock(stepMutex); currentStep = step; } void sendMessageToRegionUsers(const int ®ionId, nlohmann::json message) { ConnectionGuard guard(pool); auto &db = guard.get(); db.prepare("QUERY_GET_REGION_USERS", QUERY_GET_REGION_USERS); auto users = db.execute("QUERY_GET_REGION_USERS", {std::to_string(regionId)}); for (const auto &user: users) { message["user_id"] = user.at("user_id"); broker.publish(message.dump()); } } void sendMessageToFalukantUsers(const int &falukantUserId, nlohmann::json message) { message["user_id"] = falukantUserId; broker.publish(message.dump()); } void changeFalukantUserMoney(int falukantUserId, double moneyChange, std::string action, nlohmann::json message) { try { ConnectionGuard connGuard(pool); auto &db = connGuard.get(); db.prepare("QUERY_UPDATE_MONEY", QUERY_UPDATE_MONEY); db.execute("QUERY_UPDATE_MONEY", { std::to_string(falukantUserId), std::to_string(moneyChange), action }); sendMessageToFalukantUsers(falukantUserId, message); } catch (const std::exception &e) { std::cerr << "[" << workerName << "] Fehler in changeFalukantUserMoney: " << e.what() << "\n"; } } time_t getLastActivity() { return lastActivity; } protected: ConnectionPool &pool; MessageBroker &broker; std::string workerName; std::atomic runningWorker; std::atomic runningWatchdog; std::atomic active{false}; std::thread workerThread; std::thread watchdogThread; std::mutex activityMutex; std::chrono::seconds watchdogInterval{10}; std::mutex stepMutex; std::string currentStep; time_t lastActivity; private: static constexpr const char *QUERY_GET_REGION_USERS = R"( select c.user_id from falukant_data."character" c where c.region_id = $1 and c.user_id is not null; )"; static constexpr const char *QUERY_UPDATE_MONEY = R"( SELECT falukant_data.update_money( $1, $2, $3 ); )"; };