diff --git a/.gitignore b/.gitignore index 0fa1bee..7e0c390 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,8 @@ frontend/dist frontend/dist/* frontedtree.txt backend/dist/ +build +build/* +.vscode +.vscode/* +.clang-format diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..eab6b24 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,49 @@ +cmake_minimum_required(VERSION 3.20) +project(YourPartDaemon VERSION 1.0 LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_C_COMPILER "gcc-13") +set(CMAKE_CPP_COMPILER "g++-13") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -flto=auto") +set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -flto") + +find_package(PostgreSQL REQUIRED) +find_package(Threads REQUIRED) +find_package(PkgConfig REQUIRED) +pkg_check_modules(LIBPQXX REQUIRED libpqxx) + +include_directories(${LIBPQXX_INCLUDE_DIRS}) +link_directories(${LIBPQXX_LIBRARY_DIRS}) +add_definitions(${LIBPQXX_CFLAGS_OTHER}) + +set(SOURCES + src/main.cpp + src/config.cpp + src/connection_pool.cpp + src/database.cpp + src/character_creation_worker.cpp + src/produce_worker.cpp + src/message_broker.cpp + src/websocket_server.cpp +) + +set(HEADERS + src/config.h + src/database.h + src/connection_pool.h + src/worker.h + src/character_creation_worker.h + src/produce_worker.h + src/message_broker.h + src/websocket_server.h +) + +add_executable(yourpart-daemon ${SOURCES} ${HEADERS}) +find_package(nlohmann_json CONFIG REQUIRED) + +target_include_directories(yourpart-daemon PRIVATE ${PostgreSQL_INCLUDE_DIRS}) +target_link_libraries(yourpart-daemon PRIVATE ${PostgreSQL_LIBRARIES} Threads::Threads z ssl crypto ${CMAKE_SOURCE_DIR}/lib/uSockets.a ${LIBPQXX_LIBRARIES}) + +install(TARGETS yourpart-daemon DESTINATION /usr/local/bin) +install(FILES daemon.conf DESTINATION /etc/yourpart/) diff --git a/daemon.conf b/daemon.conf new file mode 100644 index 0000000..9cf670f --- /dev/null +++ b/daemon.conf @@ -0,0 +1,7 @@ +DB_HOST=localhost +DB_PORT=5432 +DB_NAME=yp3 +DB_USER=yourpart +DB_PASSWORD=hitomisan +THREAD_COUNT=4 +WEBSOCKET_PORT=4551 \ No newline at end of file diff --git a/lib/uSockets.a b/lib/uSockets.a new file mode 100644 index 0000000..34ea565 Binary files /dev/null and b/lib/uSockets.a differ diff --git a/src/character_creation_worker.cpp b/src/character_creation_worker.cpp new file mode 100644 index 0000000..7f381c8 --- /dev/null +++ b/src/character_creation_worker.cpp @@ -0,0 +1,157 @@ +#include "character_creation_worker.h" +#include "connection_guard.h" +#include +#include +#include + +CharacterCreationWorker::CharacterCreationWorker(ConnectionPool &pool, MessageBroker &broker) + : Worker(pool, broker, "CharacterCreationWorker") + , gen(std::random_device{}()) + , dist(2, 3) +{ +} + +CharacterCreationWorker::~CharacterCreationWorker() { +} + +void CharacterCreationWorker::run() { + while (runningWorker) { + setCurrentStep("Check if previous day character was created"); + if (!isPreviousDayCharacterCreated()) { + setCurrentStep("Create characters for today"); + createCharactersForToday(); + } + setCurrentStep("Sleep for 60 seconds"); + for (int i = 0; i < 60 && runningWorker; ++i) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + setCurrentStep("signalActivity()"); + signalActivity(); + } + setCurrentStep("Loop done"); + } +} + +bool CharacterCreationWorker::isPreviousDayCharacterCreated() { + try { + setCurrentStep("Get Database Connection"); + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + setCurrentStep("Execute Query"); + auto results = db.query(QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED); + + if (!results.empty()) { + std::string created_at_str = results[0].at("created_at"); + return true; + } + } catch (const std::exception &e) { + std::cerr << "[CharacterCreationWorker] Fehler in isPreviousDayCharacterCreated: " + << e.what() << std::endl; + } + setCurrentStep("No previous day character found"); + return false; +} + +void CharacterCreationWorker::createCharactersForToday() { + loadNames(); + if (first_name_cache.empty() || last_name_cache.empty()) { + std::cerr << "Fehler: Namen konnten nicht geladen werden." << std::endl; + return; + } + + auto town_ids = getTownRegionIds(); + for (auto region_id : town_ids) { + createCharactersForRegion(region_id); + } +} + +void CharacterCreationWorker::createCharactersForRegion(int region_id) { + std::vector nobility_stands = {1, 2, 3}; + std::vector genders = {"male", "female"}; + for (auto nobility : nobility_stands) { + for (auto &gender : genders) { + int num_chars = dist(gen); + for (int i = 0; i < num_chars; ++i) { + createCharacter(region_id, gender, nobility); + } + } + } +} + +void CharacterCreationWorker::createCharacter(int region_id, + const std::string &gender, + int title_of_nobility) { + int first_name_id = getRandomFromSet(first_name_cache[gender]); + if (first_name_id == -1) { + std::cerr << "Fehler: Kein passender Vorname gefunden." << std::endl; + return; + } + int last_name_id = getRandomFromSet(last_name_cache); + if (last_name_id == -1) { + std::cerr << "Fehler: Kein passender Nachname gefunden." << std::endl; + return; + } + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + db.prepare("insert_character", QUERY_INSERT_CHARACTER); + db.execute("insert_character", {std::to_string(region_id), + std::to_string(first_name_id), + std::to_string(last_name_id), + gender, + std::to_string(title_of_nobility)}); + } catch (const std::exception &e) { + std::cerr << "[CharacterCreationWorker] Fehler in createCharacter: " + << e.what() << std::endl; + } +} + +std::vector CharacterCreationWorker::getTownRegionIds() { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + auto rows = db.query(QUERY_GET_TOWN_REGION_IDS); + + std::vector ids; + ids.reserve(rows.size()); + for (const auto &row : rows) { + ids.push_back(std::stoi(row.at("id"))); + } + return ids; + } catch (const std::exception &e) { + std::cerr << "[CharacterCreationWorker] Fehler in getTownRegionIds: " + << e.what() << std::endl; + } + return {}; +} + +void CharacterCreationWorker::loadNames() { + try { + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + auto firstNameRows = db.query(QUERY_LOAD_FIRST_NAMES); + for (const auto &row : firstNameRows) { + first_name_cache[row.at("gender")].insert(std::stoi(row.at("id"))); + } + + auto lastNameRows = db.query(QUERY_LOAD_LAST_NAMES); + for (const auto &row : lastNameRows) { + last_name_cache.insert(std::stoi(row.at("id"))); + } + } catch (const std::exception &e) { + std::cerr << "[CharacterCreationWorker] Fehler in loadNames: " + << e.what() << std::endl; + } +} + +int CharacterCreationWorker::getRandomFromSet(const std::unordered_set &name_set) { + if (name_set.empty()) { + return -1; + } + auto it = name_set.begin(); + std::advance(it, std::uniform_int_distribution(0, name_set.size() - 1)(gen)); + return *it; +} diff --git a/src/character_creation_worker.h b/src/character_creation_worker.h new file mode 100644 index 0000000..0b7367e --- /dev/null +++ b/src/character_creation_worker.h @@ -0,0 +1,64 @@ +#pragma once + +#include "worker.h" +#include +#include +#include +#include +#include + +class CharacterCreationWorker : public Worker { +public: + CharacterCreationWorker(ConnectionPool &pool, MessageBroker &broker); + ~CharacterCreationWorker() override; + +protected: + void run() override; + +private: + std::mt19937 gen; + std::uniform_int_distribution dist; + std::unordered_map> first_name_cache; + std::unordered_set last_name_cache; + + bool isPreviousDayCharacterCreated(); + void createCharactersForToday(); + void createCharactersForRegion(int region_id); + void createCharacter(int region_id, const std::string &gender, int title_of_nobility); + std::vector getTownRegionIds(); + void loadNames(); + int getRandomFromSet(const std::unordered_set &name_set); + + static constexpr const char *QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED = R"( + SELECT created_at + FROM falukant_data."character" + WHERE user_id IS NULL + ORDER BY created_at DESC + LIMIT 1; + )"; + + static constexpr const char *QUERY_GET_TOWN_REGION_IDS = R"( + SELECT fdr.id + FROM falukant_data.region fdr + JOIN falukant_type.region ftr ON fdr.region_type_id = ftr.id + WHERE ftr.label_tr = 'city'; + )"; + + static constexpr const char *QUERY_LOAD_FIRST_NAMES = R"( + SELECT id, gender + FROM falukant_predefine.firstname; + )"; + + static constexpr const char *QUERY_LOAD_LAST_NAMES = R"( + SELECT id + FROM falukant_predefine.lastname; + )"; + + static constexpr const char *QUERY_INSERT_CHARACTER = R"( + INSERT INTO falukant_data."character"( + user_id, region_id, first_name, last_name, + birthdate, gender, created_at, updated_at, title_of_nobility + ) + VALUES ($1, $2, $3, $4, NOW(), $5, NOW(), NOW(), $6); + )"; +}; diff --git a/src/config.cpp b/src/config.cpp new file mode 100644 index 0000000..bfaf6c1 --- /dev/null +++ b/src/config.cpp @@ -0,0 +1,39 @@ +#include "config.h" +#include +#include +#include + +Config::Config(const std::string &filepath) +{ + load(filepath); +} + +void Config::load(const std::string &filepath) +{ + std::ifstream file(filepath); + if (!file) + { + throw std::runtime_error("Konfigurationsdatei konnte nicht geöffnet werden."); + } + + std::string line; + while (std::getline(file, line)) + { + std::istringstream iss(line); + std::string key, value; + if (std::getline(iss, key, '=') && std::getline(iss, value)) + { + config_map[key] = value; + } + } +} + +std::string Config::get(const std::string &key) const +{ + auto it = config_map.find(key); + if (it != config_map.end()) + { + return it->second; + } + throw std::runtime_error("Konfigurationsschlüssel nicht gefunden: " + key); +} diff --git a/src/config.h b/src/config.h new file mode 100644 index 0000000..14d240d --- /dev/null +++ b/src/config.h @@ -0,0 +1,14 @@ +#pragma once +#include +#include + +class Config +{ +public: + Config(const std::string &filepath); + std::string get(const std::string &key) const; + +private: + std::map config_map; + void load(const std::string &filepath); +}; diff --git a/src/connection_guard.h b/src/connection_guard.h new file mode 100644 index 0000000..dc14c8f --- /dev/null +++ b/src/connection_guard.h @@ -0,0 +1,24 @@ +#pragma once + +#include "connection_pool.h" +#include + +class ConnectionGuard { +public: + ConnectionGuard(ConnectionPool &pool) + : pool(pool), connection(pool.getConnection()) {} + + ~ConnectionGuard() { + if (connection) { + pool.releaseConnection(connection); + } + } + + Database &get() { + return *connection; + } + +private: + ConnectionPool &pool; + std::shared_ptr connection; +}; diff --git a/src/connection_pool.cpp b/src/connection_pool.cpp new file mode 100644 index 0000000..b815eae --- /dev/null +++ b/src/connection_pool.cpp @@ -0,0 +1,49 @@ +#include "connection_pool.h" +#include +#include "connection_guard.h" + +ConnectionPool::ConnectionPool(const std::string &host, const std::string &port, + const std::string &name, const std::string &user, + const std::string &password, int pool_size) + : host(host), port(port), name(name), user(user), password(password) { + createPool(pool_size); +} + +void ConnectionPool::createPool(int pool_size) { + std::string conninfo = "host=" + host + " port=" + port + " dbname=" + name + + " user=" + user + " password=" + password; + + for (int i = 0; i < pool_size; ++i) { + auto conn = std::make_shared(conninfo); + pool.push(conn); + } +} + +std::shared_ptr ConnectionPool::getConnection() { + std::unique_lock lock(pool_mutex); + pool_cv.wait(lock, [this]() { return !pool.empty(); }); + auto conn = pool.front(); + pool.pop(); + if (!conn->isValid()) { + std::cerr << "[ConnectionPool] Ungültige Verbindung. Erstelle neu.\n"; + std::string conninfo = "host=" + host + + " port=" + port + + " dbname=" + name + + " user=" + user + + " password=" + password; + conn = std::make_shared(conninfo); + if (!conn->isValid()) { + std::cerr << "[ConnectionPool] Erneut fehlgeschlagen.\n"; + return nullptr; + } + } + return conn; +} + +void ConnectionPool::releaseConnection(std::shared_ptr conn) { + { + std::lock_guard lock(pool_mutex); + pool.push(conn); + } + pool_cv.notify_one(); +} diff --git a/src/connection_pool.h b/src/connection_pool.h new file mode 100644 index 0000000..590c8c9 --- /dev/null +++ b/src/connection_pool.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include +#include +#include "database.h" + +class ConnectionPool { +public: + ConnectionPool(const std::string &host, const std::string &port, + const std::string &name, const std::string &user, + const std::string &password, int pool_size); + + std::shared_ptr getConnection(); + void releaseConnection(std::shared_ptr conn); + +private: + std::queue> pool; + std::mutex pool_mutex; + std::condition_variable pool_cv; + + std::string host; + std::string port; + std::string name; + std::string user; + std::string password; + + void createPool(int pool_size); + void refreshConnection(std::shared_ptr &conn); +}; diff --git a/src/database.cpp b/src/database.cpp new file mode 100644 index 0000000..c3dd39b --- /dev/null +++ b/src/database.cpp @@ -0,0 +1,107 @@ +#include "database.h" + +Database::Database(const std::string &conninfo) +{ + try { + connection_ = std::make_unique(conninfo); + if (!connection_->is_open()) { + throw std::runtime_error("Konnte DB-Verbindung nicht öffnen!"); + } + } catch (const std::exception &e) { + std::cerr << "[Database] Fehler beim Verbinden: " << e.what() << std::endl; + throw; + } +} + +std::vector> +Database::query(const std::string &sql) +{ + std::vector> rows; + try { + pqxx::work txn(*connection_); + pqxx::result r = txn.exec(sql); + txn.commit(); + for (auto row : r) { + std::map oneRow; + for (auto f = 0u; f < row.size(); f++) { + std::string colName = r.column_name(f); + std::string value = row[f].c_str() ? row[f].c_str() : ""; + oneRow[colName] = value; + } + rows.push_back(std::move(oneRow)); + } + } catch (const std::exception &ex) { + std::cerr << "[Database] query-Fehler: " << ex.what() << "\nSQL: " << sql << std::endl; + } + return rows; +} + +void Database::prepare(const std::string &stmtName, const std::string &sql) +{ + try { + pqxx::work txn(*connection_); + txn.conn().prepare(stmtName, sql); + txn.commit(); + } catch (const std::exception &ex) { + std::cerr << "[Database] prepare-Fehler: " << ex.what() + << "\nSQL: " << sql << std::endl; + } +} + +#include +#include +#include +#include +#include +#include + +std::vector> +Database::execute(const std::string &stmtName, const std::vector ¶ms) +{ + std::vector> rows; + + try { + pqxx::work txn(*connection_); + pqxx::prepare::invocation inv = txn.prepared(stmtName); + + // Parameter einzeln hinzufügen + for (auto &p : params) { + inv(p); + } + + // Ausführung + pqxx::result r = inv.exec(); + txn.commit(); + + // Ergebnis verarbeiten + for (const auto &row : r) { + std::unordered_map mapRow; + for (pqxx::row::size_type i = 0; i < row.size(); ++i) { + std::string colName = r.column_name(i); + mapRow[colName] = row[i].c_str() ? row[i].c_str() : ""; + } + rows.push_back(std::move(mapRow)); + } + } + catch (const std::exception &ex) { + std::cerr << "[Database] execute-Fehler: " << ex.what() + << "\nStatement: " << stmtName << std::endl; + } + + return rows; +} + +bool Database::isValid() const { + try { + if (!connection_ || !connection_->is_open()) { + return false; + } + pqxx::work txn(*connection_); + txn.exec("SELECT 1"); // Einfacher Ping + txn.commit(); + return true; + } catch (const std::exception &ex) { + std::cerr << "[Database] Verbindung ungültig: " << ex.what() << "\n"; + return false; + } +} diff --git a/src/database.h b/src/database.h new file mode 100644 index 0000000..30fd07c --- /dev/null +++ b/src/database.h @@ -0,0 +1,28 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class Database +{ +public: + Database(const std::string &conninfo); + + std::vector> query(const std::string &sql); + void prepare(const std::string &stmtName, const std::string &sql); + std::vector> execute( + const std::string &stmtName, + const std::vector ¶ms = {} + ); + bool isOpen() const { return connection_ && connection_->is_open(); } + bool isValid() const; + +private: + std::unique_ptr connection_; +}; diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..dfa9c39 --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,63 @@ +#include "character_creation_worker.h" +#include "produce_worker.h" +#include "connection_pool.h" +#include "websocket_server.h" +#include "message_broker.h" +#include "config.h" +#include +#include +#include +#include + +std::atomic keepRunning(true); + +void handleSignal(int signal) { + if (signal == SIGINT || signal == SIGTERM) { + std::cerr << "Signal erhalten: " << signal << ". Beende Anwendung..." << std::endl; + keepRunning.store(false, std::memory_order_relaxed); + } +} + +int main() { + std::signal(SIGINT, handleSignal); + std::signal(SIGTERM, handleSignal); + + try { + Config config("/etc/yourpart/daemon.conf"); + ConnectionPool pool( + config.get("DB_HOST"), + config.get("DB_PORT"), + config.get("DB_NAME"), + config.get("DB_USER"), + config.get("DB_PASSWORD"), + 10 + ); + + int websocketPort = std::stoi(config.get("WEBSOCKET_PORT")); + MessageBroker broker; + WebSocketServer websocketServer(websocketPort, pool, broker); + CharacterCreationWorker creationWorker(pool, broker); + ProduceWorker produceWorker(pool, broker); + + broker.start(); + websocketServer.run(); + creationWorker.startWorkerThread(); + produceWorker.startWorkerThread(); + creationWorker.enableWatchdog(); + produceWorker.enableWatchdog(); + + while (keepRunning) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + + creationWorker.stopWorkerThread(); + produceWorker.stopWorkerThread(); + websocketServer.stop(); + broker.stop(); + } catch (const std::exception &e) { + std::cerr << "Fehler: " << e.what() << std::endl; + return 1; + } + + return 0; +} diff --git a/src/message_broker.cpp b/src/message_broker.cpp new file mode 100644 index 0000000..7809aa4 --- /dev/null +++ b/src/message_broker.cpp @@ -0,0 +1,43 @@ +#include "message_broker.h" +#include + +void MessageBroker::publish(const std::string &message) { + std::lock_guard lock(mutex); + messageQueue.push(message); + cv.notify_all(); +} + +void MessageBroker::subscribe(const MessageCallback &callback) { + std::lock_guard lock(mutex); + subscribers.push_back(callback); +} + +void MessageBroker::start() { + running = true; + brokerThread = std::thread([this]() { processMessages(); }); +} + +void MessageBroker::stop() { + running = false; + cv.notify_all(); + if (brokerThread.joinable()) { + brokerThread.join(); + } +} + +void MessageBroker::processMessages() { + while (running) { + std::unique_lock lock(mutex); + cv.wait(lock, [this]() { return !messageQueue.empty() || !running; }); + if (!running) break; + while (!messageQueue.empty()) { + std::string message = std::move(messageQueue.front()); + messageQueue.pop(); + lock.unlock(); + for (const auto &callback : subscribers) { + callback(message); + } + lock.lock(); + } + } +} diff --git a/src/message_broker.h b/src/message_broker.h new file mode 100644 index 0000000..04e32bf --- /dev/null +++ b/src/message_broker.h @@ -0,0 +1,29 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include + +class MessageBroker { +public: + using MessageCallback = std::function; + + void publish(const std::string &message); + void subscribe(const MessageCallback &callback); + void start(); + void stop(); + +private: + std::queue messageQueue; + std::vector subscribers; + std::mutex mutex; + std::condition_variable cv; + std::atomic running{false}; + std::thread brokerThread; + + void processMessages(); +}; diff --git a/src/produce_worker.cpp b/src/produce_worker.cpp new file mode 100644 index 0000000..df749ea --- /dev/null +++ b/src/produce_worker.cpp @@ -0,0 +1,172 @@ +#include "produce_worker.h" +#include "connection_guard.h" // Include for ConnectionGuard +#include +#include +#include +#include + +ProduceWorker::ProduceWorker(ConnectionPool &pool, MessageBroker &broker) + : Worker(pool, broker, "ProduceWorker") +{ +} + +ProduceWorker::~ProduceWorker() { +} + +void ProduceWorker::run() { + auto lastIterationTime = std::chrono::steady_clock::now(); + while (runningWorker) { + setCurrentStep("Check runningWorker Variable"); + { + std::lock_guard lock(activityMutex); + if (!runningWorker) { + break; + } + } + setCurrentStep("Calculate elapsed time"); + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(now - lastIterationTime); + if (elapsed < std::chrono::milliseconds(200)) { + std::this_thread::sleep_for(std::chrono::milliseconds(200) - elapsed); + } + lastIterationTime = std::chrono::steady_clock::now(); + setCurrentStep("Process Productions"); + processProductions(); + setCurrentStep("Signal Activity"); + signalActivity(); + setCurrentStep("Loop Done"); + } +} + +void ProduceWorker::processProductions() { + try { + setCurrentStep("Get Database Connection"); + ConnectionGuard connGuard(pool); + auto &db = connGuard.get(); + + setCurrentStep("Fetch Finished Productions"); + auto finishedProductions = getFinishedProductions(db); + + setCurrentStep("Process Finished Productions"); + for (const auto &production : finishedProductions) { + if (production.find("branch_id") == production.end() || + production.find("product_id") == production.end() || + production.find("quantity") == production.end() || + production.find("quality") == production.end() || + production.find("user_id") == production.end()) { + continue; + } + + int branchId = std::stoi(production.at("branch_id")); + int productId = std::stoi(production.at("product_id")); + int quantity = std::stoi(production.at("quantity")); + int quality = std::stoi(production.at("quality")); + int userId = std::stoi(production.at("user_id")); + + if (addToInventory(db, branchId, productId, quantity, quality, userId)) { + } + deleteProduction(db, production.at("production_id")); + } + } catch (const std::exception &e) { + std::cerr << "[ProduceWorker] Fehler in processProductions: " << e.what() << std::endl; + } +} + +std::vector> ProduceWorker::getFinishedProductions(Database &db) { + try { + db.prepare("get_finished_productions", QUERY_GET_FINISHED_PRODUCTIONS); + return db.execute("get_finished_productions"); + } catch (const std::exception &e) { + std::cerr << "[ProduceWorker] Fehler beim Abrufen abgeschlossener Produktionen: " + << e.what() << std::endl; + } + return {}; +} + +bool ProduceWorker::addToInventory(Database &db, + int branchId, + int productId, + int quantity, + int quality, + int userId) +{ + try { + db.prepare("get_stocks", QUERY_GET_AVAILABLE_STOCKS); + auto stocks = db.execute("get_stocks", {std::to_string(branchId)}); + int remainingQuantity = quantity; + for (const auto &stock : stocks) { + int stockId = std::stoi(stock.at("id")); + int totalCapacity = std::stoi(stock.at("total_capacity")); + int filledCapacity = std::stoi(stock.at("filled")); + int freeCapacity = totalCapacity - filledCapacity; + + if (freeCapacity <= 0) { + continue; + } + + int toStore = std::min(remainingQuantity, freeCapacity); + if (!storeInStock(db, stockId, productId, toStore, quality)) { + return false; + } + remainingQuantity -= toStore; + sendProductionReadyEvent(userId, productId, quantity, quality, branchId); + if (remainingQuantity <= 0) { + break; + } + } + return (remainingQuantity == 0); + } catch (const std::exception &e) { + std::cerr << "[ProduceWorker] Fehler in addToInventory: " << e.what() << std::endl; + } + return false; +} + +bool ProduceWorker::storeInStock(Database &db, + int stockId, + int productId, + int quantity, + int quality) +{ + try { + db.prepare("insert_inventory", QUERY_INSERT_INVENTORY); + db.execute("insert_inventory", {std::to_string(stockId), + std::to_string(productId), + std::to_string(quantity), + std::to_string(quality)}); + return true; + } catch (const std::exception &e) { + std::cerr << "[ProduceWorker] Fehler in storeInStock: " << e.what() << std::endl; + } + return false; +} + +void ProduceWorker::deleteProduction(Database &db, const std::string &productionId) { + try { + db.prepare("delete_production", QUERY_DELETE_PRODUCTION); + db.execute("delete_production", {productionId}); + } catch (const std::exception &e) { + std::cerr << "[ProduceWorker] Fehler beim Löschen der Produktion: " << e.what() << std::endl; + } +} + +void ProduceWorker::sendProductionReadyEvent(int userId, + int productId, + int quantity, + int quality, + int branchId) +{ + try { + nlohmann::json message = { + {"event", "production_ready"}, + {"user_id", userId}, + {"product_id", productId}, + {"quantity", quantity}, + {"quality", quality}, + {"branch_id", branchId} + }; + broker.publish(message.dump()); + } catch (const std::exception &e) { + std::cerr << "[ProduceWorker] Fehler beim Senden des Production Ready Events: " + << e.what() << std::endl; + } +} diff --git a/src/produce_worker.h b/src/produce_worker.h new file mode 100644 index 0000000..d3737bc --- /dev/null +++ b/src/produce_worker.h @@ -0,0 +1,70 @@ +#pragma once + +#include "worker.h" +#include +#include +#include + + +class ProduceWorker : public Worker +{ +public: + explicit ProduceWorker(ConnectionPool &pool, MessageBroker &broker); + ~ProduceWorker() override; + +protected: + void run() override; // überschreibt Worker::run() + +private: + // Fachlogik + void processProductions(); + std::vector> getFinishedProductions(Database &db); + bool addToInventory(Database &db, int branchId, int productId, int quantity, int quality, int userId); + bool storeInStock(Database &db, int stockId, int productId, int quantity, int quality); + void deleteProduction(Database &db, const std::string &productionId); + void sendProductionReadyEvent(int userId, int productId, int quantity, int quality, int branchId); + + static constexpr const char *QUERY_GET_FINISHED_PRODUCTIONS = R"( + SELECT + p.id AS production_id, + p.branch_id, + p.product_id, + p.quantity, + p.start_timestamp, + pr.production_time, + k.character_id, + k.knowledge AS quality, + br.region_id, + br.falukant_user_id user_id + FROM falukant_data.production p + JOIN falukant_type.product pr ON p.product_id = pr.id + JOIN falukant_data.branch br ON p.branch_id = br.id + JOIN falukant_data.character c ON c.user_id = br.falukant_user_id + JOIN falukant_data.knowledge k ON p.product_id = k.product_id AND k.character_id = c.id + JOIN falukant_data.stock s ON s.branch_id = br.id + WHERE p.start_timestamp + interval '1 minute' * pr.production_time <= NOW() + ORDER BY p.start_timestamp; + )"; + + static constexpr const char *QUERY_GET_AVAILABLE_STOCKS = R"( + SELECT stock.id, stock.quantity AS total_capacity, ( + SELECT COALESCE(SUM(inventory.quantity), 0) + FROM falukant_data.inventory + WHERE inventory.stock_id = stock.id + ) AS filled, stock.branch_id + FROM falukant_data.stock stock + JOIN falukant_data.branch branch + ON stock.branch_id = branch.id + WHERE branch.id = $1 + ORDER BY total_capacity DESC; + )"; + + static constexpr const char *QUERY_DELETE_PRODUCTION = R"( + DELETE FROM falukant_data.production WHERE id = $1; + )"; + + static constexpr const char *QUERY_INSERT_INVENTORY = R"( + INSERT INTO falukant_data.inventory (stock_id, product_id, quantity, quality, produced_at) + VALUES ($1, $2, $3, $4, NOW()); + )"; +}; diff --git a/src/websocket_server.cpp b/src/websocket_server.cpp new file mode 100644 index 0000000..c99be9a --- /dev/null +++ b/src/websocket_server.cpp @@ -0,0 +1,149 @@ +#include "websocket_server.h" +#include "connection_guard.h" +#include +#include +#include +#include +#include +#include + +using json = nlohmann::json; + +WebSocketServer::WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker) + : port(port), pool(pool), broker(broker) {} + +void WebSocketServer::run() { + running = true; + broker.subscribe([this](const std::string &message) { + std::lock_guard lock(queueMutex); + messageQueue.push(message); + queueCV.notify_one(); + }); + serverThread = std::thread([this]() { startServer(); }); + messageProcessingThread = std::thread([this]() { processMessageQueue(); }); + pingThread = std::thread([this]() { pingClients(); }); +} + +void WebSocketServer::stop() { + running = false; + if (serverThread.joinable()) serverThread.join(); + if (messageProcessingThread.joinable()) messageProcessingThread.join(); + if (pingThread.joinable()) pingThread.join(); +} + +void WebSocketServer::startServer() { + uWS::App() + .ws("/*", { + .open = [this](uWS::WebSocket *ws) { + ws->getUserData()->pongReceived = true; + }, + .message = [this](uWS::WebSocket *ws, std::string_view message, uWS::OpCode opCode) { + handleWebSocketMessage(ws, message, opCode); + }, + .close = [this](uWS::WebSocket *ws, int /*code*/, std::string_view /*message*/) { + handleWebSocketClose(ws); + } + }) + .listen(port, [this](auto *token) { + if (token) { + std::cout << "WebSocket-Server läuft auf Port " << port << std::endl; + } else { + std::cerr << "WebSocket-Server konnte nicht gestartet werden!\n"; + running = false; + } + }) + .run(); +} + +void WebSocketServer::pingClients() { + while (running) { + std::this_thread::sleep_for(std::chrono::seconds(30)); + + std::unique_lock lock(connectionsMutex); + for (auto &[userId, ws] : connections) { + if (!ws->getUserData()->pongReceived) { + ws->close(); + } else { + ws->getUserData()->pongReceived = false; + ws->send("ping", uWS::OpCode::TEXT); + } + } + } +} + +void WebSocketServer::processMessageQueue() { + while (running) { + std::unique_lock lock(queueMutex); + queueCV.wait(lock, [this]() { return !messageQueue.empty() || !running; }); + while (!messageQueue.empty()) { + std::string message = std::move(messageQueue.front()); + messageQueue.pop(); + lock.unlock(); + handleBrokerMessage(message); + lock.lock(); + } + } +} + +void WebSocketServer::handleBrokerMessage(const std::string &message) { + try { + json parsedMessage = json::parse(message); + if (parsedMessage.contains("user_id")) { + int falukantUserId = parsedMessage["user_id"]; + std::shared_lock lock(connectionsMutex); + auto userId = getUserIdFromFalukantUserId(falukantUserId); + auto it = connections.find(userId); + if (it != connections.end()) { + it->second->send(message, uWS::OpCode::TEXT); + std::cout << "[WebSocketServer] Nachricht an User-ID: " << userId << " gesendet.\n"; + } else { + std::cerr << "[WebSocketServer] Keine Verbindung für User-ID: " << userId << "\n"; + } + } else { + std::cerr << "[WebSocketServer] Ungültige Nachricht: " << message << "\n"; + } + } catch (const std::exception &e) { + std::cerr << "[WebSocketServer] Fehler beim Verarbeiten der Nachricht: " << e.what() << "\n"; + } +} + +void WebSocketServer::handleWebSocketMessage(uWS::WebSocket *ws, std::string_view message, uWS::OpCode opCode) { + if (message == "pong") { + ws->getUserData()->pongReceived = true; + return; + } + + json parsedMessage = json::parse(message); + if (parsedMessage.contains("event") && parsedMessage["event"] == "setUserId") { + std::string userId = parsedMessage["data"]["userId"]; + std::unique_lock lock(connectionsMutex); + connections[userId] = ws; + ws->getUserData()->userId = userId; + } +} + +void WebSocketServer::handleWebSocketClose(uWS::WebSocket *ws) { + std::unique_lock lock(connectionsMutex); + auto userId = ws->getUserData()->userId; + if (!userId.empty()) { + connections.erase(userId); + } +} + +std::string WebSocketServer::getUserIdFromFalukantUserId(int & userId) { + ConnectionGuard guard(pool); + auto &db = guard.get(); + std::string query = R"( + SELECT u.hashed_id + FROM community.user u + JOIN falukant_data.falukant_user fu ON u.id = fu.user_id + WHERE fu.id = $1 + )"; + db.prepare("get_user_id", query); + auto users = db.execute("get_user_id", {std::to_string(userId)}); + if (!users.empty()) { + return users[0]["hashed_id"]; + } else { + return ""; + } +} diff --git a/src/websocket_server.h b/src/websocket_server.h new file mode 100644 index 0000000..10a11b9 --- /dev/null +++ b/src/websocket_server.h @@ -0,0 +1,50 @@ +#pragma once + +#include "message_broker.h" +#include "connection_pool.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct WebSocketUserData { + std::string userId; + bool pongReceived = true; +}; + +class WebSocketServer { +public: + WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker); + void run(); + void stop(); + +private: + void startServer(); + void processMessageQueue(); + void pingClients(); + void handleBrokerMessage(const std::string &message); + void handleWebSocketMessage(uWS::WebSocket *ws, std::string_view message, uWS::OpCode opCode); + void handleWebSocketClose(uWS::WebSocket *ws); + std::string getUserIdFromFalukantUserId(int &userId); + + int port; + ConnectionPool &pool; + MessageBroker &broker; + + std::atomic running{false}; + std::thread serverThread; + std::thread messageProcessingThread; + std::thread pingThread; + + std::unordered_map *> connections; + std::shared_mutex connectionsMutex; + + std::queue messageQueue; + std::mutex queueMutex; + std::condition_variable queueCV; +}; diff --git a/src/worker.h b/src/worker.h new file mode 100644 index 0000000..4f2a5c5 --- /dev/null +++ b/src/worker.h @@ -0,0 +1,117 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "connection_pool.h" +#include "message_broker.h" +#include "database.h" + +class Worker { +public: + Worker(ConnectionPool &pool, MessageBroker &broker, std::string name) + : pool(pool), + broker(broker), + workerName(std::move(name)), + runningWorker(false), + runningWatchdog(false) + {} + + virtual ~Worker() { + stopWorkerThread(); + stopWatchdogThread(); + } + + void startWorkerThread() { + if (runningWorker.load()) { + std::cerr << "[" << workerName << "] Worker thread already running, skipping start.\n"; + return; + } + runningWorker.store(true); + workerThread = std::thread([this]() { run(); }); + } + + void stopWorkerThread() { + runningWorker.store(false); + if (workerThread.joinable()) { + workerThread.join(); + } + } + + void enableWatchdog() { + if (runningWatchdog.load()) { + std::cerr << "[" << workerName << "] Watchdog already enabled, skipping.\n"; + return; + } + runningWatchdog.store(true); + watchdogThread = std::thread([this]() { watchdog(); }); + } + + void stopWatchdogThread() { + runningWatchdog.store(false); + if (watchdogThread.joinable()) { + watchdogThread.join(); + } + } + + std::string getCurrentStep() { + std::lock_guard lock(stepMutex); + return currentStep; + } + +protected: + virtual void run() = 0; + + void watchdog() { + try { + while (runningWatchdog.load()) { + std::this_thread::sleep_for(watchdogInterval); + bool isActive = false; + { + std::lock_guard lock(activityMutex); + isActive = active; + active = false; + } + if (!isActive) { + std::cerr << "[" << workerName << "] Watchdog: Keine Aktivität! Starte Worker neu...\n"; + std::cerr << "[" << workerName << "] Letzte Aktivität: " << getCurrentStep() << "\n"; + stopWorkerThread(); + startWorkerThread(); + } + } + } catch (const std::exception &e) { + std::cerr << "[" << workerName << "] Watchdog: Ausnahme gefangen: " << e.what() << "\n"; + } catch (...) { + std::cerr << "[" << workerName << "] Watchdog: Unbekannte Ausnahme gefangen.\n"; + } + } + + void signalActivity() { + std::lock_guard lock(activityMutex); + active = true; + } + + void setCurrentStep(const std::string &step) { + std::lock_guard lock(stepMutex); + currentStep = step; + } + +protected: + ConnectionPool &pool; + MessageBroker &broker; + std::string workerName; + std::atomic runningWorker; + std::atomic runningWatchdog; + std::atomic active{false}; + std::thread workerThread; + std::thread watchdogThread; + std::mutex activityMutex; + std::chrono::seconds watchdogInterval{10}; + std::mutex stepMutex; + std::string currentStep; +};