Files
yourpart3/src/worker.h
Torsten Schulz (local) bd961a03d4 Aktualisiere WebSocket-Server und Daemon-Konfiguration
- Ändere die Pfade für SSL-Zertifikate in der Konfigurationsdatei.
- Verbessere die Fehlerbehandlung beim Entfernen alter vorbereiteter Anweisungen in HouseWorker.
- Füge Debug-Ausgaben zur Nachverfolgung von Verbindungen und Nachrichten im WebSocket-Server hinzu.
- Implementiere Timeout-Logik für das Stoppen von Worker- und Watchdog-Threads.
- Optimiere die Signalverarbeitung und Shutdown-Logik in main.cpp für bessere Responsivität.
2026-01-14 14:38:42 +01:00

202 lines
6.5 KiB
C++

#pragma once
#include <atomic>
#include <thread>
#include <mutex>
#include <chrono>
#include <iostream>
#include <future>
#include <nlohmann/json.hpp>
#include "connection_pool.h"
#include "message_broker.h"
#include "database.h"
#include "connection_guard.h"
class Worker {
public:
Worker(ConnectionPool &pool, MessageBroker &broker, std::string name)
: pool(pool),
broker(broker),
workerName(std::move(name)),
runningWorker(false),
runningWatchdog(false)
{}
virtual ~Worker() {
stopWorkerThread();
stopWatchdogThread();
}
void startWorkerThread() {
if (runningWorker.load()) {
std::cerr << "[" << workerName << "] Worker thread already running, skipping start.\n";
return;
}
runningWorker.store(true);
workerThread = std::thread([this]() { run(); });
}
void stopWorkerThread() {
runningWorker.store(false);
if (workerThread.joinable()) {
// Timeout für Thread-Beendigung
auto future = std::async(std::launch::async, [this]() {
workerThread.join();
});
if (future.wait_for(std::chrono::milliseconds(500)) == std::future_status::timeout) {
std::cerr << "[" << workerName << "] Worker-Thread beendet sich nicht, erzwinge Beendigung..." << std::endl;
// Thread wird beim Destruktor automatisch detached
workerThread.detach();
}
}
}
void enableWatchdog() {
if (runningWatchdog.load()) {
std::cerr << "[" << workerName << "] Watchdog already enabled, skipping.\n";
return;
}
runningWatchdog.store(true);
watchdogThread = std::thread([this]() { watchdog(); });
}
void stopWatchdogThread() {
runningWatchdog.store(false);
if (watchdogThread.joinable()) {
// Timeout für Watchdog-Thread-Beendigung
auto future = std::async(std::launch::async, [this]() {
watchdogThread.join();
});
if (future.wait_for(std::chrono::milliseconds(200)) == std::future_status::timeout) {
std::cerr << "[" << workerName << "] Watchdog-Thread beendet sich nicht, erzwinge Beendigung..." << std::endl;
watchdogThread.detach();
}
}
}
std::string getCurrentStep() {
std::lock_guard<std::mutex> lock(stepMutex);
return currentStep;
}
std::string getStatus() {
std::lock_guard<std::mutex> lock(stepMutex);
return "{\"worker\":\"" + workerName + "\", \"currentStep\":\"" + currentStep + "\"}";
}
protected:
virtual void run() = 0;
void watchdog() {
try {
while (runningWatchdog.load()) {
// Kürzere Sleep-Intervalle für bessere Shutdown-Responsivität
for (int i = 0; i < 10 && runningWatchdog.load(); ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (!runningWatchdog.load()) break;
bool isActive = false;
{
std::lock_guard<std::mutex> lock(activityMutex);
isActive = active;
active = false;
}
if (!isActive) {
std::cerr << "[" << workerName << "] Watchdog: Keine Aktivität! Starte Worker neu...\n";
std::cerr << "[" << workerName << "] Letzte Aktivität: " << getCurrentStep() << "\n";
stopWorkerThread();
if (runningWatchdog.load()) { // Nur neu starten wenn nicht shutdown
startWorkerThread();
}
}
}
} catch (const std::exception &e) {
std::cerr << "[" << workerName << "] Watchdog: Ausnahme gefangen: " << e.what() << "\n";
} catch (...) {
std::cerr << "[" << workerName << "] Watchdog: Unbekannte Ausnahme gefangen.\n";
}
}
void signalActivity() {
std::lock_guard<std::mutex> lock(activityMutex);
active = true;
}
void setCurrentStep(const std::string &step) {
std::lock_guard<std::mutex> lock(stepMutex);
currentStep = step;
}
void sendMessageToRegionUsers(const int &regionId, nlohmann::json message) {
ConnectionGuard guard(pool);
auto &db = guard.get();
db.prepare("QUERY_GET_REGION_USERS", QUERY_GET_REGION_USERS);
auto users = db.execute("QUERY_GET_REGION_USERS", {std::to_string(regionId)});
for (const auto &user: users) {
message["user_id"] = user.at("user_id");
broker.publish(message.dump());
}
}
void sendMessageToFalukantUsers(const int &falukantUserId, nlohmann::json message) {
message["user_id"] = falukantUserId;
broker.publish(message.dump());
}
void changeFalukantUserMoney(int falukantUserId, double moneyChange, std::string action, nlohmann::json message) {
try {
ConnectionGuard connGuard(pool);
auto &db = connGuard.get();
db.prepare("QUERY_UPDATE_MONEY", QUERY_UPDATE_MONEY);
db.execute("QUERY_UPDATE_MONEY", {
std::to_string(falukantUserId),
std::to_string(moneyChange),
action
});
sendMessageToFalukantUsers(falukantUserId, message);
} catch (const std::exception &e) {
std::cerr << "[" << workerName << "] Fehler in changeFalukantUserMoney: " << e.what() << "\n";
}
}
time_t getLastActivity() {
return lastActivity;
}
protected:
ConnectionPool &pool;
MessageBroker &broker;
std::string workerName;
std::atomic<bool> runningWorker;
std::atomic<bool> runningWatchdog;
std::atomic<bool> active{false};
std::thread workerThread;
std::thread watchdogThread;
std::mutex activityMutex;
std::chrono::seconds watchdogInterval{10};
std::mutex stepMutex;
std::string currentStep;
time_t lastActivity;
private:
static constexpr const char *QUERY_GET_REGION_USERS = R"(
select c.user_id
from falukant_data."character" c
where c.region_id = $1
and c.user_id is not null;
)";
static constexpr const char *QUERY_UPDATE_MONEY = R"(
SELECT falukant_data.update_money(
$1,
$2,
$3
);
)";
};