- Ändere die Pfade für SSL-Zertifikate in der Konfigurationsdatei. - Verbessere die Fehlerbehandlung beim Entfernen alter vorbereiteter Anweisungen in HouseWorker. - Füge Debug-Ausgaben zur Nachverfolgung von Verbindungen und Nachrichten im WebSocket-Server hinzu. - Implementiere Timeout-Logik für das Stoppen von Worker- und Watchdog-Threads. - Optimiere die Signalverarbeitung und Shutdown-Logik in main.cpp für bessere Responsivität.
203 lines
8.1 KiB
C++
203 lines
8.1 KiB
C++
#include "produce_worker.h"
|
|
#include "connection_guard.h"
|
|
#include <iostream>
|
|
#include <algorithm>
|
|
#include <thread>
|
|
#include <nlohmann/json.hpp>
|
|
|
|
/// Builds the worker by forwarding the connection pool and message broker to
/// the Worker base class, registering it under the name "ProduceWorker".
ProduceWorker::ProduceWorker(ConnectionPool &pool, MessageBroker &broker)
    : Worker(pool, broker, "ProduceWorker")
{
}
|
|
|
|
/// Nothing to release here; the compiler-generated teardown is sufficient.
ProduceWorker::~ProduceWorker() = default;
|
|
|
|
void ProduceWorker::run() {
|
|
auto lastIterationTime = std::chrono::steady_clock::now();
|
|
while (runningWorker.load()) {
|
|
setCurrentStep("Check runningWorker Variable");
|
|
if (!runningWorker.load()) {
|
|
break;
|
|
}
|
|
|
|
setCurrentStep("Calculate elapsed time");
|
|
auto now = std::chrono::steady_clock::now();
|
|
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastIterationTime);
|
|
if (elapsed < std::chrono::milliseconds(200)) {
|
|
// Kürzere Sleep-Intervalle für bessere Shutdown-Responsivität
|
|
auto sleepTime = std::chrono::milliseconds(200) - elapsed;
|
|
for (int i = 0; i < sleepTime.count() && runningWorker.load(); i += 10) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
|
}
|
|
}
|
|
|
|
if (!runningWorker.load()) break;
|
|
|
|
lastIterationTime = std::chrono::steady_clock::now();
|
|
setCurrentStep("Process Productions");
|
|
processProductions();
|
|
setCurrentStep("Signal Activity");
|
|
signalActivity();
|
|
setCurrentStep("Loop Done");
|
|
}
|
|
}
|
|
|
|
void ProduceWorker::processProductions() {
|
|
try {
|
|
setCurrentStep("Get Database Connection");
|
|
ConnectionGuard connGuard(pool);
|
|
auto &db = connGuard.get();
|
|
setCurrentStep("Fetch Finished Productions");
|
|
auto finishedProductions = getFinishedProductions(db);
|
|
setCurrentStep("Process Finished Productions");
|
|
for (const auto &production : finishedProductions) {
|
|
if (production.find("branch_id") == production.end() ||
|
|
production.find("product_id") == production.end() ||
|
|
production.find("quantity") == production.end() ||
|
|
production.find("quality") == production.end() ||
|
|
production.find("user_id") == production.end()) {
|
|
continue;
|
|
}
|
|
int branchId = std::stoi(production.at("branch_id"));
|
|
int productId = std::stoi(production.at("product_id"));
|
|
int quantity = std::stoi(production.at("quantity"));
|
|
int quality = std::stoi(production.at("quality"));
|
|
int userId = std::stoi(production.at("user_id"));
|
|
int regionId = std::stoi(production.at("region_id"));
|
|
addToInventory(db, branchId, productId, quantity, quality, userId);
|
|
deleteProduction(db, production.at("production_id"));
|
|
addProductionToLog(regionId, userId, productId, quantity);
|
|
const nlohmann::json message = {
|
|
{"event", "production_ready"},
|
|
{"branch_id", std::to_string(branchId) }
|
|
};
|
|
sendMessageToFalukantUsers(userId, message);
|
|
}
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[ProduceWorker] Fehler in processProductions: " << e.what() << std::endl;
|
|
}
|
|
}
|
|
|
|
/**
 * Prepares and executes the "get_finished_productions" statement.
 *
 * @param db  Database connection to run the query on.
 * @return The rows produced by the query, or an empty vector if preparing or
 *         executing it threw (the error is logged to std::cerr).
 */
std::vector<std::unordered_map<std::string, std::string>> ProduceWorker::getFinishedProductions(Database &db) {
    try {
        db.prepare("get_finished_productions", QUERY_GET_FINISHED_PRODUCTIONS);
        return db.execute("get_finished_productions");
    } catch (const std::exception &e) {
        std::cerr << "[ProduceWorker] Fehler beim Abrufen abgeschlossener Produktionen: "
                  << e.what() << std::endl;
        return {};
    }
}
|
|
|
|
/**
 * Distributes a produced quantity over the stocks returned for a branch.
 *
 * Stocks are filled in the order the "get_stocks" query returns them, each up
 * to its free capacity (total_capacity - filled). If everything fits, a
 * production-ready event is published. Otherwise an overproduction
 * notification carrying the unplaced amount is inserted for the user and a
 * "falukantUpdateStatus" event is published.
 *
 * @param db         Database connection to use.
 * @param branchId   Branch whose stocks are queried.
 * @param productId  Product being stored.
 * @param quantity   Amount to store.
 * @param quality    Quality value stored alongside the product.
 * @param userId     User who receives events and notifications.
 * @return true when the quantity was fully stored or the overproduction was
 *         recorded; false when storeInStock() failed or an exception was
 *         caught (logged to std::cerr).
 */
bool ProduceWorker::addToInventory(Database &db,
                                   int branchId,
                                   int productId,
                                   int quantity,
                                   int quality,
                                   int userId) {
    try {
        db.prepare("get_stocks", QUERY_GET_AVAILABLE_STOCKS);
        auto branchStocks = db.execute("get_stocks", {std::to_string(branchId)});

        int leftToStore = quantity;
        for (const auto &slot : branchStocks) {
            const int slotId = std::stoi(slot.at("id"));
            const int capacity = std::stoi(slot.at("total_capacity"));
            const int used = std::stoi(slot.at("filled"));
            const int available = capacity - used;

            if (available > 0) {
                const int portion = std::min(leftToStore, available);
                if (!storeInStock(db, slotId, productId, portion, quality)) {
                    return false;
                }
                leftToStore -= portion;
                if (leftToStore <= 0) {
                    break;
                }
            }
        }

        if (leftToStore != 0) {
            // Not everything could be placed: record the surplus for the user.
            db.prepare("QUERY_ADD_OVERPRODUCTION_NOTIFICATION", QUERY_ADD_OVERPRODUCTION_NOTIFICATION);
            nlohmann::json overproduction = {
                {"tr", "production.overproduction"},
                {"value", leftToStore}
            };
            db.execute("QUERY_ADD_OVERPRODUCTION_NOTIFICATION",
                       {std::to_string(userId), overproduction.dump()});

            // Publish falukantUpdateStatus after the notification was inserted.
            nlohmann::json statusUpdate = {
                {"event", "falukantUpdateStatus"},
                {"user_id", userId}
            };
            broker.publish(statusUpdate.dump());
            return true;
        }

        sendProductionReadyEvent(userId, productId, quantity, quality, branchId);
        return true;
    } catch (const std::exception &e) {
        std::cerr << "[ProduceWorker] Fehler in addToInventory: " << e.what() << std::endl;
    }
    return false;
}
|
|
|
|
/**
 * Runs the "insert_inventory" statement for one stock.
 *
 * @param db         Database connection to use.
 * @param stockId    Target stock.
 * @param productId  Product being stored.
 * @param quantity   Amount to store in this stock.
 * @param quality    Quality value stored alongside the product.
 * @return true if the statement executed without throwing; false otherwise
 *         (the error is logged to std::cerr).
 */
bool ProduceWorker::storeInStock(Database &db,
                                 int stockId,
                                 int productId,
                                 int quantity,
                                 int quality) {
    bool stored = false;
    try {
        db.prepare("insert_inventory", QUERY_INSERT_INVENTORY);
        db.execute("insert_inventory",
                   {std::to_string(stockId),
                    std::to_string(productId),
                    std::to_string(quantity),
                    std::to_string(quality)});
        stored = true;
    } catch (const std::exception &e) {
        std::cerr << "[ProduceWorker] Fehler in storeInStock: " << e.what() << std::endl;
    }
    return stored;
}
|
|
|
|
/**
 * Runs the "delete_production" statement for the given production id.
 * Errors are logged to std::cerr and not propagated.
 *
 * @param db            Database connection to use.
 * @param productionId  Id bound as the statement's only parameter.
 */
void ProduceWorker::deleteProduction(Database &db, const std::string &productionId) {
    try {
        db.prepare("delete_production", QUERY_DELETE_PRODUCTION);
        db.execute("delete_production", {productionId});
    }
    catch (const std::exception &e) {
        std::cerr << "[ProduceWorker] Fehler beim Löschen der Produktion: "
                  << e.what()
                  << std::endl;
    }
}
|
|
|
|
void ProduceWorker::sendProductionReadyEvent(int userId,
|
|
int productId,
|
|
int quantity,
|
|
int quality,
|
|
int branchId)
|
|
{
|
|
try {
|
|
nlohmann::json message = {
|
|
{"event", "production_ready"},
|
|
{"user_id", userId},
|
|
{"product_id", productId},
|
|
{"quantity", quantity},
|
|
{"quality", quality},
|
|
{"branch_id", branchId}
|
|
};
|
|
broker.publish(message.dump());
|
|
} catch (const std::exception &e) {
|
|
std::cerr << "[ProduceWorker] Fehler beim Senden des Production Ready Events: "
|
|
<< e.what() << std::endl;
|
|
}
|
|
}
|
|
|
|
void ProduceWorker::addProductionToLog(int regionId, int userId, int productId, int quantity) {
|
|
try {
|
|
ConnectionGuard connGuard(pool);
|
|
auto &db = connGuard.get();
|
|
db.prepare("QUERY_INSERT_UPDATE_PRODUCTION_LOG", QUERY_INSERT_UPDATE_PRODUCTION_LOG);
|
|
db.execute("QUERY_INSERT_UPDATE_PRODUCTION_LOG", { std::to_string(regionId), std::to_string(productId),
|
|
std::to_string(productId), std::to_string(userId) });
|
|
} catch (const std::exception &e) {
|
|
|
|
}
|
|
}
|