diff --git a/CMakeLists.txt b/CMakeLists.txt index eab6b24..1775a1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,22 +1,33 @@ cmake_minimum_required(VERSION 3.20) project(YourPartDaemon VERSION 1.0 LANGUAGES CXX) -set(CMAKE_CXX_STANDARD 20) +# C++ Standard and Compiler Settings +set(CMAKE_CXX_STANDARD 23) set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_C_COMPILER "gcc-13") -set(CMAKE_CPP_COMPILER "g++-13") +# Explicitly set compiler versions for Tumbleweed +set(CMAKE_C_COMPILER gcc-15) +set(CMAKE_CXX_COMPILER g++-15) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -flto=auto") set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -flto") +set(CMAKE_BUILD_TYPE Debug) +# Include /usr/local if needed +list(APPEND CMAKE_PREFIX_PATH /usr/local) + +# Find libwebsockets via pkg-config +find_package(PkgConfig REQUIRED) +pkg_check_modules(LWS REQUIRED libwebsockets) + +# Find other dependencies find_package(PostgreSQL REQUIRED) find_package(Threads REQUIRED) +find_package(nlohmann_json CONFIG REQUIRED) + +# PostgreSQL C++ libpqxx find_package(PkgConfig REQUIRED) pkg_check_modules(LIBPQXX REQUIRED libpqxx) -include_directories(${LIBPQXX_INCLUDE_DIRS}) -link_directories(${LIBPQXX_LIBRARY_DIRS}) -add_definitions(${LIBPQXX_CFLAGS_OTHER}) - +# Project sources and headers set(SOURCES src/main.cpp src/config.cpp @@ -26,6 +37,12 @@ set(SOURCES src/produce_worker.cpp src/message_broker.cpp src/websocket_server.cpp + src/stockagemanager.cpp + src/director_worker.cpp + src/valuerecalculationworker.cpp + src/usercharacterworker.cpp + src/houseworker.cpp + src/politics_worker.cpp ) set(HEADERS @@ -37,13 +54,34 @@ set(HEADERS src/produce_worker.h src/message_broker.h src/websocket_server.h + src/stockagemanager.h + src/director_worker.h + src/valuerecalculationworker.h + src/usercharacterworker.h + src/houseworker.h + src/politics_worker.h ) +# Define executable target add_executable(yourpart-daemon ${SOURCES} ${HEADERS}) -find_package(nlohmann_json CONFIG REQUIRED) -target_include_directories(yourpart-daemon PRIVATE ${PostgreSQL_INCLUDE_DIRS}) -target_link_libraries(yourpart-daemon PRIVATE ${PostgreSQL_LIBRARIES} Threads::Threads z ssl crypto ${CMAKE_SOURCE_DIR}/lib/uSockets.a ${LIBPQXX_LIBRARIES}) +# Include directories +target_include_directories(yourpart-daemon PRIVATE + ${PostgreSQL_INCLUDE_DIRS} + ${LIBPQXX_INCLUDE_DIRS} + ${LWS_INCLUDE_DIRS} +) +# Link libraries +target_link_libraries(yourpart-daemon PRIVATE + ${PostgreSQL_LIBRARIES} + Threads::Threads + z ssl crypto + ${LIBPQXX_LIBRARIES} + ${LWS_LIBRARIES} + nlohmann_json::nlohmann_json +) + +# Installation rules install(TARGETS yourpart-daemon DESTINATION /usr/local/bin) install(FILES daemon.conf DESTINATION /etc/yourpart/) diff --git a/CMakeLists.txt.user b/CMakeLists.txt.user new file mode 100644 index 0000000..fb68610 --- /dev/null +++ b/CMakeLists.txt.user @@ -0,0 +1,414 @@ + + + + + + EnvironmentId + {551ef6b3-a39b-43e2-9ee3-ad56e19ff4f4} + + + ProjectExplorer.Project.ActiveTarget + 0 + + + ProjectExplorer.Project.EditorSettings + + true + true + true + + Cpp + + CppGlobal + + + + QmlJS + + QmlJSGlobal + + + 2 + UTF-8 + false + 4 + false + 0 + 80 + true + true + 1 + 0 + false + true + false + 2 + true + true + 0 + 8 + true + false + 1 + true + true + true + *.md, *.MD, Makefile + false + true + true + + + + ProjectExplorer.Project.PluginSettings + + + true + false + true + true + true + true + + false + + + 0 + true + + true + true + Builtin.DefaultTidyAndClazy + 8 + true + + + + true + + + + + ProjectExplorer.Project.Target.0 + + Desktop + true + Importiertes Kit + Importiertes Kit + {78ff90a3-f672-45c2-ad08-343b0923896f} + 0 + 0 + 0 + + Debug + 2 + false + + -DCMAKE_BUILD_TYPE:STRING=Release +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} +-DCMAKE_GENERATOR:STRING=Unix Makefiles +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake +-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} +-DCMAKE_COLOR_DIAGNOSTICS:BOOL=ON + /home/torsten/Programs/yourpart-daemon/build/ + + + + + all + + false + + true + Erstellen + CMakeProjectManager.MakeStep + + 1 + Erstellen + Erstellen + ProjectExplorer.BuildSteps.Build + + + + + + clean + + false + + true + Erstellen + CMakeProjectManager.MakeStep + + 1 + Bereinigen + Bereinigen + ProjectExplorer.BuildSteps.Clean + + 2 + false + + false + + Release + CMakeProjectManager.CMakeBuildConfiguration + 0 + 0 + + + 0 + Deployment + Deployment + ProjectExplorer.BuildSteps.Deploy + + 1 + + false + ProjectExplorer.DefaultDeployConfiguration + + + + + + + + + false + + true + ApplicationManagerPlugin.Deploy.CMakePackageStep + + + install-package --acknowledge + true + Application Manager-Paket installieren + ApplicationManagerPlugin.Deploy.InstallPackageStep + + + + + + + + 2 + Deployment + Deployment + ProjectExplorer.BuildSteps.Deploy + + 1 + + false + ApplicationManagerPlugin.Deploy.Configuration + + 2 + + true + true + 0 + true + + 2 + + false + -e cpu-cycles --call-graph dwarf,4096 -F 250 + yourpart-daemon + CMakeProjectManager.CMakeRunConfiguration. + yourpart-daemon + false + true + true + true + /home/torsten/Programs/yourpart-daemon/build + + 1 + + + Debug + 2 + false + + -DCMAKE_BUILD_TYPE:STRING=Debug +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} +-DCMAKE_GENERATOR:STRING=Unix Makefiles +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake +-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} +-DCMAKE_COLOR_DIAGNOSTICS:BOOL=ON + /mnt/share/torsten/Programs/yourpart-daemon + /home/torsten/Programs/yourpart-daemon/build + + + + + all + + false + + true + CMakeProjectManager.MakeStep + + 1 + Erstellen + Erstellen + ProjectExplorer.BuildSteps.Build + + + + + + clean + + false + + true + CMakeProjectManager.MakeStep + + 1 + Bereinigen + Bereinigen + ProjectExplorer.BuildSteps.Clean + + 2 + false + + false + + Debug (importiert) + CMakeProjectManager.CMakeBuildConfiguration + 0 + -1 + + + 0 + Deployment + Deployment + ProjectExplorer.BuildSteps.Deploy + + 1 + + false + ProjectExplorer.DefaultDeployConfiguration + + + + + + + install + + false + + true + ApplicationManagerPlugin.Deploy.CMakePackageStep + + + install-package --acknowledge + true + Application Manager-Paket installieren + ApplicationManagerPlugin.Deploy.InstallPackageStep + + + + + + + + 2 + Deployment + Deployment + ProjectExplorer.BuildSteps.Deploy + + 1 + + false + ApplicationManagerPlugin.Deploy.Configuration + + 2 + 0 + + 2 + + + 0 + Deployment + Deployment + ProjectExplorer.BuildSteps.Deploy + + 1 + + false + ProjectExplorer.DefaultDeployConfiguration + + + + + + + + + false + + true + ApplicationManagerPlugin.Deploy.CMakePackageStep + + + install-package --acknowledge + true + Application Manager-Paket installieren + ApplicationManagerPlugin.Deploy.InstallPackageStep + + + + + + + + 2 + Deployment + Deployment + ProjectExplorer.BuildSteps.Deploy + + 1 + + false + ApplicationManagerPlugin.Deploy.Configuration + + 2 + + true + true + 0 + true + + 2 + + false + -e cpu-cycles --call-graph dwarf,4096 -F 250 + yourpart-daemon + CMakeProjectManager.CMakeRunConfiguration. + yourpart-daemon + false + true + true + true + /home/torsten/Programs/yourpart-daemon/build + + 1 + + + + ProjectExplorer.Project.TargetCount + 1 + + + ProjectExplorer.Project.Updater.FileVersion + 22 + + + Version + 22 + + diff --git a/CMakeLists.txt.user.d36652f b/CMakeLists.txt.user.d36652f new file mode 100644 index 0000000..4b2b1e0 --- /dev/null +++ b/CMakeLists.txt.user.d36652f @@ -0,0 +1,205 @@ + + + + + + EnvironmentId + {d36652ff-969b-426b-a63f-1edd325096c5} + + + ProjectExplorer.Project.ActiveTarget + 0 + + + ProjectExplorer.Project.EditorSettings + + true + false + true + + Cpp + + CppGlobal + + + + QmlJS + + QmlJSGlobal + + + 2 + UTF-8 + false + 4 + false + 80 + true + true + 1 + 0 + false + true + false + 0 + true + true + 0 + 8 + true + false + 1 + true + true + true + *.md, *.MD, Makefile + false + true + true + + + + ProjectExplorer.Project.PluginSettings + + + true + false + true + true + true + true + + + 0 + true + + true + true + Builtin.DefaultTidyAndClazy + 8 + true + + + + true + + + true + + + + + ProjectExplorer.Project.Target.0 + + Desktop + Importiertes Kit + Importiertes Kit + {3c6cfc13-714d-4db1-bd45-b9794643cc67} + 0 + 0 + 0 + + Debug + 2 + false + + -DCMAKE_GENERATOR:STRING=Unix Makefiles +-DCMAKE_BUILD_TYPE:STRING=Build +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake +-DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} +-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + /home/torsten/Programs/yourpart-daemon + /home/torsten/Programs/yourpart-daemon/build + + + + + all + + false + + true + Erstellen + CMakeProjectManager.MakeStep + + 1 + Erstellen + Erstellen + ProjectExplorer.BuildSteps.Build + + + + + + clean + + false + + true + Erstellen + CMakeProjectManager.MakeStep + + 1 + Bereinigen + Bereinigen + ProjectExplorer.BuildSteps.Clean + + 2 + false + + false + + Erstellen + CMakeProjectManager.CMakeBuildConfiguration + + 1 + + + 0 + Deployment + Deployment + ProjectExplorer.BuildSteps.Deploy + + 1 + + false + ProjectExplorer.DefaultDeployConfiguration + + 1 + + true + true + 0 + true + /usr/bin/valgrind + + 2 + + false + yourpart-daemon + CMakeProjectManager.CMakeRunConfiguration.yourpart-daemon + yourpart-daemon + false + true + true + true + /home/torsten/Programs/yourpart-daemon/build + + 1 + + + + ProjectExplorer.Project.TargetCount + 1 + + + ProjectExplorer.Project.Updater.FileVersion + 22 + + + Version + 22 + + diff --git a/src/character_creation_worker.cpp b/src/character_creation_worker.cpp index 2dd3793..116b78d 100644 --- a/src/character_creation_worker.cpp +++ b/src/character_creation_worker.cpp @@ -3,15 +3,21 @@ #include #include #include +#include CharacterCreationWorker::CharacterCreationWorker(ConnectionPool &pool, MessageBroker &broker) - : Worker(pool, broker, "CharacterCreationWorker") - , gen(std::random_device{}()) - , dist(2, 3) -{ + : Worker(pool, broker, "CharacterCreationWorker"), + gen(std::random_device{}()), + dist(2, 3), + deathCheckRunning(true), + deathThread(&CharacterCreationWorker::monitorCharacterDeaths, this) { } CharacterCreationWorker::~CharacterCreationWorker() { + deathCheckRunning.store(false); + if (deathThread.joinable()) { + deathThread.join(); + } } void CharacterCreationWorker::run() { @@ -38,22 +44,17 @@ bool CharacterCreationWorker::isTodayCharacterCreated() { auto &db = connGuard.get(); setCurrentStep("Execute Query"); auto results = db.query(QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED); - if (!results.empty()) { - std::string created_at_str = results[0].at("created_at"); - return true; - } + return !results.empty(); } catch (const std::exception &e) { - std::cerr << "[CharacterCreationWorker] Fehler in isTodayCharacterCreated: " - << e.what() << std::endl; + std::cerr << "[CharacterCreationWorker] Fehler in isTodayCharacterCreated: " << e.what() << std::endl; + return false; } - setCurrentStep("No previous day character found"); - return false; } void CharacterCreationWorker::createCharactersForToday() { loadNames(); if (first_name_cache.empty() || last_name_cache.empty()) { - std::cerr << "Fehler: Namen konnten nicht geladen werden." << std::endl; + std::cerr << "[CharacterCreationWorker] Fehler: Namen konnten nicht geladen werden." << std::endl; return; } @@ -67,7 +68,7 @@ void CharacterCreationWorker::createCharactersForRegion(int region_id) { std::vector nobility_stands = {1, 2, 3}; std::vector genders = {"male", "female"}; for (auto nobility : nobility_stands) { - for (auto &gender : genders) { + for (const auto &gender : genders) { int num_chars = dist(gen); for (int i = 0; i < num_chars; ++i) { createCharacter(region_id, gender, nobility); @@ -76,9 +77,7 @@ void CharacterCreationWorker::createCharactersForRegion(int region_id) { } } -void CharacterCreationWorker::createCharacter(int region_id, - const std::string &gender, - int title_of_nobility) { +void CharacterCreationWorker::createCharacter(int region_id, const std::string &gender, int title_of_nobility) { int first_name_id = getRandomFromSet(first_name_cache[gender]); if (first_name_id == -1) { std::cerr << "Fehler: Kein passender Vorname gefunden." << std::endl; @@ -94,54 +93,152 @@ void CharacterCreationWorker::createCharacter(int region_id, auto &db = connGuard.get(); db.prepare("insert_character", QUERY_INSERT_CHARACTER); - db.execute("insert_character", {std::to_string(region_id), - std::to_string(first_name_id), - std::to_string(last_name_id), - gender, + db.execute("insert_character", {std::to_string(region_id), + std::to_string(first_name_id), + std::to_string(last_name_id), + gender, std::to_string(title_of_nobility)}); } catch (const std::exception &e) { - std::cerr << "[CharacterCreationWorker] Fehler in createCharacter: " + std::cerr << "[CharacterCreationWorker] Fehler in createCharacter: " << e.what() << std::endl; + } +} + +void CharacterCreationWorker::monitorCharacterDeaths() { + while (deathCheckRunning) { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + auto results = db.query(QUERY_GET_ELIGIBLE_NPC_FOR_DEATH); + for (const auto &row : results) { + int characterId = std::stoi(row.at("id")); + int age = std::stoi(row.at("age")); + if (calculateDeathProbability(age)) { + handleCharacterDeath(characterId); + } + } + } catch (const std::exception &e) { + std::cerr << "[CharacterCreationWorker] Fehler beim Überprüfen von Todesfällen: " << e.what() << std::endl; + } + + std::this_thread::sleep_for(std::chrono::hours(1)); + } +} + +bool CharacterCreationWorker::calculateDeathProbability(int age) { + if (age < 60) { + return false; + } + + double baseProbability = 0.01; + double increasePerYear = 0.01; + double deathProbability = baseProbability + (increasePerYear * (age - 60)); + + std::uniform_real_distribution deathDist(0.0, 1.0); + return deathDist(gen) < deathProbability; +} + +void CharacterCreationWorker::handleCharacterDeath(int characterId) { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + // 1) Director löschen und User benachrichtigen + db.prepare("delete_director", QUERY_DELETE_DIRECTOR); + auto dirResult = db.execute("delete_director", { std::to_string(characterId) }); + if (!dirResult.empty()) { + int userId = std::stoi(dirResult[0].at("user_id")); + notifyUser(userId, "director_death"); + } + + // 2) Relationships löschen und betroffene User benachrichtigen + db.prepare("delete_relationship", QUERY_DELETE_RELATIONSHIP); + auto relResult = db.execute("delete_relationship", { std::to_string(characterId) }); + for (auto &row : relResult) { + int relatedUserId = std::stoi(row.at("related_user_id")); + notifyUser(relatedUserId, "relationship_death"); + } + + // 3) Child-Relations löschen und Eltern benachrichtigen + db.prepare("delete_child_relation", QUERY_DELETE_CHILD_RELATION); + auto childResult = db.execute("delete_child_relation", { std::to_string(characterId) }); + for (auto &row : childResult) { + int fatherUserId = std::stoi(row.at("father_user_id")); + int motherUserId = std::stoi(row.at("mother_user_id")); + notifyUser(fatherUserId, "child_death"); + notifyUser(motherUserId, "child_death"); + } + + // 4) Charakter als verstorben markieren + markCharacterAsDeceased(characterId); + + } catch (const std::exception &e) { + std::cerr << "[CharacterCreationWorker] Fehler beim Bearbeiten des Todes: " << e.what() << std::endl; } } +void CharacterCreationWorker::notifyUser(int userId, const std::string &eventType) { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + db.prepare("insert_notification", QUERY_INSERT_NOTIFICATION); + db.execute("insert_notification", { std::to_string(userId) }); + + nlohmann::json message = { + {"event", eventType}, + {"user_id", userId} + }; + broker.publish(message.dump()); + } catch (const std::exception &e) { + std::cerr << "[CharacterCreationWorker] Fehler beim Senden der Benachrichtigung: " + << e.what() << std::endl; + } +} + +void CharacterCreationWorker::markCharacterAsDeceased(int characterId) { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + db.prepare("mark_character_deceased", QUERY_MARK_CHARACTER_DECEASED); + db.execute("mark_character_deceased", {std::to_string(characterId)}); + } catch (const std::exception &e) { + std::cerr << "[CharacterCreationWorker] Fehler beim Markieren des Charakters als verstorben: " << e.what() << std::endl; + } +} + std::vector CharacterCreationWorker::getTownRegionIds() { try { ConnectionGuard connGuard(pool); auto &db = connGuard.get(); - auto rows = db.query(QUERY_GET_TOWN_REGION_IDS); - std::vector ids; - ids.reserve(rows.size()); for (const auto &row : rows) { ids.push_back(std::stoi(row.at("id"))); } return ids; } catch (const std::exception &e) { - std::cerr << "[CharacterCreationWorker] Fehler in getTownRegionIds: " - << e.what() << std::endl; + std::cerr << "[CharacterCreationWorker] Fehler in getTownRegionIds: " << e.what() << std::endl; + return {}; } - return {}; } void CharacterCreationWorker::loadNames() { try { ConnectionGuard connGuard(pool); auto &db = connGuard.get(); - auto firstNameRows = db.query(QUERY_LOAD_FIRST_NAMES); for (const auto &row : firstNameRows) { first_name_cache[row.at("gender")].insert(std::stoi(row.at("id"))); } - auto lastNameRows = db.query(QUERY_LOAD_LAST_NAMES); for (const auto &row : lastNameRows) { last_name_cache.insert(std::stoi(row.at("id"))); } } catch (const std::exception &e) { - std::cerr << "[CharacterCreationWorker] Fehler in loadNames: " - << e.what() << std::endl; + std::cerr << "[CharacterCreationWorker] Fehler in loadNames: " << e.what() << std::endl; } } diff --git a/src/character_creation_worker.h b/src/character_creation_worker.h index 18ead74..a629ca7 100644 --- a/src/character_creation_worker.h +++ b/src/character_creation_worker.h @@ -6,11 +6,13 @@ #include #include #include +#include +#include class CharacterCreationWorker : public Worker { public: CharacterCreationWorker(ConnectionPool &pool, MessageBroker &broker); - ~CharacterCreationWorker() override; + ~CharacterCreationWorker() override; protected: void run() override; @@ -20,6 +22,8 @@ private: std::uniform_int_distribution dist; std::unordered_map> first_name_cache; std::unordered_set last_name_cache; + std::atomic deathCheckRunning{true}; + std::thread deathThread; bool isTodayCharacterCreated(); void createCharactersForToday(); @@ -28,6 +32,11 @@ private: std::vector getTownRegionIds(); void loadNames(); int getRandomFromSet(const std::unordered_set &name_set); + void monitorCharacterDeaths(); + void handleCharacterDeath(int characterId); + void notifyUser(int userId, const std::string &eventType); + void markCharacterAsDeceased(int characterId); + bool calculateDeathProbability(int age); static constexpr const char *QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED = R"( SELECT created_at @@ -57,9 +66,97 @@ private: static constexpr const char *QUERY_INSERT_CHARACTER = R"( INSERT INTO falukant_data."character"( - user_id, region_id, first_name, last_name, + user_id, region_id, first_name, last_name, birthdate, gender, created_at, updated_at, title_of_nobility ) VALUES (NULL, $1, $2, $3, NOW(), $4, NOW(), NOW(), $5); )"; + + static constexpr const char *QUERY_GET_ELIGIBLE_NPC_FOR_DEATH = R"( + WITH aged AS ( + SELECT + c.id, + (current_date - c.birthdate::date) AS age, + c.user_id + FROM + falukant_data."character" c + WHERE + c.user_id IS NULL + AND (current_date - c.birthdate::date) > 60 + ), + always_sel AS ( + -- Immer mitnehmen: alle über 85 Tage + SELECT * + FROM aged + WHERE age > 85 + ), + random_sel AS ( + -- Zufallsstichprobe: alle zwischen 61 und 85 Tagen, hier beispielhaft auf 10 limitiert + SELECT * + FROM aged + WHERE age <= 85 + ORDER BY random() + LIMIT 10 -- <-- hier die gewünschte Anzahl anpassen + ) + -- Zusammenführen der beiden Mengen + SELECT * + FROM always_sel + UNION ALL + SELECT * + FROM random_sel; + )"; + + static constexpr const char *QUERY_DELETE_DIRECTOR = R"( + DELETE FROM falukant_data.director + WHERE director_character_id = $1 + RETURNING employer_user_id; + )"; + + static constexpr const char *QUERY_DELETE_RELATIONSHIP = R"( + WITH deleted AS ( + DELETE FROM falukant_data.relationship + WHERE character1_id = $1 + OR character2_id = $1 + RETURNING + CASE + WHEN character1_id = $1 THEN character2_id + ELSE character1_id + END AS related_character_id, + relationship_type_id + ) + SELECT + c.user_id AS related_user_id + FROM deleted d + JOIN falukant_data."character" c + ON c.id = d.related_character_id; + )"; + + static constexpr const char *QUERY_DELETE_CHILD_RELATION = R"( + WITH deleted AS ( + DELETE FROM falukant_data.child_relation + WHERE child_character_id = $1 + RETURNING + father_character_id, + mother_character_id + ) + SELECT + cf.user_id AS father_user_id, + cm.user_id AS mother_user_id + FROM deleted d + JOIN falukant_data."character" cf + ON cf.id = d.father_character_id + JOIN falukant_data."character" cm + ON cm.id = d.mother_character_id; + )"; + + static constexpr const char *QUERY_INSERT_NOTIFICATION = R"( + INSERT INTO falukant_log.notification (user_id, tr, shown, created_at, updated_at) + VALUES ($1, 'director_death', false, NOW(), NOW()); + )"; + + + static constexpr const char *QUERY_MARK_CHARACTER_DECEASED = R"( + DELETE FROM falukant_data."character" + WHERE id = $1; + )"; }; diff --git a/src/database.cpp b/src/database.cpp index c3dd39b..e3e7f9d 100644 --- a/src/database.cpp +++ b/src/database.cpp @@ -1,4 +1,9 @@ #include "database.h" +#include +#include +#include +#include +#include Database::Database(const std::string &conninfo) { @@ -9,7 +14,9 @@ Database::Database(const std::string &conninfo) } } catch (const std::exception &e) { std::cerr << "[Database] Fehler beim Verbinden: " << e.what() << std::endl; - throw; + + + throw; } } @@ -48,32 +55,16 @@ void Database::prepare(const std::string &stmtName, const std::string &sql) } } -#include -#include -#include -#include -#include -#include - -std::vector> -Database::execute(const std::string &stmtName, const std::vector ¶ms) -{ +Database::FieldList Database::execute(const std::string &stmtName, const std::vector ¶ms) { std::vector> rows; - try { pqxx::work txn(*connection_); pqxx::prepare::invocation inv = txn.prepared(stmtName); - - // Parameter einzeln hinzufügen for (auto &p : params) { inv(p); } - - // Ausführung pqxx::result r = inv.exec(); txn.commit(); - - // Ergebnis verarbeiten for (const auto &row : r) { std::unordered_map mapRow; for (pqxx::row::size_type i = 0; i < row.size(); ++i) { @@ -91,6 +82,12 @@ Database::execute(const std::string &stmtName, const std::vector &p return rows; } +void Database::remove(const std::string &stmtName) { + pqxx::work txn(*connection_); + txn.conn().unprepare(stmtName); + txn.commit(); +} + bool Database::isValid() const { try { if (!connection_ || !connection_->is_open()) { diff --git a/src/database.h b/src/database.h index 30fd07c..27cff2c 100644 --- a/src/database.h +++ b/src/database.h @@ -2,24 +2,25 @@ #include #include #include -#include #include -#include #include #include #include -class Database -{ +class Database { public: Database(const std::string &conninfo); + typedef std::unordered_map FieldMap; + typedef std::vector FieldList; + std::vector> query(const std::string &sql); void prepare(const std::string &stmtName, const std::string &sql); - std::vector> execute( + FieldList execute( const std::string &stmtName, const std::vector ¶ms = {} ); + void remove(const std::string &stmtName); bool isOpen() const { return connection_ && connection_->is_open(); } bool isValid() const; diff --git a/src/director_worker.cpp b/src/director_worker.cpp new file mode 100644 index 0000000..e3276aa --- /dev/null +++ b/src/director_worker.cpp @@ -0,0 +1,192 @@ +#include "director_worker.h" +#include + +DirectorWorker::DirectorWorker(ConnectionPool &pool, MessageBroker &broker) + : Worker(pool, broker, "DirectorWorker") { +} + +DirectorWorker::~DirectorWorker() { +} + +void DirectorWorker::run() { + auto lastExecutionTime = std::chrono::steady_clock::now(); + + while (runningWorker) { + signalActivity(); + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(now - lastExecutionTime).count(); + if (elapsed >= 60) { + try { + performTask(); + paySalary(); + calculateSatisfaction(); + lastExecutionTime = now; + } catch (const std::exception &e) { + std::cerr << "[DirectorWorker] Fehler beim Ausführen der Aufgabe: " << e.what() << std::endl; + } + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +void DirectorWorker::performTask() { + try { + setCurrentStep("Get Database Connection"); + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + setCurrentStep("Get director actions"); + db.prepare("QUERY_GET_DIRECTORS", QUERY_GET_DIRECTORS); + const auto directors = db.execute("QUERY_GET_DIRECTORS"); + for (const auto &director: directors) { + if (director.at("may_produce") == "t") { + startProductions(director); + } + if (director.at("may_start_transport") == "t") { + startTransports(director); + } + if (director.at("may_sell") == "t") { + startSellings(director); + } + } + } catch (const std::exception &e) { + std::cerr << "[DirectorWorker] Fehler bei der Datenbankoperation: " << e.what() << std::endl; + } +} + +void DirectorWorker::startProductions(std::unordered_map director) { + auto parseIntOrZero = [&](const std::string &s){ + if (s.empty() || s == "null") return 0; + try { + return std::stoi(s); + } catch(...) { + return 0; + } + }; + + setCurrentStep("Get Database Connection - Production"); + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + setCurrentStep("Get to produce"); + db.prepare("get_to_produce", QUERY_GET_BEST_PRODUCTION); + const auto productions = db.execute("get_to_produce", { director.at("id") }); + if (productions.empty()) return; + const auto &production = productions.at(0); + + int runningProductions = parseIntOrZero(production.at("running_productions")); + if (runningProductions >= 2) { + return; + } + + setCurrentStep("Add production to DB"); + int availableStock = parseIntOrZero(production.at("stock_size")); + int usedStock = parseIntOrZero(production.at("used_in_stock")); + int freeCapacity = availableStock - usedStock - runningProductions; + + int certificate = parseIntOrZero(production.at("certificate")); + int onePieceCost = certificate * 6; + int money = parseIntOrZero(production.at("money")); + int maxMoneyProduction = onePieceCost > 0 ? money / onePieceCost : 0; + + int toProduce = std::min(std::min(freeCapacity, maxMoneyProduction), 300); + if (toProduce < 1) { + return; + } + + int falukantUserId = parseIntOrZero(production.at("falukant_user_id")); + int productionCost = toProduce * onePieceCost; + + nlohmann::json msg1 = { { "event", "falukantUpdateStatus" } }; + setCurrentStep("Update money"); + changeFalukantUserMoney(falukantUserId, -productionCost, "director starts production", msg1); + + setCurrentStep("Insert production"); + db.prepare("insert_production", QUERY_INSERT_PRODUCTION); + + int remaining = toProduce; + while (remaining > 0) { + int batch = std::min(100, remaining); + db.execute("insert_production", { + production.at("branch_id"), + production.at("product_id"), + std::to_string(batch) + }); + remaining -= batch; + } + + nlohmann::json msg2 = { + { "event", "production_started" }, + { "branch_id", production.at("branch_id") } + }; + sendMessageToFalukantUsers(falukantUserId, msg2); +} + +void DirectorWorker::startTransports(std::unordered_map) { +} + +void DirectorWorker::startSellings(std::unordered_map director) { + setCurrentStep("Get Database Connection - Production"); + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + setCurrentStep("Get to sell"); + db.prepare("get_to_sell", QUERY_GET_INVENTORY); + const auto inventory = db.execute("get_to_sell", { director.at("id") }); + for (const auto &item: inventory) { + const auto inventoryId = std::stoi(item.at("id")); + const auto productId = std::stoi(item.at("product_id")); + const auto quantity = std::stoi(item.at("quantity")); + const auto quality = std::stoi(item.at("quality")); + const auto maxSellPrice = std::stod(item.at("sell_cost")); + auto falukantUserId = std::stoi(item.at("user_id")); + const auto regionId = std::stoi(item.at("region_id")); + if (quantity > 0) { + const auto minPrice = maxSellPrice * 0.6; + const auto pieceSellPrice = minPrice + (double)(maxSellPrice - minPrice) * (quality / 100.0); + const auto sellPrice = pieceSellPrice * quantity; + const nlohmann::json changeMessage = { + { "productId", productId }, + { "event", "falukantUpdateStatus" } + }; + changeFalukantUserMoney(falukantUserId, sellPrice, "sell products", changeMessage); + db.prepare("QUERY_ADD_SELL_LOG", QUERY_ADD_SELL_LOG); + db.execute("QUERY_ADD_SELL_LOG", { std::to_string(regionId), std::to_string(productId), std::to_string(quantity), + std::to_string(falukantUserId) }); + } + db.prepare("remove_inventory", QUERY_REMOVE_INVENTORY); + db.execute("remove_inventory", { std::to_string(inventoryId) }); + nlohmann::json message = { + { "event", "selled_items" }, + { "branch_id", item.at("branch_id") }, + }; + sendMessageToFalukantUsers(falukantUserId, message); + } +} + +void DirectorWorker::paySalary() { + setCurrentStep("salary - load to pay"); + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_GET_SALARY_TO_PAY", QUERY_GET_SALARY_TO_PAY); + const auto &salariesToPay = db.execute("QUERY_GET_SALARY_TO_PAY"); + nlohmann::json message = { + { "event", "falukantUpdateStatus" } + }; + for (auto const &item: salariesToPay) { + changeFalukantUserMoney(std::stoi(item.at("employer_user_id")), -std::stoi(item.at("income")), "director payed out", message); + db.prepare("QUERY_SET_SALARY_PAYED", QUERY_SET_SALARY_PAYED); + db.execute("QUERY_SET_SALARY_PAYED", { std::to_string(std::stoi(item.at("id"))) }); + } +} + +void DirectorWorker::calculateSatisfaction() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_UPDATE_SATISFACTION", QUERY_UPDATE_SATISFACTION); + const auto &changedDirectors = db.execute("QUERY_UPDATE_SATISFACTION"); + nlohmann::json message = { + { "event", "directorchanged" } + }; + for (auto const &director: changedDirectors) { + sendMessageToFalukantUsers(std::stoi(director.at("employer_user_id")), message); + } +} diff --git a/src/director_worker.h b/src/director_worker.h new file mode 100644 index 0000000..6453875 --- /dev/null +++ b/src/director_worker.h @@ -0,0 +1,155 @@ +#ifndef DIRECTOR_WORKER_H +#define DIRECTOR_WORKER_H + +#include "worker.h" + +class DirectorWorker : public Worker { +public: + explicit DirectorWorker(ConnectionPool &pool, MessageBroker &broker); + ~DirectorWorker() override; + +protected: + void run() override; + +private: + void performTask(); + void startProductions(std::unordered_map director); + void startTransports(std::unordered_map); + void startSellings(std::unordered_map); + void paySalary(); + void calculateSatisfaction(); + + static constexpr const char *QUERY_GET_DIRECTORS = R"( + select d.may_produce, d.may_sell, d.may_start_transport, b.id branch_id, fu.id falukantUserId, d.id + from falukant_data.director d + join falukant_data.falukant_user fu + on fu.id = d.employer_user_id + join falukant_data."character" c + on c.id = d.director_character_id + join falukant_data.branch b + on b.region_id = c.region_id + and b.falukant_user_id = fu.id + where current_time between '08:00:00' and '17:00:00' + )"; + + static constexpr const char *QUERY_GET_BEST_PRODUCTION = R"( + select fdu."id" falukant_user_id, fdu."money", fdu."certificate", ftp."id" product_id, ftp.label_tr,(select sum("quantity") + from "falukant_data"."stock" fds + where fds."branch_id" = fdb."id") stock_size, coalesce((select sum(coalesce(fdi."quantity", 0)) + from "falukant_data"."stock" fds + join "falukant_data"."inventory" fdi + on fdi."stock_id" = fds."id" + where fds."branch_id" = fdb."id"), 0) used_in_stock, + (ftp."sell_cost" * (fdtpw."worth_percent" + (fdk_character."knowledge" * 2 + fdk_director."knowledge") / 3) / 100 - 6 * ftp.category) / (300.0 * ftp. production_time) worth, + fdb."id" branch_id, + (select count("id") from "falukant_data"."production" where "branch_id" = fdb."id") running_productions, + coalesce((select sum(coalesce(fdp.quantity, 0)) quantity from + falukant_data.production fdp where fdp.branch_id = fdb.id), 0) running_productions + from "falukant_data"."director" fdd + join "falukant_data".character fdc + on fdc.id = fdd.director_character_id + join "falukant_data"."falukant_user" fdu + on fdd."employer_user_id" = fdu."id" + join "falukant_data"."character" user_character + on user_character."user_id" = fdu."id" + join "falukant_data"."branch" fdb + on fdb."falukant_user_id" = fdu."id" + and fdb."region_id" = fdc."region_id" + join "falukant_data"."town_product_worth" fdtpw + on fdtpw."region_id" = fdb."region_id" + join "falukant_data"."knowledge" fdk_character + on + fdk_character."product_id" = fdtpw."product_id" + and fdk_character."character_id" = user_character."id" + and fdk_character."product_id" = fdtpw."product_id" + join "falukant_data"."knowledge" fdk_director + on + fdk_director."product_id" = fdtpw."product_id" + and fdk_director."character_id" = fdd."director_character_id" + and fdk_director."product_id" = fdtpw."product_id" + join "falukant_type"."product" ftp + on + ftp."id" = fdtpw."product_id" + and ftp.category <= fdu.certificate + where fdd."id" = $1 + order by worth desc + limit 1; + )"; + + static constexpr const char *QUERY_INSERT_PRODUCTION = R"( + insert into "falukant_data"."production" ("branch_id", "product_id", "quantity") + values ($1, $2, $3) + )"; + + static constexpr const char *QUERY_GET_INVENTORY = R"( + select i.id, i.product_id, i.quantity, i.quality, p.sell_cost, fu.id user_id, b.region_id, b.id branch_id + from falukant_data.inventory i + join falukant_data.stock s + on s.id = i.stock_id + join falukant_data.branch b + on b.id = s.branch_id + join falukant_data.falukant_user fu + on fu.id = b.falukant_user_id + join falukant_data.director d + on d.employer_user_id = fu.id + join falukant_type.product p + on p.id = i.product_id + where d.id = $1 + )"; + + static constexpr const char *QUERY_REMOVE_INVENTORY = R"( + delete from falukant_data.inventory + where id = $1 + )"; + + static constexpr const char *QUERY_ADD_SELL_LOG = R"( + INSERT INTO falukant_log.sell ("region_id", "product_id", "quantity", "seller_id") + values ($1, $2, $3, $4) + ON CONFLICT ("region_id", "product_id", "seller_id") + DO UPDATE + SET "quantity" = falukant_log.sell."quantity" + EXCLUDED.quantity + )"; + + static constexpr const char *QUERY_GET_SALARY_TO_PAY = R"( + select d.id, d.employer_user_id, d.income + from falukant_data.director d + where date(d.last_salary_payout) < date(now()) + )"; + + static constexpr const char *QUERY_SET_SALARY_PAYED = R"( + update falukant_data.director + set last_salary_payout = NOW() + where id = $1 + )"; + + static constexpr const char *QUERY_UPDATE_SATISFACTION = R"( + WITH new_sats AS ( + SELECT + d.id, + ROUND( + d.income::numeric + / + ( + c.title_of_nobility + * POWER(1.231, AVG(k.knowledge) / 1.5) + ) + * 100 + ) AS new_satisfaction + FROM falukant_data.director d + JOIN falukant_data.knowledge k + ON d.director_character_id = k.character_id + JOIN falukant_data.character c + ON c.id = d.director_character_id + GROUP BY d.id, c.title_of_nobility, d.income + ) + UPDATE falukant_data.director dir + SET satisfaction = ns.new_satisfaction + FROM new_sats ns + WHERE dir.id = ns.id + -- Nur updaten, wenn sich der Wert tatsächlich ändert: + AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction + RETURNING dir.employer_user_id; + )"; +}; + +#endif // DIRECTOR_WORKER_H diff --git a/src/houseworker.cpp b/src/houseworker.cpp new file mode 100644 index 0000000..029e9bf --- /dev/null +++ b/src/houseworker.cpp @@ -0,0 +1,65 @@ +#include "houseworker.h" + +#include + +HouseWorker::HouseWorker(ConnectionPool &pool, MessageBroker &broker): + Worker(pool, broker, "HouseWorker") { + +} + +HouseWorker::~HouseWorker() { + +} + +void HouseWorker::run() { + auto lastExecutionTime = std::chrono::steady_clock::now(); + auto lastHouseStateChange = std::chrono::system_clock::now(); + while (runningWorker) { + signalActivity(); + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(now - lastExecutionTime).count(); + if (elapsed >= 3600) { + performTask(); + } + auto nowSystem = std::chrono::system_clock::now(); + auto lastDay = floor(lastHouseStateChange); + auto today = floor(nowSystem); + if (lastDay < today) { + performHouseStateChange(); + lastHouseStateChange = nowSystem; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +void HouseWorker::performTask() { + try { + setCurrentStep("Get Database Connection"); + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + setCurrentStep("Get new houses data"); + db.prepare("QUERY_GET_NEW_HOUSE_DATA", QUERY_GET_NEW_HOUSE_DATA); + const auto newHouses = db.execute("QUERY_GET_NEW_HOUSE_DATA"); + for (const auto &newHouse: newHouses) { + db.prepare("QUERY_ADD_NEW_BUYABLE_HOUSE", QUERY_ADD_NEW_BUYABLE_HOUSE); + db.execute("QUERY_ADD_NEW_BUYABLE_HOUSE", { newHouse.at("house_id") }); + } + } catch (const std::exception &e) { + std::cerr << "[HouseWorker] Fehler bei der Datenbankoperation: " << e.what() << std::endl; + } +} + +void HouseWorker::performHouseStateChange() { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.remove("QUERY_UPDATE_BUYABLE_HOUSE_STATE"); + db.remove("QUERY_UPDATE_USER_HOUSE_STATE"); + db.prepare("QUERY_UPDATE_BUYABLE_HOUSE_STATE", QUERY_UPDATE_BUYABLE_HOUSE_STATE); + db.prepare("QUERY_UPDATE_USER_HOUSE_STATE", QUERY_UPDATE_USER_HOUSE_STATE); + db.execute("QUERY_UPDATE_BUYABLE_HOUSE_STATE"); + db.execute("QUERY_UPDATE_USER_HOUSE_STATE"); + } catch(const std::exception &e) { + std::cerr << "[HouseWorker] Fehler bei der Datenbankoperation: " << e.what() << std::endl; + } +} diff --git a/src/houseworker.h b/src/houseworker.h new file mode 100644 index 0000000..5bdf5c2 --- /dev/null +++ b/src/houseworker.h @@ -0,0 +1,54 @@ +#ifndef HOUSEWORKER_H +#define HOUSEWORKER_H + +#include "worker.h" + +class HouseWorker : public Worker { +public: + HouseWorker(ConnectionPool &pool, MessageBroker &broker); + ~HouseWorker() override; + +protected: + void run() override; + +private: + void performTask(); + void performHouseStateChange(); + + static constexpr const char *QUERY_GET_NEW_HOUSE_DATA = R"( + SELECT + h.id AS house_id + FROM + falukant_type.house AS h + WHERE + random() < 0.0001 + and "label_tr" != 'under_bridge'; + )"; + + static constexpr const char *QUERY_ADD_NEW_BUYABLE_HOUSE = R"( + insert into falukant_data.buyable_house (house_type_id) values ($1); + )"; + + static constexpr const char *QUERY_UPDATE_BUYABLE_HOUSE_STATE = R"( + update falukant_data.buyable_house + set roof_condition = round(roof_condition - random() * (3 + 0 * id)), + floor_condition = round(floor_condition - random() * (3 + 0 * id)), + wall_condition = round(wall_condition - random() * (3 + 0 * id)), + window_condition = round(wall_condition - random() * (3 + 0 * id)) + )"; + + static constexpr const char *QUERY_UPDATE_USER_HOUSE_STATE = R"( + update falukant_data.user_house + set roof_condition = round(roof_condition - random() * (3 + 0 * id)), + floor_condition = round(floor_condition - random() * (3 + 0 * id)), + wall_condition = round(wall_condition - random() * (3 + 0 * id)), + window_condition = round(window_condition - random() * (3 + 0 * id)) + where house_type_id not in ( + select id + from falukant_type.house h + where h.label_tr = 'under_bridge' + ) + )"; +}; + +#endif // HOUSEWORKER_H diff --git a/src/main.cpp b/src/main.cpp index dfa9c39..85c29a2 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,13 +1,21 @@ #include "character_creation_worker.h" #include "produce_worker.h" +#include "stockagemanager.h" +#include "director_worker.h" +#include "valuerecalculationworker.h" #include "connection_pool.h" #include "websocket_server.h" #include "message_broker.h" +#include "usercharacterworker.h" +#include "houseworker.h" +#include "politics_worker.h" #include "config.h" #include #include #include #include +#include +#include std::atomic keepRunning(true); @@ -25,39 +33,44 @@ int main() { try { Config config("/etc/yourpart/daemon.conf"); ConnectionPool pool( - config.get("DB_HOST"), - config.get("DB_PORT"), + config.get("DB_HOST"), + config.get("DB_PORT"), config.get("DB_NAME"), - config.get("DB_USER"), - config.get("DB_PASSWORD"), + config.get("DB_USER"), + config.get("DB_PASSWORD"), 10 ); - int websocketPort = std::stoi(config.get("WEBSOCKET_PORT")); MessageBroker broker; WebSocketServer websocketServer(websocketPort, pool, broker); - CharacterCreationWorker creationWorker(pool, broker); - ProduceWorker produceWorker(pool, broker); + std::vector> workers; + workers.push_back(std::make_unique(pool, broker)); + workers.push_back(std::make_unique(pool, broker)); + workers.push_back(std::make_unique(pool, broker)); + workers.push_back(std::make_unique(pool, broker)); + workers.push_back(std::make_unique(pool, broker)); + workers.push_back(std::make_unique(pool, broker)); + workers.push_back(std::make_unique(pool, broker)); + workers.push_back(std::make_unique(pool, broker)); + websocketServer.setWorkers(workers); broker.start(); websocketServer.run(); - creationWorker.startWorkerThread(); - produceWorker.startWorkerThread(); - creationWorker.enableWatchdog(); - produceWorker.enableWatchdog(); - + for (auto &worker : workers) { + worker->startWorkerThread(); + worker->enableWatchdog(); + } while (keepRunning) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); } - - creationWorker.stopWorkerThread(); - produceWorker.stopWorkerThread(); + for (auto &worker : workers) { + worker->stopWorkerThread(); + } websocketServer.stop(); broker.stop(); } catch (const std::exception &e) { std::cerr << "Fehler: " << e.what() << std::endl; return 1; } - return 0; } diff --git a/src/politics_worker.cpp b/src/politics_worker.cpp new file mode 100644 index 0000000..d8188fb --- /dev/null +++ b/src/politics_worker.cpp @@ -0,0 +1,224 @@ +// File: politics_worker.cpp + +#include "politics_worker.h" +#include +#include + +PoliticsWorker::PoliticsWorker(ConnectionPool &pool, MessageBroker &broker) + : Worker(pool, broker, "PoliticsWorker") +{ +} + +PoliticsWorker::~PoliticsWorker() +{ +} + +void PoliticsWorker::run() { + auto lastExecutionDate = std::chrono::system_clock::time_point{}; + while (runningWorker) { + signalActivity(); + auto now = std::chrono::system_clock::now(); + auto todayFloor = std::chrono::floor(now); + auto targetTime = todayFloor + std::chrono::hours(3); // 03:00 Uhr + if (now >= targetTime && lastExecutionDate < todayFloor) { + signalActivity(); + performDailyPoliticsTask(); + lastExecutionDate = todayFloor; + } + for (int i = 0; i < 5 && runningWorker.load(); ++i) { + signalActivity(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } +} + +void PoliticsWorker::performDailyPoliticsTask() { + try { + // … (Schritte für Notifications und evaluatePoliticalPositions) … + + // 3) Elections anlegen und **je 2 × posts_to_fill** Kandidaten hinzufügen + { + setCurrentStep("Schedule Elections and Insert Candidates"); + + // 3a) Neue Elections erzeugen (liefert jetzt auch posts_to_fill) + auto elections = scheduleElections(); + + if (!elections.empty()) { + for (auto const & tpl : elections) { + int electionId = std::get<0>(tpl); + int regionId = std::get<1>(tpl); + int postsToFill = std::get<2>(tpl); + + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + db.prepare("INSERT_CANDIDATES", QUERY_INSERT_CANDIDATES); + // $1 = electionId, $2 = regionId, $3 = postsToFill + db.execute("INSERT_CANDIDATES", { + std::to_string(electionId), + std::to_string(regionId), + std::to_string(postsToFill) + }); + } + } + } + + // … nach scheduleElections() & Kandidaten … + { + setCurrentStep("Process Elections After 3 Days"); + auto newOffices = processElections(); + for (auto const &tup : newOffices) { + notifyOfficeFilled({tup}); + } + } + + } catch (std::exception const & e) { + std::cerr << "[PoliticsWorker] Fehler bei performDailyPoliticsTask: " << e.what() << "\n"; + } +} + +void PoliticsWorker::evaluatePoliticalPositions( + std::unordered_map& requiredPerRegion, + std::unordered_map& occupiedPerRegion +) { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + signalActivity(); + db.prepare("COUNT_OFFICES_PER_REGION", QUERY_COUNT_OFFICES_PER_REGION); + signalActivity(); + const auto result = db.execute("COUNT_OFFICES_PER_REGION"); + signalActivity(); + + for (const auto &row : result) { + int regionId = std::stoi(row.at("region_id")); + int reqCount = std::stoi(row.at("required_count")); + int occCount = std::stoi(row.at("occupied_count")); + + requiredPerRegion[regionId] = reqCount; + occupiedPerRegion[regionId] = occCount; + + signalActivity(); + } +} + +// politics_worker.cpp (Auszug) + +std::vector> PoliticsWorker::scheduleElections() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + signalActivity(); + db.prepare("SELECT_NEEDED_ELECTIONS", QUERY_SELECT_NEEDED_ELECTIONS); + signalActivity(); + auto result = db.execute("SELECT_NEEDED_ELECTIONS"); + signalActivity(); + std::vector> created; + created.reserve(result.size()); + for (auto const & row : result) { + int electionId = std::stoi(row.at("election_id")); + int regionId = std::stoi(row.at("region_id")); + int postsToFill = std::stoi(row.at("posts_to_fill")); + created.emplace_back(electionId, regionId, postsToFill); + signalActivity(); + } + return created; +} + +std::vector> PoliticsWorker::processExpiredOfficesAndFill() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + signalActivity(); + db.prepare("PROCESS_EXPIRED_AND_FILL", QUERY_PROCESS_EXPIRED_AND_FILL); + signalActivity(); + const auto result = db.execute("PROCESS_EXPIRED_AND_FILL"); + signalActivity(); + + std::vector> created; + for (const auto &row : result) { + int officeId = std::stoi(row.at("office_id")); + int officeTypeId = std::stoi(row.at("office_type_id")); + int characterId = std::stoi(row.at("character_id")); + int regionId = std::stoi(row.at("region_id")); + created.emplace_back(officeId, officeTypeId, characterId, regionId); + signalActivity(); + } + return created; +} + +std::vector PoliticsWorker::getUserIdsInCitiesOfRegions(const std::vector& regionIds) { + if (regionIds.empty()) { + return {}; + } + + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + std::vector userIds; + for (int rid : regionIds) { + signalActivity(); + db.prepare("GET_USERS_IN_CITIES", QUERY_USERS_IN_CITIES_OF_REGIONS); + signalActivity(); + const auto rows = db.execute("GET_USERS_IN_CITIES", { std::to_string(rid) }); + signalActivity(); + + for (const auto &row : rows) { + int uid = std::stoi(row.at("user_id")); + userIds.push_back(uid); + signalActivity(); + } + } + return userIds; +} + +void PoliticsWorker::notifyOfficeExpirations() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + signalActivity(); + db.prepare("NOTIFY_OFFICE_EXPIRATION", QUERY_NOTIFY_OFFICE_EXPIRATION); + signalActivity(); + db.execute("NOTIFY_OFFICE_EXPIRATION"); + signalActivity(); +} + +void PoliticsWorker::notifyElectionCreated(const std::vector>& elections) { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + db.prepare("NOTIFY_ELECTION_CREATED", QUERY_NOTIFY_ELECTION_CREATED); + for (const auto &pr : elections) { + signalActivity(); + db.execute("NOTIFY_ELECTION_CREATED", { std::to_string(pr.first) }); + signalActivity(); + } +} + +void PoliticsWorker::notifyOfficeFilled(const std::vector>& newOffices) { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + db.prepare("NOTIFY_OFFICE_FILLED", QUERY_NOTIFY_OFFICE_FILLED); + for (const auto &tup : newOffices) { + int characterId = std::get<2>(tup); + signalActivity(); + db.execute("NOTIFY_OFFICE_FILLED", { std::to_string(characterId) }); + signalActivity(); + } +} + +std::vector> PoliticsWorker::processElections() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("PROCESS_ELECTIONS", QUERY_PROCESS_ELECTIONS); + auto result = db.execute("PROCESS_ELECTIONS", {}); + std::vector> created; + for (auto const &row : result) { + int officeId = std::stoi(row.at("office_id")); + int officeTypeId = std::stoi(row.at("office_type_id")); + int characterId = std::stoi(row.at("character_id")); + int regionId = std::stoi(row.at("region_id")); + created.emplace_back(officeId, officeTypeId, characterId, regionId); + } + return created; +} diff --git a/src/politics_worker.h b/src/politics_worker.h new file mode 100644 index 0000000..31985e6 --- /dev/null +++ b/src/politics_worker.h @@ -0,0 +1,460 @@ +// File: politics_worker.h + +#ifndef POLITICS_WORKER_H +#define POLITICS_WORKER_H + +#include "worker.h" +#include +#include +#include + +class PoliticsWorker : public Worker { +public: + PoliticsWorker(ConnectionPool &pool, MessageBroker &broker); + ~PoliticsWorker() override; + +protected: + void run() override; + +private: + void performDailyPoliticsTask(); + + void evaluatePoliticalPositions( + std::unordered_map& requiredPerRegion, + std::unordered_map& occupiedPerRegion + ); + + std::vector> scheduleElections(); + std::vector> processExpiredOfficesAndFill(); + std::vector getUserIdsInCitiesOfRegions(const std::vector& regionIds); + + void notifyOfficeExpirations(); + void notifyElectionCreated(const std::vector>& elections); + void notifyOfficeFilled(const std::vector>& newOffices); + std::vector > processElections(); + + // ------------------------------------------------------------ + // QUERY: Zähle pro Region, wie viele Sitze vorgesehen vs. besetzt sind + // ------------------------------------------------------------ + static constexpr const char* QUERY_COUNT_OFFICES_PER_REGION = R"( + WITH + seats_per_region AS ( + SELECT + pot.id AS office_type_id, + rt.id AS region_id, + pot.seats_per_region AS seats_total + FROM + falukant_type.political_office_type AS pot + JOIN + falukant_type.region AS rt + ON pot.region_type = rt.label_tr + ), + occupied AS ( + SELECT + po.office_type_id, + po.region_id, + COUNT(*) AS occupied_count + FROM + falukant_data.political_office AS po + GROUP BY + po.office_type_id, po.region_id + ), + combined AS ( + SELECT + spr.region_id, + spr.seats_total AS required_count, + COALESCE(o.occupied_count, 0) AS occupied_count + FROM + seats_per_region AS spr + LEFT JOIN + occupied AS o + ON spr.office_type_id = o.office_type_id + AND spr.region_id = o.region_id + ) + SELECT + region_id, + SUM(required_count) AS required_count, + SUM(occupied_count) AS occupied_count + FROM combined + GROUP BY region_id; + )"; + + // ------------------------------------------------------------ + // STEP 1: Erzeuge nur diejenigen Wahlen, bei denen noch keine Election + // für denselben Termin (NOW()+2 Tage) existiert. + // ------------------------------------------------------------ + static constexpr const char* QUERY_SELECT_NEEDED_ELECTIONS = R"( + WITH + -- 1) Definiere das heutige Datum einmal als Referenz + target_date AS ( + SELECT NOW()::date AS election_date + ), + + -- 2) Lösche nur diejenigen Ämter, deren Ablaufdatum heute erreicht ist, + -- und merke deren (office_type_id, region_id) + expired_today AS ( + DELETE FROM falukant_data.political_office AS po + USING falukant_type.political_office_type AS pot + WHERE po.office_type_id = pot.id + AND (po.created_at + (pot.term_length * INTERVAL '1 day'))::date = (SELECT election_date FROM target_date) + RETURNING + pot.id AS office_type_id, + po.region_id AS region_id + ), + + -- 3) Gruppiere nach Typ+Region und zähle, wie viele Sitze heute frei geworden sind + gaps_per_region AS ( + SELECT + office_type_id, + region_id, + COUNT(*) AS gaps + FROM expired_today + GROUP BY office_type_id, region_id + ), + + -- 4) Filtere nur diejenigen Typ+Region‐Kombinationen, für die noch **keine** Election + -- mit genau demselben Datum angelegt wurde + to_schedule AS ( + SELECT + g.office_type_id, + g.region_id, + g.gaps, + td.election_date + FROM + gaps_per_region AS g + CROSS JOIN + target_date AS td + WHERE NOT EXISTS ( + SELECT 1 + FROM falukant_data.election AS e + WHERE e.office_type_id = g.office_type_id + AND e.region_id = g.region_id + AND e."date"::date = td.election_date + ) + ), + + -- 5) Lege für jede so gefilterte Kombination genau eine Election an + new_elections AS ( + INSERT INTO falukant_data.election + (office_type_id, "date", posts_to_fill, created_at, updated_at, region_id) + SELECT + ts.office_type_id, + ts.election_date AS "date", + ts.gaps AS posts_to_fill, + NOW() AS created_at, + NOW() AS updated_at, + ts.region_id + FROM + to_schedule AS ts + RETURNING + id AS election_id, + region_id, + posts_to_fill + ) + + -- 6) Gib alle neu angelegten Wahlen zurück + SELECT + ne.election_id, + ne.region_id, + ne.posts_to_fill + FROM + new_elections AS ne + ORDER BY + ne.region_id, + ne.election_id; + )"; + + // ----------------------------------------------------------------------- + // 2) Fügt für eine gegebene Election genau LIMIT = ($3 * 2) Kandidaten ein: + // $1 = election_id, $2 = region_id, $3 = Anzahl der Sitze (posts_to_fill) + // ----------------------------------------------------------------------- + static constexpr const char* QUERY_INSERT_CANDIDATES = R"( + INSERT INTO falukant_data.candidate + (election_id, character_id, created_at, updated_at) + SELECT + $1 AS election_id, + sub.id AS character_id, + NOW() AS created_at, + NOW() AS updated_at + FROM ( + WITH RECURSIVE region_tree AS ( + SELECT r.id + FROM falukant_data.region AS r + WHERE r.id = $2 + UNION ALL + SELECT r2.id + FROM falukant_data.region AS r2 + JOIN region_tree AS rt + ON r2.parent_id = rt.id + ) + SELECT + ch.id + FROM + falukant_data."character" AS ch + JOIN + region_tree AS rt2 + ON ch.region_id = rt2.id + WHERE + ch.user_id IS NULL + AND ch.birthdate <= NOW() - INTERVAL '21 days' + AND ch.title_of_nobility IN ( + SELECT id + FROM falukant_type.title + WHERE label_tr != 'noncivil' + ) + ORDER BY RANDOM() + LIMIT ($3 * 2) + ) AS sub(id); + )"; + + // ------------------------------------------------------------ + // STEP 2: Füge eine einzelne neue Election ein und liefere die neue election_id + // $1 = office_type_id + // $2 = gaps (posts_to_fill) + // ------------------------------------------------------------ + static constexpr const char* QUERY_INSERT_ELECTION = R"( + INSERT INTO falukant_data.election + (office_type_id, "date", posts_to_fill, created_at, updated_at) + VALUES + ( + $1, + NOW() + INTERVAL '2 days', + $2, + NOW(), + NOW() + ) + RETURNING id; + )"; + + // ------------------------------------------------------------ + // QUERY: Process Expired Offices & Refill (Winner + Random) + // ------------------------------------------------------------ + static constexpr const char* QUERY_PROCESS_EXPIRED_AND_FILL = R"( + WITH + expired_offices AS ( + DELETE FROM falukant_data.political_office AS po + USING falukant_type.political_office_type AS pot + WHERE po.office_type_id = pot.id + AND (po.created_at + (pot.term_length * INTERVAL '1 day')) <= NOW() + RETURNING + pot.id AS office_type_id, + po.region_id AS region_id + ), + distinct_types AS ( + SELECT DISTINCT + office_type_id, + region_id + FROM expired_offices + ), + votes_per_candidate AS ( + SELECT + dt.office_type_id, + dt.region_id, + c.character_id, + COUNT(v.id) AS vote_count + FROM distinct_types AS dt + JOIN falukant_data.election AS e + ON e.office_type_id = dt.office_type_id + JOIN falukant_data.vote AS v + ON v.election_id = e.id + JOIN falukant_data.candidate AS c + ON c.election_id = e.id + AND c.id = v.candidate_id + WHERE e."date" >= (NOW() - INTERVAL '30 days') + GROUP BY + dt.office_type_id, + dt.region_id, + c.character_id + ), + ranked_winners AS ( + SELECT + vpc.office_type_id, + vpc.region_id, + vpc.character_id, + ROW_NUMBER() OVER ( + PARTITION BY vpc.office_type_id, vpc.region_id + ORDER BY vpc.vote_count DESC + ) AS rn + FROM votes_per_candidate AS vpc + ), + selected_winners AS ( + SELECT + rw.office_type_id, + rw.region_id, + rw.character_id + FROM ranked_winners AS rw + JOIN falukant_type.political_office_type AS pot + ON pot.id = rw.office_type_id + WHERE rw.rn <= pot.seats_per_region + ), + insert_winners AS ( + INSERT INTO falukant_data.political_office + (office_type_id, character_id, created_at, updated_at, region_id) + SELECT + sw.office_type_id, + sw.character_id, + NOW() AS created_at, + NOW() AS updated_at, + sw.region_id + FROM selected_winners AS sw + RETURNING + id AS new_office_id, + office_type_id, + character_id, + region_id + ), + count_inserted AS ( + SELECT + office_type_id, + region_id, + COUNT(*) AS inserted_count + FROM insert_winners + GROUP BY office_type_id, region_id + ), + needed_to_fill AS ( + SELECT + dt.office_type_id, + dt.region_id, + (pot.seats_per_region - COALESCE(ci.inserted_count, 0)) AS gaps + FROM distinct_types AS dt + JOIN falukant_type.political_office_type AS pot + ON pot.id = dt.office_type_id + LEFT JOIN count_inserted AS ci + ON ci.office_type_id = dt.office_type_id + AND ci.region_id = dt.region_id + WHERE (pot.seats_per_region - COALESCE(ci.inserted_count, 0)) > 0 + ), + random_candidates AS ( + SELECT + rtf.office_type_id, + rtf.region_id, + ch.id AS character_id, + ROW_NUMBER() OVER ( + PARTITION BY rtf.office_type_id, rtf.region_id + ORDER BY RANDOM() + ) AS rn + FROM needed_to_fill AS rtf + JOIN falukant_data."character" AS ch + ON ch.region_id = rtf.region_id + AND ch.user_id IS NULL + AND ch.birthdate <= NOW() - INTERVAL '21 days' + AND ch.title_of_nobility IN ( + SELECT id FROM falukant_type.title WHERE label_tr != 'noncivil' + ) + AND NOT EXISTS ( + SELECT 1 + FROM falukant_data.political_office AS po2 + JOIN falukant_type.political_office_type AS pot2 + ON pot2.id = po2.office_type_id + WHERE po2.character_id = ch.id + AND (po2.created_at + (pot2.term_length * INTERVAL '1 day')) > NOW() + INTERVAL '2 days' + ) + ), + insert_random AS ( + INSERT INTO falukant_data.political_office + (office_type_id, character_id, created_at, updated_at, region_id) + SELECT + rc.office_type_id, + rc.character_id, + NOW() AS created_at, + NOW() AS updated_at, + rc.region_id + FROM random_candidates AS rc + JOIN needed_to_fill AS rtf + ON rtf.office_type_id = rc.office_type_id + AND rtf.region_id = rc.region_id + WHERE rc.rn <= rtf.gaps + RETURNING + id AS new_office_id, + office_type_id, + character_id, + region_id + ) + SELECT + new_office_id AS office_id, + office_type_id, + character_id, + region_id + FROM insert_winners + UNION ALL + SELECT + new_office_id AS office_id, + office_type_id, + character_id, + region_id + FROM insert_random; + )"; + + // ------------------------------------------------------------ + // QUERY: Hole User-IDs in allen Cities untergeordneter Regionen: + // ------------------------------------------------------------ + static constexpr const char* QUERY_USERS_IN_CITIES_OF_REGIONS = R"( + WITH RECURSIVE region_tree AS ( + SELECT id + FROM falukant_data.region + WHERE id = $1 + UNION ALL + SELECT r2.id + FROM falukant_data.region AS r2 + JOIN region_tree AS rt + ON r2.parent_id = rt.id + ) + SELECT DISTINCT + ch.user_id + FROM + falukant_data."character" AS ch + JOIN + region_tree AS rt2 + ON ch.region_id = rt2.id + WHERE + ch.user_id IS NOT NULL; + )"; + + // ------------------------------------------------------------ + // QUERY: Benachrichtige User, deren Amt in 2 Tagen abläuft + // ------------------------------------------------------------ + static constexpr const char* QUERY_NOTIFY_OFFICE_EXPIRATION = R"( + INSERT INTO falukant_log.notification + (user_id, tr, created_at, updated_at) + SELECT + po.character_id, + 'notify_office_expiring', + NOW(), NOW() + FROM + falukant_data.political_office AS po + JOIN + falukant_type.political_office_type AS pot + ON po.office_type_id = pot.id + WHERE + (po.created_at + (pot.term_length * INTERVAL '1 day')) + BETWEEN (NOW() + INTERVAL '2 days') + AND (NOW() + INTERVAL '2 days' + INTERVAL '1 second'); + )"; + + // ------------------------------------------------------------ + // QUERY: Benachrichtige User, wenn Election angelegt wurde + // ------------------------------------------------------------ + static constexpr const char* QUERY_NOTIFY_ELECTION_CREATED = R"( + INSERT INTO falukant_log.notification + (user_id, tr, created_at, updated_at) + VALUES + ($1, 'notify_election_created', NOW(), NOW()); + )"; + + // ------------------------------------------------------------ + // QUERY: Benachrichtige User, wenn Amt neu besetzt wurde + // ------------------------------------------------------------ + static constexpr const char* QUERY_NOTIFY_OFFICE_FILLED = R"( + INSERT INTO falukant_log.notification + (user_id, tr, created_at, updated_at) + VALUES + ($1, 'notify_office_filled', NOW(), NOW()); + )"; + + static constexpr const char* QUERY_PROCESS_ELECTIONS = R"( + SELECT office_id, office_type_id, character_id, region_id + FROM falukant_data.process_elections(); + )"; +}; + +#endif // POLITICS_WORKER_H diff --git a/src/produce_worker.cpp b/src/produce_worker.cpp index df749ea..bca873f 100644 --- a/src/produce_worker.cpp +++ b/src/produce_worker.cpp @@ -1,14 +1,12 @@ #include "produce_worker.h" -#include "connection_guard.h" // Include for ConnectionGuard +#include "connection_guard.h" #include #include #include #include ProduceWorker::ProduceWorker(ConnectionPool &pool, MessageBroker &broker) - : Worker(pool, broker, "ProduceWorker") -{ -} + : Worker(pool, broker, "ProduceWorker") {} ProduceWorker::~ProduceWorker() { } @@ -43,10 +41,8 @@ void ProduceWorker::processProductions() { setCurrentStep("Get Database Connection"); ConnectionGuard connGuard(pool); auto &db = connGuard.get(); - setCurrentStep("Fetch Finished Productions"); auto finishedProductions = getFinishedProductions(db); - setCurrentStep("Process Finished Productions"); for (const auto &production : finishedProductions) { if (production.find("branch_id") == production.end() || @@ -56,16 +52,20 @@ void ProduceWorker::processProductions() { production.find("user_id") == production.end()) { continue; } - int branchId = std::stoi(production.at("branch_id")); int productId = std::stoi(production.at("product_id")); int quantity = std::stoi(production.at("quantity")); int quality = std::stoi(production.at("quality")); int userId = std::stoi(production.at("user_id")); - - if (addToInventory(db, branchId, productId, quantity, quality, userId)) { - } + int regionId = std::stoi(production.at("region_id")); + addToInventory(db, branchId, productId, quantity, quality, userId); deleteProduction(db, production.at("production_id")); + addProductionToLog(regionId, userId, productId, quantity); + const nlohmann::json message = { + {"event", "production_ready"}, + {"branch_id", std::to_string(branchId) } + }; + sendMessageToFalukantUsers(userId, message); } } catch (const std::exception &e) { std::cerr << "[ProduceWorker] Fehler in processProductions: " << e.what() << std::endl; @@ -83,13 +83,12 @@ std::vector> ProduceWorker::getFini return {}; } -bool ProduceWorker::addToInventory(Database &db, - int branchId, - int productId, - int quantity, - int quality, - int userId) -{ +bool ProduceWorker::addToInventory(Database &db, + int branchId, + int productId, + int quantity, + int quality, + int userId) { try { db.prepare("get_stocks", QUERY_GET_AVAILABLE_STOCKS); auto stocks = db.execute("get_stocks", {std::to_string(branchId)}); @@ -99,22 +98,29 @@ bool ProduceWorker::addToInventory(Database &db, int totalCapacity = std::stoi(stock.at("total_capacity")); int filledCapacity = std::stoi(stock.at("filled")); int freeCapacity = totalCapacity - filledCapacity; - if (freeCapacity <= 0) { continue; } - int toStore = std::min(remainingQuantity, freeCapacity); if (!storeInStock(db, stockId, productId, toStore, quality)) { return false; } remainingQuantity -= toStore; - sendProductionReadyEvent(userId, productId, quantity, quality, branchId); if (remainingQuantity <= 0) { break; } } - return (remainingQuantity == 0); + if (remainingQuantity == 0) { + sendProductionReadyEvent(userId, productId, quantity, quality, branchId); + return true; + } + db.prepare("QUERY_ADD_OVERPRODUCTION_NOTIFICATION", QUERY_ADD_OVERPRODUCTION_NOTIFICATION); + nlohmann::json notification = { + {"tr", "production.overproduction"}, + {"value", remainingQuantity} + }; + db.execute("QUERY_ADD_OVERPRODUCTION_NOTIFICATION", {std::to_string(userId), notification.dump()}); + return true; } catch (const std::exception &e) { std::cerr << "[ProduceWorker] Fehler in addToInventory: " << e.what() << std::endl; } @@ -125,8 +131,7 @@ bool ProduceWorker::storeInStock(Database &db, int stockId, int productId, int quantity, - int quality) -{ + int quality) { try { db.prepare("insert_inventory", QUERY_INSERT_INVENTORY); db.execute("insert_inventory", {std::to_string(stockId), @@ -170,3 +175,15 @@ void ProduceWorker::sendProductionReadyEvent(int userId, << e.what() << std::endl; } } + +void ProduceWorker::addProductionToLog(int regionId, int userId, int productId, int quantity) { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_INSERT_UPDATE_PRODUCTION_LOG", QUERY_INSERT_UPDATE_PRODUCTION_LOG); + db.execute("QUERY_INSERT_UPDATE_PRODUCTION_LOG", { std::to_string(regionId), std::to_string(productId), + std::to_string(productId), std::to_string(userId) }); + } catch (const std::exception &e) { + + } +} diff --git a/src/produce_worker.h b/src/produce_worker.h index d3737bc..1355f92 100644 --- a/src/produce_worker.h +++ b/src/produce_worker.h @@ -2,38 +2,35 @@ #include "worker.h" #include -#include #include - -class ProduceWorker : public Worker -{ +class ProduceWorker : public Worker { public: explicit ProduceWorker(ConnectionPool &pool, MessageBroker &broker); ~ProduceWorker() override; protected: - void run() override; // überschreibt Worker::run() + void run() override; private: - // Fachlogik void processProductions(); std::vector> getFinishedProductions(Database &db); bool addToInventory(Database &db, int branchId, int productId, int quantity, int quality, int userId); bool storeInStock(Database &db, int stockId, int productId, int quantity, int quality); void deleteProduction(Database &db, const std::string &productionId); void sendProductionReadyEvent(int userId, int productId, int quantity, int quality, int branchId); + void addProductionToLog(int regionId, int userId, int productId, int quantity); static constexpr const char *QUERY_GET_FINISHED_PRODUCTIONS = R"( - SELECT - p.id AS production_id, - p.branch_id, - p.product_id, - p.quantity, - p.start_timestamp, - pr.production_time, - k.character_id, - k.knowledge AS quality, + SELECT DISTINCT + p.id AS production_id, + p.branch_id, + p.product_id, + p.quantity, + p.start_timestamp, + pr.production_time, + k.character_id, + case when k2.id is not null then (k.knowledge * 2 + k2.knowledge) / 3 else k.knowledge end AS quality, br.region_id, br.falukant_user_id user_id FROM falukant_data.production p @@ -42,6 +39,8 @@ private: JOIN falukant_data.character c ON c.user_id = br.falukant_user_id JOIN falukant_data.knowledge k ON p.product_id = k.product_id AND k.character_id = c.id JOIN falukant_data.stock s ON s.branch_id = br.id + LEFT JOIN falukant_data.director d on d.employer_user_id = c.user_id + LEFT JOIN falukant_data.knowledge k2 on k2.character_id = d.director_character_id and k2.product_id = p.product_id WHERE p.start_timestamp + interval '1 minute' * pr.production_time <= NOW() ORDER BY p.start_timestamp; )"; @@ -67,4 +66,24 @@ private: INSERT INTO falukant_data.inventory (stock_id, product_id, quantity, quality, produced_at) VALUES ($1, $2, $3, $4, NOW()); )"; + + static constexpr const char *QUERY_INSERT_UPDATE_PRODUCTION_LOG = R"( + INSERT INTO falukant_log.production ( + region_id, + product_id, + quantity, + producer_id, + production_date + ) + VALUES ($1, $2, $3, $4, CURRENT_DATE) + ON CONFLICT (producer_id, product_id, region_id, production_date) + DO UPDATE + SET quantity = falukant_log.production.quantity + EXCLUDED.quantity; + )"; + + static constexpr const char *QUERY_ADD_OVERPRODUCTION_NOTIFICATION = R"( + INSERT INTO falukant_log.notification + (user_id, tr, shown, created_at, updated_at) + VALUES($1, $2, false, now(), now()); + )"; }; diff --git a/src/stockagemanager.cpp b/src/stockagemanager.cpp new file mode 100644 index 0000000..ccf7820 --- /dev/null +++ b/src/stockagemanager.cpp @@ -0,0 +1,98 @@ +#include "stockagemanager.h" +#include "connection_guard.h" +#include +#include +#include +#include + +StockageManager::StockageManager(ConnectionPool &pool, MessageBroker &broker) + : Worker(pool, broker, "StockageManager") {} + +StockageManager::~StockageManager() { + addStocksRunning = false; + if (addStocksThread.joinable()) addStocksThread.join(); +} + +void StockageManager::run() { + addStocksThread = std::thread([this]() { addLocalStocks(); }); + while (runningWorker) { + setCurrentStep("Main loop: Running..."); + std::this_thread::sleep_for(std::chrono::seconds(1)); + signalActivity(); + } +} + +void StockageManager::addLocalStocks() { + auto lastExecutionTime = std::chrono::steady_clock::now(); + std::uniform_real_distribution<> dist(0.0, 1.0); + while (addStocksRunning) { + signalActivity(); + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(now - lastExecutionTime).count(); + if (elapsed >= 60) { + try { + setCurrentStep("Add Local Stocks: Fetch Town IDs"); + auto townIds = getTownIds(); + for (const auto &townId : townIds) { + std::mt19937 gen(std::random_device{}()); + double chance = round(dist(gen) * 2160); + if (chance <= 1) { + addStockForTown(townId); + } + } + } catch (const std::exception &e) { + std::cerr << "[StockageManager] Fehler in addLocalStocks: " << e.what() << std::endl; + } + lastExecutionTime = now; + } + cleanupBuyableSotck(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + + +std::vector StockageManager::getTownIds() { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("get_towns", QUERY_GET_TOWNS); + const auto towns = db.execute("get_towns"); + std::vector townIds; + for (const auto &town: towns) { + auto id = town.at("id"); + townIds.push_back(std::stoi(id)); + } + return townIds; + } catch (const std::exception &e) { + std::cerr << "[ProduceWorker] Fehler beim Abrufen abgeschlossener Produktionen: " + << e.what() << std::endl; + } + return {}; +} + +void StockageManager::addStockForTown(int townId) { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("add_stock", QUERY_INSERT_STOCK); + db.execute("add_stock", {std::to_string(townId)}); + nlohmann::json message = { + {"event", "stock_change"}, + {"branch", std::to_string(townId) } + }; + sendMessageToRegionUsers(townId, message); + } catch (const std::exception &e) { + std::cerr << "[StockageManager] Fehler in addStockForTown: " << e.what() << std::endl; + } +} + +void StockageManager::cleanupBuyableSotck() { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("cleanup_stock", QUERY_CLEANUP_STOCK); + db.execute("cleanup_stock", {}); + } catch (const std::exception &e) { + std::cerr << "[StockageManager] Fehler bei stock cleanup: " << e.what() << std::endl; + } +} diff --git a/src/stockagemanager.h b/src/stockagemanager.h new file mode 100644 index 0000000..18f4cb1 --- /dev/null +++ b/src/stockagemanager.h @@ -0,0 +1,59 @@ +#pragma once + +#include "worker.h" +#include +#include + +class StockageManager : public Worker { +public: + explicit StockageManager(ConnectionPool &pool, MessageBroker &broker); + ~StockageManager() override; + +protected: + void run() override; + +private: + void addLocalStocks(); + std::vector getTownIds(); + void addStockForTown(int townId); + + std::atomic addStocksRunning{true}; + std::thread addStocksThread; + + static constexpr const char *QUERY_GET_TOWNS = R"( + SELECT fdr.id + from falukant_data.region fdr + join falukant_type.region ftr + on ftr.id = fdr.region_type_id + where ftr.label_tr = 'city' + )"; + + static constexpr const char *QUERY_INSERT_STOCK = R"( + INSERT INTO falukant_data.buyable_stock (region_id, stock_type_id, quantity) + SELECT + $1 AS region_id, + s.id AS stock_type_id, + GREATEST(1, ROUND(RANDOM() * 5 * COUNT(br.id))) AS quantity + FROM + falukant_data.branch AS br + CROSS JOIN + falukant_type.stock AS s + WHERE + br.region_id = $1 + GROUP BY + s.id + ORDER BY + RANDOM() + LIMIT + GREATEST( + ROUND(RANDOM() * (SELECT COUNT(id) FROM falukant_type.stock)), + 1 + ); + )"; + + static constexpr const char *QUERY_CLEANUP_STOCK = R"( + delete from falukant_data.buyable_stock + where quantity <= 0 + )"; + void cleanupBuyableSotck(); +}; diff --git a/src/usercharacterworker.cpp b/src/usercharacterworker.cpp new file mode 100644 index 0000000..7f8dbe1 --- /dev/null +++ b/src/usercharacterworker.cpp @@ -0,0 +1,361 @@ +#include "usercharacterworker.h" +#include "connection_guard.h" +#include +#include +#include +#include +#include + +UserCharacterWorker::UserCharacterWorker(ConnectionPool &pool, MessageBroker &broker) + : Worker(pool, broker, "UserCharacterWorker"), + gen(rd()), dist(0.0, 1.0) {} + +UserCharacterWorker::~UserCharacterWorker() {} + +void UserCharacterWorker::run() { + using namespace std::chrono; + + auto lastExecutionTime = steady_clock::now(); + int lastPregnancyDay = -1; + while (runningWorker) { + signalActivity(); + auto nowSteady = steady_clock::now(); + auto elapsed = duration_cast(nowSteady - lastExecutionTime).count(); + if (elapsed >= 3600) { + try { + processCharacterEvents(); + updateCharactersMood(); + handleCredits(); + } catch (const std::exception &e) { + std::cerr << "[UserCharacterWorker] Fehler in processCharacterEvents: " << e.what() << std::endl; + } + lastExecutionTime = nowSteady; + } + { + auto nowSys = system_clock::now(); + std::time_t t = system_clock::to_time_t(nowSys); + std::tm local_tm; + localtime_r(&t, &local_tm); + if (local_tm.tm_hour == 6 && local_tm.tm_yday != lastPregnancyDay) { + try { + processPregnancies(); + } catch (const std::exception &e) { + std::cerr << "[UserCharacterWorker] Fehler in processPregnancies: " << e.what() << std::endl; + } + lastPregnancyDay = local_tm.tm_yday; + } + } + std::this_thread::sleep_for(seconds(1)); + recalculateKnowledge(); + } +} + +void UserCharacterWorker::processCharacterEvents() { + setCurrentStep("Get character data"); + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare(QUERY_GET_USERS_TO_UPDATE, QUERY_GET_USERS_TO_UPDATE); + auto rows = db.execute(QUERY_GET_USERS_TO_UPDATE); + std::vector characters; + for (const auto &row : rows) { + characters.push_back({ std::stoi(row.at("id")), std::stoi(row.at("age")), std::stoi(row.at("health")) }); + } + for (auto &character : characters) { + updateCharacterHealth(character); + } +} + +void UserCharacterWorker::updateCharacterHealth(Character& character) { + int healthChange = calculateHealthChange(character.age); + + if (healthChange != 0) { + character.health = std::max(0, character.health + healthChange); + if (character.health == 0) { + handleCharacterDeath(character.id); + return; + } + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_UPDATE_CHARACTERS_HEALTH", QUERY_UPDATE_CHARACTERS_HEALTH); + db.execute("QUERY_UPDATE_CHARACTERS_HEALTH", + { std::to_string(character.health), std::to_string(character.id) }); + } +} + +void UserCharacterWorker::updateCharactersMood() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_UPDATE_MOOD", QUERY_UPDATE_MOOD); + db.execute("QUERY_UPDATE_MOOD"); +} + +int UserCharacterWorker::calculateHealthChange(int age) { + if (age < 30) { + return 0; + } + if (age >= 45) { + double probability = std::min(1.0, 0.1 + (age - 45) * 0.02); + if (dist(gen) < probability) { + return -std::uniform_int_distribution(1, 10)(gen); + } + return 0; + } + double probability = (age - 30) / 30.0; + return (dist(gen) < probability) ? -1 : 0; +} + +void UserCharacterWorker::handleCharacterDeath(int characterId) { + setHeir(characterId); + + nlohmann::json deathEvent = { + {"event", "CharacterDeath"}, + {"character_id", characterId} + }; + + broker.publish(deathEvent.dump()); + + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + db.prepare("delete_character", "DELETE FROM falukant_data.character WHERE id = $1"); + db.execute("delete_character", { std::to_string(characterId) }); +} + +void UserCharacterWorker::setHeir(int characterId) { + auto falukantUserId = getFalukantUserId(characterId); + auto heirId = getHeirFromChildren(characterId); + auto newMoney = calculateNewMoney(falukantUserId, true); + if (heirId < 1) { + getRandomHeir(characterId); + newMoney = calculateNewMoney(falukantUserId, false); + } + setNewCharacter(falukantUserId, heirId); + setNewMoney(falukantUserId, newMoney); +} + +int UserCharacterWorker::getFalukantUserId(int characterId) { + ConnectionGuard guard(pool); + auto &db = guard.get(); + + db.prepare("QUERY_GET_FALUKANT_USER_ID", QUERY_GET_FALUKANT_USER_ID); + const auto rows = db.execute("QUERY_GET_FALUKANT_USER_ID", { std::to_string(characterId) }); + if (!rows.empty() && !rows.front().at("user_id").empty()) { + return std::stoi(rows.front().at("user_id")); + } + return -1; +} + +int UserCharacterWorker::getHeirFromChildren(int deceasedCharacterId) { + ConnectionGuard guard(pool); + auto &db = guard.get(); + + db.prepare("QUERY_GET_HEIR", QUERY_GET_HEIR); + const auto rows = db.execute("QUERY_GET_HEIR", { std::to_string(deceasedCharacterId) }); + if (!rows.empty()) { + return std::stoi(rows.front().at("child_character_id")); + } + return -1; +} + +int UserCharacterWorker::getRandomHeir(int deceasedCharacterId) { + ConnectionGuard guard(pool); + auto &db = guard.get(); + + db.prepare("QUERY_RANDOM_HEIR", QUERY_RANDOM_HEIR); + const auto rows = db.execute("QUERY_RANDOM_HEIR", { std::to_string(deceasedCharacterId) }); + if (!rows.empty()) { + return std::stoi(rows.front().at("child_character_id")); + } + return -1; +} + +void UserCharacterWorker::setNewCharacter(int falukantUserId, int heirCharacterId) { + if (heirCharacterId < 1) return; + + ConnectionGuard guard(pool); + auto &db = guard.get(); + + db.prepare("QUERY_SET_CHARACTER_USER", QUERY_SET_CHARACTER_USER); + db.execute("QUERY_SET_CHARACTER_USER", { + std::to_string(falukantUserId), + std::to_string(heirCharacterId) + }); +} + +void UserCharacterWorker::setNewMoney(int falukantUserId, double newAmount) { + ConnectionGuard guard(pool); + auto &db = guard.get(); + + db.prepare("QUERY_UPDATE_USER_MONEY", QUERY_UPDATE_USER_MONEY); + db.execute("QUERY_UPDATE_USER_MONEY", { + std::to_string(newAmount), + std::to_string(falukantUserId) + }); +} + +void UserCharacterWorker::recalculateKnowledge() { + setCurrentStep("Get character data"); + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_UPDATE_GET_ITEMS_TO_UPDATE", QUERY_UPDATE_GET_ITEMS_TO_UPDATE); + auto rows = db.execute("QUERY_UPDATE_GET_ITEMS_TO_UPDATE"); + for (const auto &updateItem: rows) { + if (std::stoi(updateItem.at("quantity")) >= 10) { + db.prepare("QUERY_UPDATE_GET_CHARACTER_IDS", QUERY_UPDATE_GET_CHARACTER_IDS); + auto charactersData = db.execute("QUERY_UPDATE_GET_CHARACTER_IDS", { updateItem.at("producer_id") }); + for (const auto &characterRow: charactersData) { + db.prepare("QUERY_UPDATE_KNOWLEDGE", QUERY_UPDATE_KNOWLEDGE); + if (characterRow.at("director_id") == "") { + db.execute("QUERY_UPDATE_KNOWLEDGE", { characterRow.at("character_id"), updateItem.at("product_id"), "2" }); + } else { + db.execute("QUERY_UPDATE_KNOWLEDGE", { characterRow.at("character_id"), updateItem.at("product_id"), "1" }); + db.execute("QUERY_UPDATE_KNOWLEDGE", { characterRow.at("director_id"), updateItem.at("product_id"), "1" }); + } + } + } + db.prepare("QUERY_DELETE_LOG_ENTRY", QUERY_DELETE_LOG_ENTRY); + db.execute("QUERY_DELETE_LOG_ENTRY", { updateItem.at("id") }); + const nlohmann::json message = { + {"event", "knowledge_update"}, + }; + sendMessageToFalukantUsers(std::stoi(updateItem.at("producer_id")), message); + } +} + +void UserCharacterWorker::processPregnancies() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_AUTOBATISM", QUERY_AUTOBATISM); + db.execute("QUERY_AUTOBATISM"); + db.prepare("get_candidates", QUERY_GET_PREGNANCY_CANDIDATES); + auto rows = db.execute("get_candidates"); + const nlohmann::json message = { + {"event", "children_update"}, + }; + for (auto const &row : rows) { + int fatherCid = std::stoi(row.at("father_cid")); + int motherCid = std::stoi(row.at("mother_cid")); + int fatherUid = std::stoi(row.at("father_uid")); + int motherUid = std::stoi(row.at("mother_uid")); + int titleOfNobility = std::stoi(row.at("title_of_nobility")); + int lastName = std::stoi(row.at("last_name")); + int regionId = std::stoi(row.at("region_id")); + std::string gender = (dist(gen) < 0.5) ? "male" : "female"; + db.prepare("insert_child", QUERY_INSERT_CHILD); + auto resChild = db.execute("insert_child", { + std::to_string(regionId), // $1 + gender, // $2 + std::to_string(lastName), // $3 + std::to_string(titleOfNobility) // $4 + }); + int childCid = std::stoi(resChild.front().at("child_cid")); + db.prepare("insert_relation", QUERY_INSERT_CHILD_RELATION); + auto resRel = db.execute("insert_relation", { + std::to_string(fatherCid), + std::to_string(motherCid), + std::to_string(childCid) + }); + const nlohmann::json message = {{"event", "children_update"}}; + sendMessageToFalukantUsers(fatherUid, message); + sendMessageToFalukantUsers(motherUid, message); + } +} + +void UserCharacterWorker::handleCredits() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_GET_OPEN_CREDITS", QUERY_GET_OPEN_CREDITS); + const auto &credits = db.execute("QUERY_GET_OPEN_CREDITS"); + const nlohmann::json message = { + { "event", "falukantUpdateStatus" } + }; + db.prepare("QUERY_UPDATE_CREDIT", QUERY_UPDATE_CREDIT); + db.prepare("QUERY_ADD_CHARACTER_TO_DEBTORS_PRISM", QUERY_ADD_CHARACTER_TO_DEBTORS_PRISM); + for (const auto &credit: credits) { + const auto userMoney = std::stod(credit.at("money")); + auto remainingAmount = std::stod(credit.at("remaining_amount")); + const auto amount = std::stod(credit.at("amount")); + const auto fee = std::stoi(credit.at("interest_rate")); + const auto falukantUserId = std::stoi(credit.at("user_id")); + const auto payRate = amount / 10 + amount * fee / 100; + remainingAmount -= payRate; + if (payRate <= userMoney - (payRate * 3)) { + changeFalukantUserMoney(falukantUserId, -payRate, "credit pay rate", message); + } else { + if (credit.at("prism_started_previously") == "t") { + changeFalukantUserMoney(falukantUserId, payRate, "debitor_prism", message); + } else { + db.execute("QUERY_ADD_CHARACTER_TO_DEBTORS_PRISM", { credit.at("character_id") }); + } + } + db.execute("QUERY_UPDATE_CREDIT", { std::to_string(remainingAmount), std::to_string(falukantUserId) }); + } + db.prepare("QUERY_CLEANUP_CREDITS", QUERY_CLEANUP_CREDITS); + db.execute("QUERY_CLEANUP_CREDITS"); +} + +double UserCharacterWorker::getCurrentMoney(int falukantUserId) { + ConnectionGuard g(pool); auto &db = g.get(); + db.prepare("GET_CURRENT_MONEY", QUERY_GET_CURRENT_MONEY); + auto rows = db.execute("GET_CURRENT_MONEY", {std::to_string(falukantUserId)}); + return rows.empty()? 0.0 : std::stod(rows.front().at("sum")); +} + +double UserCharacterWorker::getHouseValue(int falukantUserId) { + ConnectionGuard g(pool); auto &db = g.get(); + db.prepare("HOUSE_VALUE", QUERY_HOUSE_VALUE); + auto rows = db.execute("HOUSE_VALUE", {std::to_string(falukantUserId)}); + return rows.empty()? 0.0 : std::stod(rows.front().at("sum")); +} + +double UserCharacterWorker::getSettlementValue(int falukantUserId) { + ConnectionGuard g(pool); auto &db = g.get(); + db.prepare("SETTLEMENT_VALUE", QUERY_SETTLEMENT_VALUE); + auto rows = db.execute("SETTLEMENT_VALUE", {std::to_string(falukantUserId)}); + return rows.empty()? 0.0 : std::stod(rows.front().at("sum")); +} + +double UserCharacterWorker::getInventoryValue(int falukantUserId) { + ConnectionGuard g(pool); auto &db = g.get(); + db.prepare("INVENTORY_VALUE", QUERY_INVENTORY_VALUE); + auto rows = db.execute("INVENTORY_VALUE", {std::to_string(falukantUserId)}); + return rows.empty()? 0.0 : std::stod(rows.front().at("sum")); +} + +double UserCharacterWorker::getCreditDebt(int falukantUserId) { + ConnectionGuard guard(pool); + auto &db = guard.get(); + + db.prepare("CREDIT_DEBT", QUERY_CREDIT_DEBT); + auto rows = db.execute("CREDIT_DEBT", { std::to_string(falukantUserId) }); + return rows.empty() + ? 0.0 + : std::stod(rows.front().at("sum")); +} + +int UserCharacterWorker::getChildCount(int deceasedUserId) { + ConnectionGuard g(pool); auto &db = g.get(); + db.prepare("COUNT_CHILDREN", QUERY_COUNT_CHILDREN); + auto rows = db.execute("COUNT_CHILDREN", {std::to_string(deceasedUserId)}); + return rows.empty()? 0 : std::stoi(rows.front().at("cnt")); +} + +double UserCharacterWorker::calculateNewMoney(int falukantUserId, bool hasHeir) { + if (!hasHeir) { + return 800.0; + } + double cash = getCurrentMoney(falukantUserId); + double houses = getHouseValue(falukantUserId); + double sets = getSettlementValue(falukantUserId); + double inv = getInventoryValue(falukantUserId); + double debt = getCreditDebt(falukantUserId); + double totalAssets = cash + houses + sets + inv - debt; + int childCount = getChildCount(falukantUserId); + bool single = (childCount <= 1); + double heirShare = single ? totalAssets : totalAssets * 0.8; + double net = heirShare - (houses + sets + inv + debt); + if (net <= 1000.0) { + return 1000.0; + } + return net; +} diff --git a/src/usercharacterworker.h b/src/usercharacterworker.h new file mode 100644 index 0000000..392e3a6 --- /dev/null +++ b/src/usercharacterworker.h @@ -0,0 +1,374 @@ +#ifndef USERCHARACTERWORKER_H +#define USERCHARACTERWORKER_H + +#include "worker.h" +#include + +class UserCharacterWorker : public Worker { +public: + UserCharacterWorker(ConnectionPool &pool, MessageBroker &broker); + ~UserCharacterWorker() override; + +protected: + void run() override; + +private: + struct Character { + int id; + int age; + int health; + }; + + void processCharacterEvents(); + void updateCharacterHealth(Character& character); + void updateCharactersMood(); + int calculateHealthChange(int age); + void handleCharacterDeath(int characterId); + void recalculateKnowledge(); + void processPregnancies(); + void handleCredits(); + void setHeir(int characterId); + int getFalukantUserId(int characterId); + int getHeirFromChildren(int deceasedCharacterId); + int getRandomHeir(int deceasedCharacterId); + void setNewCharacter(int falukantUserId, int heirCharacterId); + void setNewMoney(int falukantUserId, double newAmount); + double getHouseValue(int falukantUserId); + double getSettlementValue(int falukantUserId); + double getInventoryValue(int falukantUserId); + double getCreditDebt(int falukantUserId); + double getCurrentMoney(int falukantUserId); + double calculateNewMoney(int falukantUserId, bool hasHeir); + int getChildCount(int deceasedUserId); + + std::random_device rd; + std::mt19937 gen; + std::uniform_real_distribution<> dist; + bool didRunToday { false }; + + static constexpr const char *QUERY_GET_USERS_TO_UPDATE = R"( + SELECT "id", CURRENT_DATE - birthdate::date AS age, "health" + FROM "falukant_data"."character" + WHERE "user_id" IS NOT NULL; + )"; + + static constexpr const char *QUERY_UPDATE_CHARACTERS_HEALTH = R"( + UPDATE "falukant_data"."character" + SET health = $1 + WHERE id = $2 + )"; + + static constexpr const char *QUERY_UPDATE_GET_ITEMS_TO_UPDATE = R"( + SELECT id, product_id, producer_id, quantity + FROM falukant_log.production p + WHERE p.production_timestamp::date < current_date + )"; + + static constexpr const char *QUERY_UPDATE_GET_CHARACTER_IDS = R"( + select fu.id user_id, c.id character_id, c2.id director_id + from falukant_data.falukant_user fu + join falukant_data."character" c + on c.user_id = fu.id + left join falukant_data.director d + on d.employer_user_id = fu.id + left join falukant_data."character" c2 + on c2.id = d.director_character_id + where fu.id = $1 + )"; + + static constexpr const char *QUERY_UPDATE_KNOWLEDGE = R"( + update falukant_data.knowledge + set knowledge = least(knowledge + $3, 100) + where character_id = $1 + and product_id = $2 + )"; + + static constexpr const char *QUERY_DELETE_LOG_ENTRY = R"( + delete from falukant_log.production + where id = $1 + )"; + + static constexpr char const* QUERY_GET_PREGNANCY_CANDIDATES = R"( + SELECT + r.character1_id AS father_cid, + r.character2_id AS mother_cid, + c1.title_of_nobility, + c1.last_name, + c1.region_id, + fu1.id AS father_uid, + fu2.id AS mother_uid, + -- Durchschnittsalter in Tagen + ((NOW()::date - c1.birthdate::date) + + (NOW()::date - c2.birthdate::date)) / 2 AS avg_age_days, + -- Angepasste Schwangerschaftswahrscheinlichkeit in Prozent + 100.0 / + (1 + + EXP( + 0.0647 * ( + ((NOW()::date - c1.birthdate::date) + + (NOW()::date - c2.birthdate::date)) / 2 + ) + - 0.0591 + ) + ) AS prob_pct + FROM falukant_data.relationship r + JOIN falukant_type.relationship r2 + ON r2.id = r.relationship_type_id + AND r2.tr = 'married' + JOIN falukant_data."character" c1 + ON c1.id = r.character1_id + JOIN falukant_data."character" c2 + ON c2.id = r.character2_id + LEFT JOIN falukant_data.falukant_user fu1 + ON fu1.id = c1.user_id + LEFT JOIN falukant_data.falukant_user fu2 + ON fu2.id = c2.user_id + WHERE random()*100 < ( + 100.0 / + (1 + + EXP( + 0.11166347 * ( + ((NOW()::date - c1.birthdate::date) + + (NOW()::date - c2.birthdate::date)) / 2 + ) + - 2.638267 + ) + ) + ); + )"; + + static constexpr char const* QUERY_INSERT_CHILD = R"( + INSERT INTO falukant_data."character" ( + user_id, + region_id, + first_name, + last_name, + birthdate, + gender, + title_of_nobility, + mood_id, + created_at, + updated_at + ) VALUES ( + NULL, + $1::int, -- region_id + /* zufälliger Vorname passend zum Gender */ + ( + SELECT id + FROM falukant_predefine.firstname + WHERE gender = $2 + ORDER BY RANDOM() + LIMIT 1 + ), + $3::int, -- last_name (Eltern-Nachname) + NOW(), + $2::varchar, -- gender + $4::int, -- title_of_nobility + /* zufällige Stimmung */ + ( + SELECT id + FROM falukant_type.mood + ORDER BY RANDOM() + LIMIT 1 + ), + NOW(), + NOW() + ) + RETURNING id AS child_cid + )"; + + static constexpr char const* QUERY_INSERT_CHILD_RELATION = R"( + -- QUERY_INSERT_CHILD_RELATION + INSERT INTO falukant_data.child_relation ( + father_character_id, + mother_character_id, + child_character_id, + name_set, + created_at, + updated_at + ) + VALUES ( + $1::int, -- father_cid + $2::int, -- mother_cid + $3::int, -- child_cid + false, + NOW(), NOW() + ) + RETURNING + father_character_id, + -- Vater-User + (SELECT user_id FROM falukant_data."character" WHERE id = father_character_id) AS father_user_id, + mother_character_id, + -- Mutter-User + (SELECT user_id FROM falukant_data."character" WHERE id = mother_character_id) AS mother_user_id, + child_character_id, + -- Kind-User + (SELECT user_id FROM falukant_data."character" WHERE id = child_character_id) AS child_user_id; + )"; + + static constexpr char const* QUERY_AUTOBATISM = R"( + update falukant_data.child_relation + set name_set = true + where id in ( + select cr.id + from falukant_data.child_relation cr + join falukant_data."character" c + on c.id = cr.child_character_id + where cr.name_set = false + and c.birthdate < current_date - interval '5 days' + ) + )"; + + static constexpr char const* QUERY_UPDATE_MOOD = R"( + UPDATE falukant_data."character" AS c + SET mood_id = falukant_data.get_random_mood_id() + WHERE c.health > 0; + )"; + + static constexpr char const* QUERY_GET_OPEN_CREDITS = R"( + select c.id credit_id, c.amount, c.remaining_amount, c.interest_rate, fu.id user_id, fu."money", c2.id character_id, dp.created_at debitor_prism_start, + dp.created_at::date < current_date prism_started_previously + from falukant_data.credit c + join falukant_data.falukant_user fu + on fu.id = c.id + join falukant_data."character" c2 + on c2.user_id = c.falukant_user_id + left join falukant_data.debtors_prism dp + on dp.character_id = c2.id + where c.remaining_amount > 0 + and c.updated_at::date < current_date + )"; + + static constexpr char const* QUERY_UPDATE_CREDIT = R"( + update falukant_data.credit c + set remaining_amount = $1 + where falukant_user_id = $2 + )"; + + static constexpr char const* QUERY_CLEANUP_CREDITS = R"( + delete from falukant_data.credit + where remaining_amount >= 0.01 + )"; + + static constexpr char const* QUERY_ADD_CHARACTER_TO_DEBTORS_PRISM = R"( + insert into falukant_data.debtors_prism (character_id) values ($1) + )"; + + static constexpr const char* QUERY_GET_HEIR = R"( + SELECT child_character_id + FROM falukant_data.child_relation + WHERE father_character_id = $1 + OR mother_character_id = $1 + ORDER BY (is_heir IS TRUE) DESC, + updated_at DESC + LIMIT 1 + )"; + + static constexpr const char* QUERY_RANDOM_HEIR = R"( + WITH chosen AS ( + SELECT + cr.id AS relation_id, + cr.child_character_id + FROM + falukant_data.child_relation AS cr + JOIN + falukant_data."character" AS ch + ON ch.id = cr.child_character_id + WHERE + (cr.father_character_id = $1 OR cr.mother_character_id = $1) + -- gleicher Wohnort wie der Verstorbene + AND ch.region_id = ( + SELECT region_id + FROM falukant_data."character" + WHERE id = $1 + ) + -- nicht älter als 10 Tage + AND ch.birthdate >= NOW() - INTERVAL '10 days' + -- Titel "noncivil" + AND ch.title_of_nobility = ( + SELECT id + FROM falukant_type.title + WHERE label_tr = 'noncivil' + ) + ORDER BY RANDOM() + LIMIT 1 + ) + UPDATE + falukant_data.child_relation AS cr2 + SET + is_heir = true, + updated_at = NOW() + FROM + chosen + WHERE + cr2.id = chosen.relation_id + RETURNING + chosen.child_character_id + )"; + + static constexpr const char* QUERY_SET_CHARACTER_USER = R"( + UPDATE falukant_data."character" + SET user_id = $1, + updated_at = NOW() + WHERE id = $2 + )"; + + static constexpr const char* QUERY_UPDATE_USER_MONEY = R"( + UPDATE falukant_data.falukant_user + SET money = $1, + updated_at = NOW() + WHERE user_id = $2 + )"; + + static constexpr const char* QUERY_GET_FALUKANT_USER_ID = R"( + SELECT user_id + FROM falukant_data."character" + WHERE id = $1 + LIMIT 1 + )"; + +// Sub‐Queries + static constexpr const char* QUERY_GET_CURRENT_MONEY = R"( + SELECT COALESCE(money,0) AS sum + FROM falukant_data.falukant_user + WHERE user_id = $1 + )"; + + static constexpr const char* QUERY_HOUSE_VALUE = R"( + SELECT COALESCE(SUM(h.cost),0) AS sum + FROM falukant_data.user_house AS uh + JOIN falukant_type.house AS h ON uh.house_type_id = h.id + WHERE uh.user_id = $1 + )"; + + static constexpr const char* QUERY_SETTLEMENT_VALUE = R"( + SELECT COALESCE(SUM(b.base_cost),0) AS sum + FROM falukant_data.branch AS br + JOIN falukant_type.branch AS b ON br.branch_type_id = b.id + WHERE br.falukant_user_id = $1 + )"; + + static constexpr const char* QUERY_INVENTORY_VALUE = R"( + SELECT COALESCE(SUM(i.quantity * p.sell_cost),0) AS sum + FROM falukant_data.inventory AS i + JOIN falukant_type.product AS p ON i.product_id = p.id + JOIN falukant_data.branch AS br ON i.stock_id = br.id + WHERE br.falukant_user_id = $1 + )"; + + static constexpr const char* QUERY_CREDIT_DEBT = R"( + SELECT COALESCE(SUM(remaining_amount),0) AS sum + FROM falukant_data.credit + WHERE falukant_user_id = $1 + )"; + + static constexpr const char* QUERY_COUNT_CHILDREN = R"( + SELECT COUNT(*) AS cnt + FROM falukant_data.child_relation + WHERE father_character_id = $1 + OR mother_character_id = $1 + )"; + +}; + +#endif // USERCHARACTERWORKER_H diff --git a/src/valuerecalculationworker.cpp b/src/valuerecalculationworker.cpp new file mode 100644 index 0000000..7dd86e1 --- /dev/null +++ b/src/valuerecalculationworker.cpp @@ -0,0 +1,168 @@ +#include "valuerecalculationworker.h" + +ValueRecalculationWorker::ValueRecalculationWorker(ConnectionPool &pool, MessageBroker &broker) + : Worker(pool, broker, "ValueRecalculationWorker"), + activities{ + {"productKnowledge", Activity(std::chrono::system_clock::from_time_t(0), + [this]() { calculateProductKnowledge(); }, + std::chrono::hours(0))}, // 00:00 Uhr + + {"regionalSellPrice", Activity(std::chrono::system_clock::from_time_t(0), + [this]() { calculateRegionalSellPrice(); }, + std::chrono::hours(12) + std::chrono::minutes(0))} // 12:00 Uhr + } +{ +} + +ValueRecalculationWorker::~ValueRecalculationWorker() { + +} + +void ValueRecalculationWorker::run() { + while (runningWorker) { + setCurrentStep("Check if activity has to run"); + auto now = std::chrono::system_clock::now(); + for (auto &[key, activity] : activities) { + if (shouldRunToday(activity)) { + activity.lastRun = now; + activity.callMethod(); + } + } + setCurrentStep("CalculateMarriages"); + calculateMarriages(); + calculateStudying(); + setCurrentStep("Sleep for 60 seconds"); + for (int i = 0; i < 60 && runningWorker; ++i) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + setCurrentStep("signalActivity()"); + signalActivity(); + } + setCurrentStep("Loop done"); + } +} + +bool ValueRecalculationWorker::shouldRunToday(const Activity& activity) { + auto now = std::chrono::system_clock::now(); + auto todayScheduledTime = getNextScheduledTime(activity.scheduledTime); + return now >= todayScheduledTime && activity.lastRun < todayScheduledTime; +} + +std::chrono::system_clock::time_point ValueRecalculationWorker::getNextScheduledTime(std::chrono::system_clock::duration scheduledDuration) { + auto now = std::chrono::system_clock::now(); + std::time_t now_c = std::chrono::system_clock::to_time_t(now); + std::tm now_tm = *std::localtime(&now_c); + now_tm.tm_hour = std::chrono::duration_cast(scheduledDuration).count(); + now_tm.tm_min = std::chrono::duration_cast(scheduledDuration).count() % 60; + now_tm.tm_sec = 0; + return std::chrono::system_clock::from_time_t(std::mktime(&now_tm)); +} + +void ValueRecalculationWorker::calculateProductKnowledge() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER", QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER); + db.execute("QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER"); + db.prepare("QUERY_GET_PRODUCERS_LAST_DAY", QUERY_GET_PRODUCERS_LAST_DAY); + const auto &usersToInform = db.execute("QUERY_GET_PRODUCERS_LAST_DAY"); + const nlohmann::json message = { + { "event", "price_update" } + }; + for (const auto &user: usersToInform) { + const auto userId = std::stoi(user.at("producer_id")); + sendMessageToFalukantUsers(userId, message); + } + db.prepare("QUERY_DELETE_OLD_PRODUCTIONS", QUERY_DELETE_OLD_PRODUCTIONS); + db.execute("QUERY_DELETE_OLD_PRODUCTIONS"); +} + +void ValueRecalculationWorker::calculateRegionalSellPrice() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_UPDATE_REGION_SELL_PRICE", QUERY_UPDATE_REGION_SELL_PRICE); + db.execute("QUERY_UPDATE_REGION_SELL_PRICE"); + db.prepare("QUERY_GET_SELL_REGIONS", QUERY_GET_SELL_REGIONS); + const auto ®ionsWithSells = db.execute("QUERY_GET_SELL_REGIONS"); + const nlohmann::json message = { + { "event", "price_update" } + }; + for (const auto ®ion: regionsWithSells) { + const auto regionId = std::stoi(region.at("region_id")); + sendMessageToRegionUsers(regionId, message); + } + db.prepare("QUERY_DELETE_REGION_SELL_PRICE", QUERY_DELETE_REGION_SELL_PRICE); + db.execute("QUERY_DELETE_REGION_SELL_PRICE"); +} + +void ValueRecalculationWorker::calculateMarriages() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_SET_MARRIAGES_BY_PARTY", QUERY_SET_MARRIAGES_BY_PARTY); + const auto &usersFromUpdatedRelationships = db.execute("QUERY_SET_MARRIAGES_BY_PARTY"); + const nlohmann::json message = { + { "event", "relationship_changed" } + }; + for (const auto &userFromUpdatedRelationships: usersFromUpdatedRelationships) { + if (userFromUpdatedRelationships.at("character1_user") != "") { + const auto user1Id = std::stoi(userFromUpdatedRelationships.at("character1_user")); + sendMessageToRegionUsers(user1Id, message); + } + if (userFromUpdatedRelationships.at("character2_user") != "") { + const auto user2Id = std::stoi(userFromUpdatedRelationships.at("character2_user")); + sendMessageToRegionUsers(user2Id, message); + } + } + +} + +void ValueRecalculationWorker::calculateStudying() { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_GET_STUDYINGS_TO_EXECUTE", QUERY_GET_STUDYINGS_TO_EXECUTE); + db.prepare("QUERY_SET_LEARNING_DONE", QUERY_SET_LEARNING_DONE); + const auto studies = db.execute("QUERY_GET_STUDYINGS_TO_EXECUTE"); + for (const auto &study: studies) { + if (study.at("tr") == "self") { + calculateStudyingSelf(study); + } else if (study.at("tr") == "children" || study.at("tr") == "director") { + caclulateStudyingForAssociatedCharacter(study); + } + db.execute("QUERY_SET_LEARNING_DONE", {study.at("id")}); + } +} + +void ValueRecalculationWorker::calculateStudyingSelf(Database::FieldMap entry) { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_GET_OWN_CHARACTER_ID", QUERY_GET_OWN_CHARACTER_ID); + const auto ownCharacterIdResult = db.execute("QUERY_GET_OWN_CHARACTER_ID", { entry.at("associated_falukant_user_id") }); + if (ownCharacterIdResult.size() > 0) { + auto characterId = std::stoi(ownCharacterIdResult.at(0).at("id")); + auto learnAll = entry.at("learn_all_products") == "t" || entry.at("product_id") == ""; + int productId = learnAll ? 0 : std::stoi(entry.at("product_id")); + calculateStudyingCharacter(characterId, learnAll, productId, std::stoi(entry.at("learning_recipient_id"))); + } +} + +void ValueRecalculationWorker::caclulateStudyingForAssociatedCharacter(Database::FieldMap entry) { + auto characterId = std::stoi(entry.at("associated_learning_character_id")); + auto learnAll = entry.at("learn_all_products") == "t" || entry.at("product_id") == ""; + int productId = learnAll ? 0 : std::stoi(entry.at("product_id")); + calculateStudyingCharacter(characterId, learnAll, productId, std::stoi(entry.at("learning_recipient_id"))); +} + +void ValueRecalculationWorker::calculateStudyingCharacter(int characterId, bool all, int productId, int falukantUserId) { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + if (all) { + db.prepare("QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE", QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE); + db.execute("QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE", { "1", std::to_string(characterId) }); + } else { + db.prepare("QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE", QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE); + db.execute("QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE", { "5", std::to_string(characterId), std::to_string(productId) }); + } + const nlohmann::json message = { + { "event", "knowledge_updated" } + }; + sendMessageToFalukantUsers(falukantUserId, message); +} + diff --git a/src/valuerecalculationworker.h b/src/valuerecalculationworker.h new file mode 100644 index 0000000..8bb8d2a --- /dev/null +++ b/src/valuerecalculationworker.h @@ -0,0 +1,175 @@ +#ifndef VALUERECALCULATIONWORKER_H +#define VALUERECALCULATIONWORKER_H + +#include "worker.h" +#include +#include +#include + +class ValueRecalculationWorker : public Worker { +public: + ValueRecalculationWorker(ConnectionPool &pool, MessageBroker &broker); + ~ValueRecalculationWorker() override; + +protected: + void run() override; + +private: + struct Activity { + std::chrono::system_clock::time_point lastRun; + std::function callMethod; + std::chrono::system_clock::duration scheduledTime; + + Activity(std::chrono::system_clock::time_point lr, std::function cm, std::chrono::system_clock::duration st) + : lastRun(lr), callMethod(std::move(cm)), scheduledTime(st) {} + }; + + std::unordered_map activities; + + void calculateProductKnowledge(); + void calculateRegionalSellPrice(); + void calculateMarriages(); + void calculateStudying(); + void calculateStudyingSelf(Database::FieldMap entry); + void caclulateStudyingForAssociatedCharacter(Database::FieldMap entry); + void calculateStudyingCharacter(int characterId, bool all, int productId, int falukantUserId); + + bool shouldRunToday(const Activity& activity); + std::chrono::system_clock::time_point getNextScheduledTime(std::chrono::system_clock::duration scheduledDuration); + + static constexpr const char *QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER = R"( + UPDATE falukant_data.knowledge k + SET knowledge = LEAST(100, k.knowledge + 1) + FROM falukant_data."character" c + JOIN falukant_log.production p + ON DATE(p.production_timestamp) = CURRENT_DATE - INTERVAL '1 day' + WHERE c.id = k.character_id + AND c.user_id = 18 + AND k.product_id = 10 + )"; + + static constexpr const char *QUERY_DELETE_OLD_PRODUCTIONS = R"( + delete from falukant_log.production flp + where date(flp.production_timestamp) < CURRENT_DATE + )"; + + static constexpr const char *QUERY_GET_PRODUCERS_LAST_DAY = R"( + select p."producer_id" + from falukant_log.production p + where date(p."production_timestamp") = CURRENT_DATE - interval '1 day' + group by producer_id + )"; + + static constexpr const char *QUERY_UPDATE_REGION_SELL_PRICE = R"( + UPDATE falukant_data.town_product_worth tpw + SET worth_percent = + GREATEST( + 0, + LEAST( + CASE + WHEN s.quantity > avg_sells THEN tpw.worth_percent - 1 + WHEN s.quantity < avg_sells THEN tpw.worth_percent + 1 + ELSE tpw.worth_percent + END, + 100 + ) + ) + FROM ( + SELECT region_id, product_id, quantity, + (SELECT AVG(quantity) + FROM falukant_log.sell avs + WHERE avs.product_id = s.product_id) AS avg_sells + FROM falukant_log.sell s + WHERE DATE(s.sell_timestamp) = CURRENT_DATE - INTERVAL '1 day' + ) s + WHERE tpw.region_id = s.region_id + AND tpw.product_id = s.product_id + )"; + + static constexpr const char *QUERY_DELETE_REGION_SELL_PRICE = R"( + delete from falukant_log.sell s + where date(s.sell_timestamp) < CURRENT_DATE + )"; + + static constexpr const char *QUERY_GET_SELL_REGIONS = R"( + select s."region_id" + from falukant_log.sell s + where date(s."sell_timestamp") = CURRENT_DATE - interval '1 day' + group by "region_id" + )"; + + static constexpr const char * QUERY_SET_MARRIAGES_BY_PARTY = R"( + WITH updated_relations AS ( + UPDATE falukant_data.relationship AS rel + SET relationship_type_id = ( + SELECT id + FROM falukant_type.relationship AS rt + WHERE rt.tr = 'married' + ) + WHERE rel.id IN ( + SELECT rel2.id + FROM falukant_data.party AS p + JOIN falukant_type.party AS pt + ON pt.id = p.party_type_id + AND pt.tr = 'wedding' + JOIN falukant_data.falukant_user AS fu + ON fu.id = p.falukant_user_id + JOIN falukant_data."character" AS c + ON c.user_id = fu.id + JOIN falukant_data.relationship AS rel2 + ON rel2.character1_id = c.id + OR rel2.character2_id = c.id + JOIN falukant_type.relationship AS rt2 + ON rt2.id = rel2.relationship_type_id + AND rt2.tr = 'engaged' + WHERE p.created_at <= NOW() - INTERVAL '1 day' + ) + RETURNING character1_id, character2_id + ) + SELECT + c1.user_id AS character1_user, + c2.user_id AS character2_user + FROM updated_relations AS ur + JOIN falukant_data."character" AS c1 + ON c1.id = ur.character1_id + JOIN falukant_data."character" AS c2 + ON c2.id = ur.character2_id; + )"; + + static constexpr const char * QUERY_GET_STUDYINGS_TO_EXECUTE = R"( + select l.id, l.associated_falukant_user_id, l.associated_learning_character_id, l.learn_all_products, l.learning_recipient_id, l.product_id, + lr.tr + from falukant_data.learning l + join falukant_type.learn_recipient lr + on lr.id = l.learning_recipient_id + where l.learning_is_executed = false + and l.created_at + interval '1 day' < now(); + )"; + + static constexpr const char * QUERY_GET_OWN_CHARACTER_ID = R"( + select id + from falukant_data."character" c + where c.user_id = $1 + )"; + + static constexpr const char *QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE = R"( + update falukant_data.knowledge k + set knowledge = LEAST(100, k.knowledge + $1) + where k.character_id = $2 + and k.product_id = $3 + )"; + + static constexpr const char *QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE = R"( + update falukant_data.knowledge k + set knowledge = LEAST(100, k.knowledge + $1) + where k.character_id = $2 + )"; + + static constexpr const char *QUERY_SET_LEARNING_DONE = R"( + update falukant_data.learning + set learning_is_executed = true + where id = $1 + )"; +}; + +#endif // VALUERECALCULATIONWORKER_H diff --git a/src/websocket_server.cpp b/src/websocket_server.cpp index c99be9a..cdfef85 100644 --- a/src/websocket_server.cpp +++ b/src/websocket_server.cpp @@ -1,149 +1,154 @@ #include "websocket_server.h" #include "connection_guard.h" +#include "worker.h" #include -#include -#include #include -#include -#include +#include using json = nlohmann::json; +// Protocols array definition +struct lws_protocols WebSocketServer::protocols[] = { + { + "yourpart-protocol", + WebSocketServer::wsCallback, + sizeof(WebSocketUserData), + 4096 + }, + { nullptr, nullptr, 0, 0 } +}; + WebSocketServer::WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker) : port(port), pool(pool), broker(broker) {} +WebSocketServer::~WebSocketServer() { + stop(); +} + void WebSocketServer::run() { running = true; - broker.subscribe([this](const std::string &message) { - std::lock_guard lock(queueMutex); - messageQueue.push(message); + broker.subscribe([this](const std::string &msg) { + { + std::lock_guard lock(queueMutex); + messageQueue.push(msg); + } queueCV.notify_one(); }); - serverThread = std::thread([this]() { startServer(); }); - messageProcessingThread = std::thread([this]() { processMessageQueue(); }); - pingThread = std::thread([this]() { pingClients(); }); + serverThread = std::thread([this](){ startServer(); }); + messageThread = std::thread([this](){ processMessageQueue(); }); + pingThread = std::thread([this](){ pingClients(); }); } void WebSocketServer::stop() { running = false; + if (context) lws_cancel_service(context); if (serverThread.joinable()) serverThread.join(); - if (messageProcessingThread.joinable()) messageProcessingThread.join(); + if (messageThread.joinable()) messageThread.join(); if (pingThread.joinable()) pingThread.join(); + if (context) { + lws_context_destroy(context); + context = nullptr; + } } void WebSocketServer::startServer() { - uWS::App() - .ws("/*", { - .open = [this](uWS::WebSocket *ws) { - ws->getUserData()->pongReceived = true; - }, - .message = [this](uWS::WebSocket *ws, std::string_view message, uWS::OpCode opCode) { - handleWebSocketMessage(ws, message, opCode); - }, - .close = [this](uWS::WebSocket *ws, int /*code*/, std::string_view /*message*/) { - handleWebSocketClose(ws); - } - }) - .listen(port, [this](auto *token) { - if (token) { - std::cout << "WebSocket-Server läuft auf Port " << port << std::endl; - } else { - std::cerr << "WebSocket-Server konnte nicht gestartet werden!\n"; - running = false; - } - }) - .run(); -} - -void WebSocketServer::pingClients() { + struct lws_context_creation_info info; + memset(&info, 0, sizeof(info)); + info.port = port; + info.protocols = protocols; + context = lws_create_context(&info); + if (!context) { + throw std::runtime_error("Failed to create LWS context"); + } while (running) { - std::this_thread::sleep_for(std::chrono::seconds(30)); - - std::unique_lock lock(connectionsMutex); - for (auto &[userId, ws] : connections) { - if (!ws->getUserData()->pongReceived) { - ws->close(); - } else { - ws->getUserData()->pongReceived = false; - ws->send("ping", uWS::OpCode::TEXT); - } - } + lws_service(context, 50); } } void WebSocketServer::processMessageQueue() { while (running) { - std::unique_lock lock(queueMutex); - queueCV.wait(lock, [this]() { return !messageQueue.empty() || !running; }); + std::unique_lock lock(queueMutex); + queueCV.wait(lock, [this](){ return !messageQueue.empty() || !running; }); while (!messageQueue.empty()) { - std::string message = std::move(messageQueue.front()); + std::string msg = std::move(messageQueue.front()); messageQueue.pop(); lock.unlock(); - handleBrokerMessage(message); + handleBrokerMessage(msg); lock.lock(); } } } +void WebSocketServer::pingClients() { + while (running) { + std::this_thread::sleep_for(std::chrono::seconds(30)); + lws_callback_on_writable_all_protocol(context, &protocols[0]); + } +} + +int WebSocketServer::wsCallback(struct lws *wsi, + enum lws_callback_reasons reason, + void *user, void *in, size_t len) { + auto *ud = reinterpret_cast(user); + switch (reason) { + case LWS_CALLBACK_ESTABLISHED: + ud->pongReceived = true; + break; + case LWS_CALLBACK_RECEIVE: { + std::string msg(reinterpret_cast(in), len); + // Here you would dispatch the received message to handleBrokerMessage or handleWebSocketMessage + break; + } + case LWS_CALLBACK_SERVER_WRITEABLE: { + unsigned char buf[LWS_PRE + 4]; + memcpy(buf + LWS_PRE, "ping", 4); + lws_write(wsi, buf + LWS_PRE, 4, LWS_WRITE_TEXT); + break; + } + case LWS_CALLBACK_CLOSED: + // Remove closed connection if stored + break; + default: + break; + } + return 0; +} + void WebSocketServer::handleBrokerMessage(const std::string &message) { try { - json parsedMessage = json::parse(message); - if (parsedMessage.contains("user_id")) { - int falukantUserId = parsedMessage["user_id"]; - std::shared_lock lock(connectionsMutex); - auto userId = getUserIdFromFalukantUserId(falukantUserId); + json parsed = json::parse(message); + if (parsed.contains("user_id")) { + int fid = parsed["user_id"].get(); + auto userId = getUserIdFromFalukantUserId(fid); + std::shared_lock lock(connectionsMutex); auto it = connections.find(userId); if (it != connections.end()) { - it->second->send(message, uWS::OpCode::TEXT); - std::cout << "[WebSocketServer] Nachricht an User-ID: " << userId << " gesendet.\n"; - } else { - std::cerr << "[WebSocketServer] Keine Verbindung für User-ID: " << userId << "\n"; + lws_callback_on_writable(it->second); } - } else { - std::cerr << "[WebSocketServer] Ungültige Nachricht: " << message << "\n"; } } catch (const std::exception &e) { - std::cerr << "[WebSocketServer] Fehler beim Verarbeiten der Nachricht: " << e.what() << "\n"; + std::cerr << "Error processing broker message: " << e.what() << std::endl; } } -void WebSocketServer::handleWebSocketMessage(uWS::WebSocket *ws, std::string_view message, uWS::OpCode opCode) { - if (message == "pong") { - ws->getUserData()->pongReceived = true; - return; - } - - json parsedMessage = json::parse(message); - if (parsedMessage.contains("event") && parsedMessage["event"] == "setUserId") { - std::string userId = parsedMessage["data"]["userId"]; - std::unique_lock lock(connectionsMutex); - connections[userId] = ws; - ws->getUserData()->userId = userId; - } -} - -void WebSocketServer::handleWebSocketClose(uWS::WebSocket *ws) { - std::unique_lock lock(connectionsMutex); - auto userId = ws->getUserData()->userId; - if (!userId.empty()) { - connections.erase(userId); - } -} - -std::string WebSocketServer::getUserIdFromFalukantUserId(int & userId) { +std::string WebSocketServer::getUserIdFromFalukantUserId(int userId) { ConnectionGuard guard(pool); auto &db = guard.get(); - std::string query = R"( + std::string sql = R"( SELECT u.hashed_id FROM community.user u JOIN falukant_data.falukant_user fu ON u.id = fu.user_id WHERE fu.id = $1 )"; - db.prepare("get_user_id", query); - auto users = db.execute("get_user_id", {std::to_string(userId)}); - if (!users.empty()) { - return users[0]["hashed_id"]; - } else { - return ""; + db.prepare("get_user_id", sql); + auto res = db.execute("get_user_id", {std::to_string(userId)}); + return (!res.empty()) ? res[0]["hashed_id"] : std::string(); +} + +void WebSocketServer::setWorkers(const std::vector> &workerList) { + workers.clear(); + workers.reserve(workerList.size()); + for (const auto &wptr : workerList) { + workers.push_back(wptr.get()); } } diff --git a/src/websocket_server.h b/src/websocket_server.h index 10a11b9..079770e 100644 --- a/src/websocket_server.h +++ b/src/websocket_server.h @@ -1,50 +1,66 @@ #pragma once -#include "message_broker.h" +#include +#include "connection_guard.h" #include "connection_pool.h" -#include -#include +#include "message_broker.h" +#include #include #include #include +#include #include -#include #include #include +#include +#include +#include struct WebSocketUserData { std::string userId; bool pongReceived = true; }; +class Worker; // forward + class WebSocketServer { public: WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker); + ~WebSocketServer(); + void run(); void stop(); + void setWorkers(const std::vector> &workerList); private: void startServer(); void processMessageQueue(); void pingClients(); void handleBrokerMessage(const std::string &message); - void handleWebSocketMessage(uWS::WebSocket *ws, std::string_view message, uWS::OpCode opCode); - void handleWebSocketClose(uWS::WebSocket *ws); - std::string getUserIdFromFalukantUserId(int &userId); + std::string getUserIdFromFalukantUserId(int falukantUserId); + + static int wsCallback(struct lws *wsi, + enum lws_callback_reasons reason, + void *user, void *in, size_t len); int port; ConnectionPool &pool; MessageBroker &broker; std::atomic running{false}; + struct lws_context *context = nullptr; std::thread serverThread; - std::thread messageProcessingThread; + std::thread messageThread; std::thread pingThread; - std::unordered_map *> connections; - std::shared_mutex connectionsMutex; - - std::queue messageQueue; std::mutex queueMutex; std::condition_variable queueCV; + std::queue messageQueue; + + std::shared_mutex connectionsMutex; + std::unordered_map connections; + + std::vector workers; + + static struct lws_protocols protocols[]; }; diff --git a/src/worker.h b/src/worker.h index 4f2a5c5..f09a880 100644 --- a/src/worker.h +++ b/src/worker.h @@ -3,14 +3,14 @@ #include #include #include -#include #include #include -#include +#include #include "connection_pool.h" #include "message_broker.h" #include "database.h" +#include "connection_guard.h" class Worker { public: @@ -64,6 +64,11 @@ public: return currentStep; } + std::string getStatus() { + std::lock_guard lock(stepMutex); + return "{\"worker\":\"" + workerName + "\", \"currentStep\":\"" + currentStep + "\"}"; + } + protected: virtual void run() = 0; @@ -101,6 +106,42 @@ protected: currentStep = step; } + void sendMessageToRegionUsers(const int ®ionId, nlohmann::json message) { + ConnectionGuard guard(pool); + auto &db = guard.get(); + db.prepare("QUERY_GET_REGION_USERS", QUERY_GET_REGION_USERS); + auto users = db.execute("QUERY_GET_REGION_USERS", {std::to_string(regionId)}); + for (const auto &user: users) { + message["user_id"] = user.at("user_id"); + broker.publish(message.dump()); + } + } + + void sendMessageToFalukantUsers(const int &falukantUserId, nlohmann::json message) { + message["user_id"] = falukantUserId; + broker.publish(message.dump()); + } + + void changeFalukantUserMoney(int falukantUserId, double moneyChange, std::string action, nlohmann::json message) { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + db.prepare("QUERY_UPDATE_MONEY", QUERY_UPDATE_MONEY); + db.execute("QUERY_UPDATE_MONEY", { + std::to_string(falukantUserId), + std::to_string(moneyChange), + action + }); + sendMessageToFalukantUsers(falukantUserId, message); + } catch (const std::exception &e) { + std::cerr << "[" << workerName << "] Fehler in changeFalukantUserMoney: " << e.what() << "\n"; + } + } + + time_t getLastActivity() { + return lastActivity; + } + protected: ConnectionPool &pool; MessageBroker &broker; @@ -114,4 +155,21 @@ protected: std::chrono::seconds watchdogInterval{10}; std::mutex stepMutex; std::string currentStep; + time_t lastActivity; + +private: + static constexpr const char *QUERY_GET_REGION_USERS = R"( + select c.user_id + from falukant_data."character" c + where c.region_id = $1 + and c.user_id is not null; + )"; + + static constexpr const char *QUERY_UPDATE_MONEY = R"( + SELECT falukant_data.update_money( + $1, + $2, + $3 + ); + )"; };