This commit is contained in:
Torsten (PC)
2026-01-14 14:36:57 +01:00
parent cd739fb52e
commit 1fe77c0905
21 changed files with 1267 additions and 0 deletions

5
.gitignore vendored
View File

@@ -17,3 +17,8 @@ frontend/dist
frontend/dist/*
frontedtree.txt
backend/dist/
build
build/*
.vscode
.vscode/*
.clang-format

49
CMakeLists.txt Normal file
View File

@@ -0,0 +1,49 @@
cmake_minimum_required(VERSION 3.20)
project(YourPartDaemon VERSION 1.0 LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_C_COMPILER "gcc-13")
set(CMAKE_CPP_COMPILER "g++-13")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -flto=auto")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -flto")
find_package(PostgreSQL REQUIRED)
find_package(Threads REQUIRED)
find_package(PkgConfig REQUIRED)
pkg_check_modules(LIBPQXX REQUIRED libpqxx)
include_directories(${LIBPQXX_INCLUDE_DIRS})
link_directories(${LIBPQXX_LIBRARY_DIRS})
add_definitions(${LIBPQXX_CFLAGS_OTHER})
set(SOURCES
src/main.cpp
src/config.cpp
src/connection_pool.cpp
src/database.cpp
src/character_creation_worker.cpp
src/produce_worker.cpp
src/message_broker.cpp
src/websocket_server.cpp
)
set(HEADERS
src/config.h
src/database.h
src/connection_pool.h
src/worker.h
src/character_creation_worker.h
src/produce_worker.h
src/message_broker.h
src/websocket_server.h
)
add_executable(yourpart-daemon ${SOURCES} ${HEADERS})
find_package(nlohmann_json CONFIG REQUIRED)
target_include_directories(yourpart-daemon PRIVATE ${PostgreSQL_INCLUDE_DIRS})
target_link_libraries(yourpart-daemon PRIVATE ${PostgreSQL_LIBRARIES} Threads::Threads z ssl crypto ${CMAKE_SOURCE_DIR}/lib/uSockets.a ${LIBPQXX_LIBRARIES})
install(TARGETS yourpart-daemon DESTINATION /usr/local/bin)
install(FILES daemon.conf DESTINATION /etc/yourpart/)

7
daemon.conf Normal file
View File

@@ -0,0 +1,7 @@
DB_HOST=localhost
DB_PORT=5432
DB_NAME=yp3
DB_USER=yourpart
DB_PASSWORD=hitomisan
THREAD_COUNT=4
WEBSOCKET_PORT=4551

BIN
lib/uSockets.a Normal file

Binary file not shown.

View File

@@ -0,0 +1,157 @@
#include "character_creation_worker.h"
#include "connection_guard.h"
#include <iostream>
#include <chrono>
#include <thread>
CharacterCreationWorker::CharacterCreationWorker(ConnectionPool &pool, MessageBroker &broker)
: Worker(pool, broker, "CharacterCreationWorker")
, gen(std::random_device{}())
, dist(2, 3)
{
}
CharacterCreationWorker::~CharacterCreationWorker() {
}
void CharacterCreationWorker::run() {
while (runningWorker) {
setCurrentStep("Check if previous day character was created");
if (!isPreviousDayCharacterCreated()) {
setCurrentStep("Create characters for today");
createCharactersForToday();
}
setCurrentStep("Sleep for 60 seconds");
for (int i = 0; i < 60 && runningWorker; ++i) {
std::this_thread::sleep_for(std::chrono::seconds(1));
setCurrentStep("signalActivity()");
signalActivity();
}
setCurrentStep("Loop done");
}
}
bool CharacterCreationWorker::isPreviousDayCharacterCreated() {
try {
setCurrentStep("Get Database Connection");
ConnectionGuard connGuard(pool);
auto &db = connGuard.get();
setCurrentStep("Execute Query");
auto results = db.query(QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED);
if (!results.empty()) {
std::string created_at_str = results[0].at("created_at");
return true;
}
} catch (const std::exception &e) {
std::cerr << "[CharacterCreationWorker] Fehler in isPreviousDayCharacterCreated: "
<< e.what() << std::endl;
}
setCurrentStep("No previous day character found");
return false;
}
void CharacterCreationWorker::createCharactersForToday() {
loadNames();
if (first_name_cache.empty() || last_name_cache.empty()) {
std::cerr << "Fehler: Namen konnten nicht geladen werden." << std::endl;
return;
}
auto town_ids = getTownRegionIds();
for (auto region_id : town_ids) {
createCharactersForRegion(region_id);
}
}
void CharacterCreationWorker::createCharactersForRegion(int region_id) {
std::vector<int> nobility_stands = {1, 2, 3};
std::vector<std::string> genders = {"male", "female"};
for (auto nobility : nobility_stands) {
for (auto &gender : genders) {
int num_chars = dist(gen);
for (int i = 0; i < num_chars; ++i) {
createCharacter(region_id, gender, nobility);
}
}
}
}
void CharacterCreationWorker::createCharacter(int region_id,
const std::string &gender,
int title_of_nobility) {
int first_name_id = getRandomFromSet(first_name_cache[gender]);
if (first_name_id == -1) {
std::cerr << "Fehler: Kein passender Vorname gefunden." << std::endl;
return;
}
int last_name_id = getRandomFromSet(last_name_cache);
if (last_name_id == -1) {
std::cerr << "Fehler: Kein passender Nachname gefunden." << std::endl;
return;
}
try {
ConnectionGuard connGuard(pool);
auto &db = connGuard.get();
db.prepare("insert_character", QUERY_INSERT_CHARACTER);
db.execute("insert_character", {std::to_string(region_id),
std::to_string(first_name_id),
std::to_string(last_name_id),
gender,
std::to_string(title_of_nobility)});
} catch (const std::exception &e) {
std::cerr << "[CharacterCreationWorker] Fehler in createCharacter: "
<< e.what() << std::endl;
}
}
std::vector<int> CharacterCreationWorker::getTownRegionIds() {
try {
ConnectionGuard connGuard(pool);
auto &db = connGuard.get();
auto rows = db.query(QUERY_GET_TOWN_REGION_IDS);
std::vector<int> ids;
ids.reserve(rows.size());
for (const auto &row : rows) {
ids.push_back(std::stoi(row.at("id")));
}
return ids;
} catch (const std::exception &e) {
std::cerr << "[CharacterCreationWorker] Fehler in getTownRegionIds: "
<< e.what() << std::endl;
}
return {};
}
void CharacterCreationWorker::loadNames() {
try {
ConnectionGuard connGuard(pool);
auto &db = connGuard.get();
auto firstNameRows = db.query(QUERY_LOAD_FIRST_NAMES);
for (const auto &row : firstNameRows) {
first_name_cache[row.at("gender")].insert(std::stoi(row.at("id")));
}
auto lastNameRows = db.query(QUERY_LOAD_LAST_NAMES);
for (const auto &row : lastNameRows) {
last_name_cache.insert(std::stoi(row.at("id")));
}
} catch (const std::exception &e) {
std::cerr << "[CharacterCreationWorker] Fehler in loadNames: "
<< e.what() << std::endl;
}
}
int CharacterCreationWorker::getRandomFromSet(const std::unordered_set<int> &name_set) {
if (name_set.empty()) {
return -1;
}
auto it = name_set.begin();
std::advance(it, std::uniform_int_distribution<int>(0, name_set.size() - 1)(gen));
return *it;
}

View File

@@ -0,0 +1,64 @@
#pragma once
#include "worker.h"
#include <random>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <string>
class CharacterCreationWorker : public Worker {
public:
CharacterCreationWorker(ConnectionPool &pool, MessageBroker &broker);
~CharacterCreationWorker() override;
protected:
void run() override;
private:
std::mt19937 gen;
std::uniform_int_distribution<int> dist;
std::unordered_map<std::string, std::unordered_set<int>> first_name_cache;
std::unordered_set<int> last_name_cache;
bool isPreviousDayCharacterCreated();
void createCharactersForToday();
void createCharactersForRegion(int region_id);
void createCharacter(int region_id, const std::string &gender, int title_of_nobility);
std::vector<int> getTownRegionIds();
void loadNames();
int getRandomFromSet(const std::unordered_set<int> &name_set);
static constexpr const char *QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED = R"(
SELECT created_at
FROM falukant_data."character"
WHERE user_id IS NULL
ORDER BY created_at DESC
LIMIT 1;
)";
static constexpr const char *QUERY_GET_TOWN_REGION_IDS = R"(
SELECT fdr.id
FROM falukant_data.region fdr
JOIN falukant_type.region ftr ON fdr.region_type_id = ftr.id
WHERE ftr.label_tr = 'city';
)";
static constexpr const char *QUERY_LOAD_FIRST_NAMES = R"(
SELECT id, gender
FROM falukant_predefine.firstname;
)";
static constexpr const char *QUERY_LOAD_LAST_NAMES = R"(
SELECT id
FROM falukant_predefine.lastname;
)";
static constexpr const char *QUERY_INSERT_CHARACTER = R"(
INSERT INTO falukant_data."character"(
user_id, region_id, first_name, last_name,
birthdate, gender, created_at, updated_at, title_of_nobility
)
VALUES ($1, $2, $3, $4, NOW(), $5, NOW(), NOW(), $6);
)";
};

39
src/config.cpp Normal file
View File

@@ -0,0 +1,39 @@
#include "config.h"
#include <fstream>
#include <sstream>
#include <stdexcept>
Config::Config(const std::string &filepath)
{
load(filepath);
}
void Config::load(const std::string &filepath)
{
std::ifstream file(filepath);
if (!file)
{
throw std::runtime_error("Konfigurationsdatei konnte nicht geöffnet werden.");
}
std::string line;
while (std::getline(file, line))
{
std::istringstream iss(line);
std::string key, value;
if (std::getline(iss, key, '=') && std::getline(iss, value))
{
config_map[key] = value;
}
}
}
std::string Config::get(const std::string &key) const
{
auto it = config_map.find(key);
if (it != config_map.end())
{
return it->second;
}
throw std::runtime_error("Konfigurationsschlüssel nicht gefunden: " + key);
}

14
src/config.h Normal file
View File

@@ -0,0 +1,14 @@
#pragma once
#include <string>
#include <map>
class Config
{
public:
Config(const std::string &filepath);
std::string get(const std::string &key) const;
private:
std::map<std::string, std::string> config_map;
void load(const std::string &filepath);
};

24
src/connection_guard.h Normal file
View File

@@ -0,0 +1,24 @@
#pragma once
#include "connection_pool.h"
#include <memory>
class ConnectionGuard {
public:
ConnectionGuard(ConnectionPool &pool)
: pool(pool), connection(pool.getConnection()) {}
~ConnectionGuard() {
if (connection) {
pool.releaseConnection(connection);
}
}
Database &get() {
return *connection;
}
private:
ConnectionPool &pool;
std::shared_ptr<Database> connection;
};

49
src/connection_pool.cpp Normal file
View File

@@ -0,0 +1,49 @@
#include "connection_pool.h"
#include <iostream>
#include "connection_guard.h"
ConnectionPool::ConnectionPool(const std::string &host, const std::string &port,
const std::string &name, const std::string &user,
const std::string &password, int pool_size)
: host(host), port(port), name(name), user(user), password(password) {
createPool(pool_size);
}
void ConnectionPool::createPool(int pool_size) {
std::string conninfo = "host=" + host + " port=" + port + " dbname=" + name +
" user=" + user + " password=" + password;
for (int i = 0; i < pool_size; ++i) {
auto conn = std::make_shared<Database>(conninfo);
pool.push(conn);
}
}
std::shared_ptr<Database> ConnectionPool::getConnection() {
std::unique_lock<std::mutex> lock(pool_mutex);
pool_cv.wait(lock, [this]() { return !pool.empty(); });
auto conn = pool.front();
pool.pop();
if (!conn->isValid()) {
std::cerr << "[ConnectionPool] Ungültige Verbindung. Erstelle neu.\n";
std::string conninfo = "host=" + host +
" port=" + port +
" dbname=" + name +
" user=" + user +
" password=" + password;
conn = std::make_shared<Database>(conninfo);
if (!conn->isValid()) {
std::cerr << "[ConnectionPool] Erneut fehlgeschlagen.\n";
return nullptr;
}
}
return conn;
}
void ConnectionPool::releaseConnection(std::shared_ptr<Database> conn) {
{
std::lock_guard<std::mutex> lock(pool_mutex);
pool.push(conn);
}
pool_cv.notify_one();
}

31
src/connection_pool.h Normal file
View File

@@ -0,0 +1,31 @@
#pragma once
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
#include "database.h"
class ConnectionPool {
public:
ConnectionPool(const std::string &host, const std::string &port,
const std::string &name, const std::string &user,
const std::string &password, int pool_size);
std::shared_ptr<Database> getConnection();
void releaseConnection(std::shared_ptr<Database> conn);
private:
std::queue<std::shared_ptr<Database>> pool;
std::mutex pool_mutex;
std::condition_variable pool_cv;
std::string host;
std::string port;
std::string name;
std::string user;
std::string password;
void createPool(int pool_size);
void refreshConnection(std::shared_ptr<Database> &conn);
};

107
src/database.cpp Normal file
View File

@@ -0,0 +1,107 @@
#include "database.h"
Database::Database(const std::string &conninfo)
{
try {
connection_ = std::make_unique<pqxx::connection>(conninfo);
if (!connection_->is_open()) {
throw std::runtime_error("Konnte DB-Verbindung nicht öffnen!");
}
} catch (const std::exception &e) {
std::cerr << "[Database] Fehler beim Verbinden: " << e.what() << std::endl;
throw;
}
}
std::vector<std::map<std::string, std::string>>
Database::query(const std::string &sql)
{
std::vector<std::map<std::string, std::string>> rows;
try {
pqxx::work txn(*connection_);
pqxx::result r = txn.exec(sql);
txn.commit();
for (auto row : r) {
std::map<std::string, std::string> oneRow;
for (auto f = 0u; f < row.size(); f++) {
std::string colName = r.column_name(f);
std::string value = row[f].c_str() ? row[f].c_str() : "";
oneRow[colName] = value;
}
rows.push_back(std::move(oneRow));
}
} catch (const std::exception &ex) {
std::cerr << "[Database] query-Fehler: " << ex.what() << "\nSQL: " << sql << std::endl;
}
return rows;
}
void Database::prepare(const std::string &stmtName, const std::string &sql)
{
try {
pqxx::work txn(*connection_);
txn.conn().prepare(stmtName, sql);
txn.commit();
} catch (const std::exception &ex) {
std::cerr << "[Database] prepare-Fehler: " << ex.what()
<< "\nSQL: " << sql << std::endl;
}
}
#include <pqxx/pqxx>
#include <vector>
#include <unordered_map>
#include <string>
#include <iostream>
#include <sstream>
std::vector<std::unordered_map<std::string, std::string>>
Database::execute(const std::string &stmtName, const std::vector<std::string> &params)
{
std::vector<std::unordered_map<std::string, std::string>> rows;
try {
pqxx::work txn(*connection_);
pqxx::prepare::invocation inv = txn.prepared(stmtName);
// Parameter einzeln hinzufügen
for (auto &p : params) {
inv(p);
}
// Ausführung
pqxx::result r = inv.exec();
txn.commit();
// Ergebnis verarbeiten
for (const auto &row : r) {
std::unordered_map<std::string, std::string> mapRow;
for (pqxx::row::size_type i = 0; i < row.size(); ++i) {
std::string colName = r.column_name(i);
mapRow[colName] = row[i].c_str() ? row[i].c_str() : "";
}
rows.push_back(std::move(mapRow));
}
}
catch (const std::exception &ex) {
std::cerr << "[Database] execute-Fehler: " << ex.what()
<< "\nStatement: " << stmtName << std::endl;
}
return rows;
}
bool Database::isValid() const {
try {
if (!connection_ || !connection_->is_open()) {
return false;
}
pqxx::work txn(*connection_);
txn.exec("SELECT 1"); // Einfacher Ping
txn.commit();
return true;
} catch (const std::exception &ex) {
std::cerr << "[Database] Verbindung ungültig: " << ex.what() << "\n";
return false;
}
}

28
src/database.h Normal file
View File

@@ -0,0 +1,28 @@
#pragma once
#include <string>
#include <vector>
#include <unordered_map>
#include <sstream>
#include <pgsql/libpq-fe.h>
#include <iostream>
#include <memory>
#include <unordered_map>
#include <pqxx/pqxx>
class Database
{
public:
Database(const std::string &conninfo);
std::vector<std::map<std::string, std::string>> query(const std::string &sql);
void prepare(const std::string &stmtName, const std::string &sql);
std::vector<std::unordered_map<std::string,std::string>> execute(
const std::string &stmtName,
const std::vector<std::string> &params = {}
);
bool isOpen() const { return connection_ && connection_->is_open(); }
bool isValid() const;
private:
std::unique_ptr<pqxx::connection> connection_;
};

63
src/main.cpp Normal file
View File

@@ -0,0 +1,63 @@
#include "character_creation_worker.h"
#include "produce_worker.h"
#include "connection_pool.h"
#include "websocket_server.h"
#include "message_broker.h"
#include "config.h"
#include <csignal>
#include <atomic>
#include <iostream>
#include <thread>
std::atomic<bool> keepRunning(true);
void handleSignal(int signal) {
if (signal == SIGINT || signal == SIGTERM) {
std::cerr << "Signal erhalten: " << signal << ". Beende Anwendung..." << std::endl;
keepRunning.store(false, std::memory_order_relaxed);
}
}
int main() {
std::signal(SIGINT, handleSignal);
std::signal(SIGTERM, handleSignal);
try {
Config config("/etc/yourpart/daemon.conf");
ConnectionPool pool(
config.get("DB_HOST"),
config.get("DB_PORT"),
config.get("DB_NAME"),
config.get("DB_USER"),
config.get("DB_PASSWORD"),
10
);
int websocketPort = std::stoi(config.get("WEBSOCKET_PORT"));
MessageBroker broker;
WebSocketServer websocketServer(websocketPort, pool, broker);
CharacterCreationWorker creationWorker(pool, broker);
ProduceWorker produceWorker(pool, broker);
broker.start();
websocketServer.run();
creationWorker.startWorkerThread();
produceWorker.startWorkerThread();
creationWorker.enableWatchdog();
produceWorker.enableWatchdog();
while (keepRunning) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
creationWorker.stopWorkerThread();
produceWorker.stopWorkerThread();
websocketServer.stop();
broker.stop();
} catch (const std::exception &e) {
std::cerr << "Fehler: " << e.what() << std::endl;
return 1;
}
return 0;
}

43
src/message_broker.cpp Normal file
View File

@@ -0,0 +1,43 @@
#include "message_broker.h"
#include <iostream>
void MessageBroker::publish(const std::string &message) {
std::lock_guard<std::mutex> lock(mutex);
messageQueue.push(message);
cv.notify_all();
}
void MessageBroker::subscribe(const MessageCallback &callback) {
std::lock_guard<std::mutex> lock(mutex);
subscribers.push_back(callback);
}
void MessageBroker::start() {
running = true;
brokerThread = std::thread([this]() { processMessages(); });
}
void MessageBroker::stop() {
running = false;
cv.notify_all();
if (brokerThread.joinable()) {
brokerThread.join();
}
}
void MessageBroker::processMessages() {
while (running) {
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [this]() { return !messageQueue.empty() || !running; });
if (!running) break;
while (!messageQueue.empty()) {
std::string message = std::move(messageQueue.front());
messageQueue.pop();
lock.unlock();
for (const auto &callback : subscribers) {
callback(message);
}
lock.lock();
}
}
}

29
src/message_broker.h Normal file
View File

@@ -0,0 +1,29 @@
#pragma once
#include <functional>
#include <string>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
#include <vector>
class MessageBroker {
public:
using MessageCallback = std::function<void(const std::string &message)>;
void publish(const std::string &message);
void subscribe(const MessageCallback &callback);
void start();
void stop();
private:
std::queue<std::string> messageQueue;
std::vector<MessageCallback> subscribers;
std::mutex mutex;
std::condition_variable cv;
std::atomic<bool> running{false};
std::thread brokerThread;
void processMessages();
};

172
src/produce_worker.cpp Normal file
View File

@@ -0,0 +1,172 @@
#include "produce_worker.h"
#include "connection_guard.h" // Include for ConnectionGuard
#include <iostream>
#include <algorithm>
#include <thread>
#include <nlohmann/json.hpp>
ProduceWorker::ProduceWorker(ConnectionPool &pool, MessageBroker &broker)
: Worker(pool, broker, "ProduceWorker")
{
}
ProduceWorker::~ProduceWorker() {
}
void ProduceWorker::run() {
auto lastIterationTime = std::chrono::steady_clock::now();
while (runningWorker) {
setCurrentStep("Check runningWorker Variable");
{
std::lock_guard<std::mutex> lock(activityMutex);
if (!runningWorker) {
break;
}
}
setCurrentStep("Calculate elapsed time");
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastIterationTime);
if (elapsed < std::chrono::milliseconds(200)) {
std::this_thread::sleep_for(std::chrono::milliseconds(200) - elapsed);
}
lastIterationTime = std::chrono::steady_clock::now();
setCurrentStep("Process Productions");
processProductions();
setCurrentStep("Signal Activity");
signalActivity();
setCurrentStep("Loop Done");
}
}
void ProduceWorker::processProductions() {
try {
setCurrentStep("Get Database Connection");
ConnectionGuard connGuard(pool);
auto &db = connGuard.get();
setCurrentStep("Fetch Finished Productions");
auto finishedProductions = getFinishedProductions(db);
setCurrentStep("Process Finished Productions");
for (const auto &production : finishedProductions) {
if (production.find("branch_id") == production.end() ||
production.find("product_id") == production.end() ||
production.find("quantity") == production.end() ||
production.find("quality") == production.end() ||
production.find("user_id") == production.end()) {
continue;
}
int branchId = std::stoi(production.at("branch_id"));
int productId = std::stoi(production.at("product_id"));
int quantity = std::stoi(production.at("quantity"));
int quality = std::stoi(production.at("quality"));
int userId = std::stoi(production.at("user_id"));
if (addToInventory(db, branchId, productId, quantity, quality, userId)) {
}
deleteProduction(db, production.at("production_id"));
}
} catch (const std::exception &e) {
std::cerr << "[ProduceWorker] Fehler in processProductions: " << e.what() << std::endl;
}
}
std::vector<std::unordered_map<std::string, std::string>> ProduceWorker::getFinishedProductions(Database &db) {
try {
db.prepare("get_finished_productions", QUERY_GET_FINISHED_PRODUCTIONS);
return db.execute("get_finished_productions");
} catch (const std::exception &e) {
std::cerr << "[ProduceWorker] Fehler beim Abrufen abgeschlossener Produktionen: "
<< e.what() << std::endl;
}
return {};
}
bool ProduceWorker::addToInventory(Database &db,
int branchId,
int productId,
int quantity,
int quality,
int userId)
{
try {
db.prepare("get_stocks", QUERY_GET_AVAILABLE_STOCKS);
auto stocks = db.execute("get_stocks", {std::to_string(branchId)});
int remainingQuantity = quantity;
for (const auto &stock : stocks) {
int stockId = std::stoi(stock.at("id"));
int totalCapacity = std::stoi(stock.at("total_capacity"));
int filledCapacity = std::stoi(stock.at("filled"));
int freeCapacity = totalCapacity - filledCapacity;
if (freeCapacity <= 0) {
continue;
}
int toStore = std::min(remainingQuantity, freeCapacity);
if (!storeInStock(db, stockId, productId, toStore, quality)) {
return false;
}
remainingQuantity -= toStore;
sendProductionReadyEvent(userId, productId, quantity, quality, branchId);
if (remainingQuantity <= 0) {
break;
}
}
return (remainingQuantity == 0);
} catch (const std::exception &e) {
std::cerr << "[ProduceWorker] Fehler in addToInventory: " << e.what() << std::endl;
}
return false;
}
bool ProduceWorker::storeInStock(Database &db,
int stockId,
int productId,
int quantity,
int quality)
{
try {
db.prepare("insert_inventory", QUERY_INSERT_INVENTORY);
db.execute("insert_inventory", {std::to_string(stockId),
std::to_string(productId),
std::to_string(quantity),
std::to_string(quality)});
return true;
} catch (const std::exception &e) {
std::cerr << "[ProduceWorker] Fehler in storeInStock: " << e.what() << std::endl;
}
return false;
}
void ProduceWorker::deleteProduction(Database &db, const std::string &productionId) {
try {
db.prepare("delete_production", QUERY_DELETE_PRODUCTION);
db.execute("delete_production", {productionId});
} catch (const std::exception &e) {
std::cerr << "[ProduceWorker] Fehler beim Löschen der Produktion: " << e.what() << std::endl;
}
}
void ProduceWorker::sendProductionReadyEvent(int userId,
int productId,
int quantity,
int quality,
int branchId)
{
try {
nlohmann::json message = {
{"event", "production_ready"},
{"user_id", userId},
{"product_id", productId},
{"quantity", quantity},
{"quality", quality},
{"branch_id", branchId}
};
broker.publish(message.dump());
} catch (const std::exception &e) {
std::cerr << "[ProduceWorker] Fehler beim Senden des Production Ready Events: "
<< e.what() << std::endl;
}
}

70
src/produce_worker.h Normal file
View File

@@ -0,0 +1,70 @@
#pragma once
#include "worker.h"
#include <vector>
#include <map>
#include <string>
class ProduceWorker : public Worker
{
public:
explicit ProduceWorker(ConnectionPool &pool, MessageBroker &broker);
~ProduceWorker() override;
protected:
void run() override; // überschreibt Worker::run()
private:
// Fachlogik
void processProductions();
std::vector<std::unordered_map<std::string, std::string>> getFinishedProductions(Database &db);
bool addToInventory(Database &db, int branchId, int productId, int quantity, int quality, int userId);
bool storeInStock(Database &db, int stockId, int productId, int quantity, int quality);
void deleteProduction(Database &db, const std::string &productionId);
void sendProductionReadyEvent(int userId, int productId, int quantity, int quality, int branchId);
static constexpr const char *QUERY_GET_FINISHED_PRODUCTIONS = R"(
SELECT
p.id AS production_id,
p.branch_id,
p.product_id,
p.quantity,
p.start_timestamp,
pr.production_time,
k.character_id,
k.knowledge AS quality,
br.region_id,
br.falukant_user_id user_id
FROM falukant_data.production p
JOIN falukant_type.product pr ON p.product_id = pr.id
JOIN falukant_data.branch br ON p.branch_id = br.id
JOIN falukant_data.character c ON c.user_id = br.falukant_user_id
JOIN falukant_data.knowledge k ON p.product_id = k.product_id AND k.character_id = c.id
JOIN falukant_data.stock s ON s.branch_id = br.id
WHERE p.start_timestamp + interval '1 minute' * pr.production_time <= NOW()
ORDER BY p.start_timestamp;
)";
static constexpr const char *QUERY_GET_AVAILABLE_STOCKS = R"(
SELECT stock.id, stock.quantity AS total_capacity, (
SELECT COALESCE(SUM(inventory.quantity), 0)
FROM falukant_data.inventory
WHERE inventory.stock_id = stock.id
) AS filled, stock.branch_id
FROM falukant_data.stock stock
JOIN falukant_data.branch branch
ON stock.branch_id = branch.id
WHERE branch.id = $1
ORDER BY total_capacity DESC;
)";
static constexpr const char *QUERY_DELETE_PRODUCTION = R"(
DELETE FROM falukant_data.production WHERE id = $1;
)";
static constexpr const char *QUERY_INSERT_INVENTORY = R"(
INSERT INTO falukant_data.inventory (stock_id, product_id, quantity, quality, produced_at)
VALUES ($1, $2, $3, $4, NOW());
)";
};

149
src/websocket_server.cpp Normal file
View File

@@ -0,0 +1,149 @@
#include "websocket_server.h"
#include "connection_guard.h"
#include <iostream>
#include <nlohmann/json.hpp>
#include <thread>
#include <chrono>
#include <queue>
#include <condition_variable>
using json = nlohmann::json;
WebSocketServer::WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker)
: port(port), pool(pool), broker(broker) {}
void WebSocketServer::run() {
running = true;
broker.subscribe([this](const std::string &message) {
std::lock_guard<std::mutex> lock(queueMutex);
messageQueue.push(message);
queueCV.notify_one();
});
serverThread = std::thread([this]() { startServer(); });
messageProcessingThread = std::thread([this]() { processMessageQueue(); });
pingThread = std::thread([this]() { pingClients(); });
}
void WebSocketServer::stop() {
running = false;
if (serverThread.joinable()) serverThread.join();
if (messageProcessingThread.joinable()) messageProcessingThread.join();
if (pingThread.joinable()) pingThread.join();
}
void WebSocketServer::startServer() {
uWS::App()
.ws<WebSocketUserData>("/*", {
.open = [this](uWS::WebSocket<false, true, WebSocketUserData> *ws) {
ws->getUserData()->pongReceived = true;
},
.message = [this](uWS::WebSocket<false, true, WebSocketUserData> *ws, std::string_view message, uWS::OpCode opCode) {
handleWebSocketMessage(ws, message, opCode);
},
.close = [this](uWS::WebSocket<false, true, WebSocketUserData> *ws, int /*code*/, std::string_view /*message*/) {
handleWebSocketClose(ws);
}
})
.listen(port, [this](auto *token) {
if (token) {
std::cout << "WebSocket-Server läuft auf Port " << port << std::endl;
} else {
std::cerr << "WebSocket-Server konnte nicht gestartet werden!\n";
running = false;
}
})
.run();
}
void WebSocketServer::pingClients() {
while (running) {
std::this_thread::sleep_for(std::chrono::seconds(30));
std::unique_lock lock(connectionsMutex);
for (auto &[userId, ws] : connections) {
if (!ws->getUserData()->pongReceived) {
ws->close();
} else {
ws->getUserData()->pongReceived = false;
ws->send("ping", uWS::OpCode::TEXT);
}
}
}
}
void WebSocketServer::processMessageQueue() {
while (running) {
std::unique_lock lock(queueMutex);
queueCV.wait(lock, [this]() { return !messageQueue.empty() || !running; });
while (!messageQueue.empty()) {
std::string message = std::move(messageQueue.front());
messageQueue.pop();
lock.unlock();
handleBrokerMessage(message);
lock.lock();
}
}
}
void WebSocketServer::handleBrokerMessage(const std::string &message) {
try {
json parsedMessage = json::parse(message);
if (parsedMessage.contains("user_id")) {
int falukantUserId = parsedMessage["user_id"];
std::shared_lock lock(connectionsMutex);
auto userId = getUserIdFromFalukantUserId(falukantUserId);
auto it = connections.find(userId);
if (it != connections.end()) {
it->second->send(message, uWS::OpCode::TEXT);
std::cout << "[WebSocketServer] Nachricht an User-ID: " << userId << " gesendet.\n";
} else {
std::cerr << "[WebSocketServer] Keine Verbindung für User-ID: " << userId << "\n";
}
} else {
std::cerr << "[WebSocketServer] Ungültige Nachricht: " << message << "\n";
}
} catch (const std::exception &e) {
std::cerr << "[WebSocketServer] Fehler beim Verarbeiten der Nachricht: " << e.what() << "\n";
}
}
void WebSocketServer::handleWebSocketMessage(uWS::WebSocket<false, true, WebSocketUserData> *ws, std::string_view message, uWS::OpCode opCode) {
if (message == "pong") {
ws->getUserData()->pongReceived = true;
return;
}
json parsedMessage = json::parse(message);
if (parsedMessage.contains("event") && parsedMessage["event"] == "setUserId") {
std::string userId = parsedMessage["data"]["userId"];
std::unique_lock lock(connectionsMutex);
connections[userId] = ws;
ws->getUserData()->userId = userId;
}
}
void WebSocketServer::handleWebSocketClose(uWS::WebSocket<false, true, WebSocketUserData> *ws) {
std::unique_lock lock(connectionsMutex);
auto userId = ws->getUserData()->userId;
if (!userId.empty()) {
connections.erase(userId);
}
}
std::string WebSocketServer::getUserIdFromFalukantUserId(int & userId) {
ConnectionGuard guard(pool);
auto &db = guard.get();
std::string query = R"(
SELECT u.hashed_id
FROM community.user u
JOIN falukant_data.falukant_user fu ON u.id = fu.user_id
WHERE fu.id = $1
)";
db.prepare("get_user_id", query);
auto users = db.execute("get_user_id", {std::to_string(userId)});
if (!users.empty()) {
return users[0]["hashed_id"];
} else {
return "";
}
}

50
src/websocket_server.h Normal file
View File

@@ -0,0 +1,50 @@
#pragma once
#include "message_broker.h"
#include "connection_pool.h"
#include <uWebSockets/App.h>
#include <unordered_map>
#include <string>
#include <atomic>
#include <thread>
#include <shared_mutex>
#include <nlohmann/json.hpp>
#include <queue>
#include <condition_variable>
struct WebSocketUserData {
std::string userId;
bool pongReceived = true;
};
class WebSocketServer {
public:
WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker);
void run();
void stop();
private:
void startServer();
void processMessageQueue();
void pingClients();
void handleBrokerMessage(const std::string &message);
void handleWebSocketMessage(uWS::WebSocket<false, true, WebSocketUserData> *ws, std::string_view message, uWS::OpCode opCode);
void handleWebSocketClose(uWS::WebSocket<false, true, WebSocketUserData> *ws);
std::string getUserIdFromFalukantUserId(int &userId);
int port;
ConnectionPool &pool;
MessageBroker &broker;
std::atomic<bool> running{false};
std::thread serverThread;
std::thread messageProcessingThread;
std::thread pingThread;
std::unordered_map<std::string, uWS::WebSocket<false, true, WebSocketUserData> *> connections;
std::shared_mutex connectionsMutex;
std::queue<std::string> messageQueue;
std::mutex queueMutex;
std::condition_variable queueCV;
};

117
src/worker.h Normal file
View File

@@ -0,0 +1,117 @@
#pragma once
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <iostream>
#include <functional>
#include "connection_pool.h"
#include "message_broker.h"
#include "database.h"
class Worker {
public:
Worker(ConnectionPool &pool, MessageBroker &broker, std::string name)
: pool(pool),
broker(broker),
workerName(std::move(name)),
runningWorker(false),
runningWatchdog(false)
{}
virtual ~Worker() {
stopWorkerThread();
stopWatchdogThread();
}
void startWorkerThread() {
if (runningWorker.load()) {
std::cerr << "[" << workerName << "] Worker thread already running, skipping start.\n";
return;
}
runningWorker.store(true);
workerThread = std::thread([this]() { run(); });
}
void stopWorkerThread() {
runningWorker.store(false);
if (workerThread.joinable()) {
workerThread.join();
}
}
void enableWatchdog() {
if (runningWatchdog.load()) {
std::cerr << "[" << workerName << "] Watchdog already enabled, skipping.\n";
return;
}
runningWatchdog.store(true);
watchdogThread = std::thread([this]() { watchdog(); });
}
void stopWatchdogThread() {
runningWatchdog.store(false);
if (watchdogThread.joinable()) {
watchdogThread.join();
}
}
std::string getCurrentStep() {
std::lock_guard<std::mutex> lock(stepMutex);
return currentStep;
}
protected:
virtual void run() = 0;
void watchdog() {
try {
while (runningWatchdog.load()) {
std::this_thread::sleep_for(watchdogInterval);
bool isActive = false;
{
std::lock_guard<std::mutex> lock(activityMutex);
isActive = active;
active = false;
}
if (!isActive) {
std::cerr << "[" << workerName << "] Watchdog: Keine Aktivität! Starte Worker neu...\n";
std::cerr << "[" << workerName << "] Letzte Aktivität: " << getCurrentStep() << "\n";
stopWorkerThread();
startWorkerThread();
}
}
} catch (const std::exception &e) {
std::cerr << "[" << workerName << "] Watchdog: Ausnahme gefangen: " << e.what() << "\n";
} catch (...) {
std::cerr << "[" << workerName << "] Watchdog: Unbekannte Ausnahme gefangen.\n";
}
}
void signalActivity() {
std::lock_guard<std::mutex> lock(activityMutex);
active = true;
}
void setCurrentStep(const std::string &step) {
std::lock_guard<std::mutex> lock(stepMutex);
currentStep = step;
}
protected:
ConnectionPool &pool;
MessageBroker &broker;
std::string workerName;
std::atomic<bool> runningWorker;
std::atomic<bool> runningWatchdog;
std::atomic<bool> active{false};
std::thread workerThread;
std::thread watchdogThread;
std::mutex activityMutex;
std::chrono::seconds watchdogInterval{10};
std::mutex stepMutex;
std::string currentStep;
};