diff --git a/src/db/connection.rs b/src/db/connection.rs index 83545f6..8e1b049 100644 --- a/src/db/connection.rs +++ b/src/db/connection.rs @@ -33,7 +33,8 @@ impl std::error::Error for DbError {} impl From for DbError { fn from(err: PgError) -> Self { if let Some(db_err) = err.as_db_error() { - let code = db_err.code(); + // SQLSTATE als String extrahieren (z.B. "23514") + let code = db_err.code().code().to_string(); let message = db_err.message(); let detail = db_err.detail().unwrap_or_default(); let hint = db_err.hint().unwrap_or_default(); diff --git a/src/main.rs b/src/main.rs index 1e11f8c..0c9c3d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ use message_broker::MessageBroker; use websocket_server::WebSocketServer; use worker::{ CharacterCreationWorker, ConnectionPool, DirectorWorker, HouseWorker, PoliticsWorker, - ProduceWorker, StockageManager, UndergroundWorker, UserCharacterWorker, + ProduceWorker, StockageManager, TransportWorker, UndergroundWorker, UserCharacterWorker, ValueRecalculationWorker, Worker, }; @@ -62,6 +62,7 @@ fn run_daemon() -> Result<(), Box> { fn install_signal_handler() -> Result<(), Box> { // Behandle SIGINT/SIGTERM (z.B. Strg+C) und leite auf das globale Flag um. ctrlc::set_handler(|| { + eprintln!("[Daemon] SIGINT/SIGTERM empfangen, fahre kontrolliert herunter..."); KEEP_RUNNING.store(false, Ordering::SeqCst); })?; Ok(()) @@ -138,6 +139,7 @@ fn create_workers(pool: ConnectionPool, broker: MessageBroker) -> Vec], broker: &MessageBroker, ) { + eprintln!("[Daemon] Shutdown-System gestartet: stoppe Worker..."); // systemd: wir fahren nun kontrolliert herunter let _ = daemon::notify(false, &[NotifyState::Stopping]); @@ -171,16 +174,21 @@ fn shutdown_system( } // 2) WebSocket-Server stoppen (Tokio-Runtime herunterfahren) + eprintln!("[Daemon] WebSocket-Server wird gestoppt..."); websocket_server.stop(); // 3) MessageBroker-Hook – aktuell noch Stub, aber hier zentral ergänzt // für eine spätere interne Queue/Thread-Implementierung. + eprintln!("[Daemon] Broker wird gestoppt..."); broker.stop(); + eprintln!("[Daemon] Shutdown-System abgeschlossen."); } fn run_main_loop() { + eprintln!("[Daemon] Hauptloop gestartet."); while KEEP_RUNNING.load(Ordering::Relaxed) { thread::sleep(Duration::from_millis(100)); } + eprintln!("[Daemon] Hauptloop beendet, starte Shutdown-Sequenz..."); } diff --git a/src/worker/base.rs b/src/worker/base.rs index 3222f33..83decca 100644 --- a/src/worker/base.rs +++ b/src/worker/base.rs @@ -95,10 +95,14 @@ impl BaseWorker { self.watchdog_thread = Some(thread::spawn(move || { while state.running_watchdog.load(Ordering::Relaxed) { - thread::sleep(Duration::from_secs(10)); - - if !state.running_watchdog.load(Ordering::Relaxed) { - break; + // Nicht in einem großen 10s-Sleep blockieren, damit der + // Shutdown (stop_watchdog) zügig reagieren kann. Stattdessen + // in 1s-Scheiben schlafen und dazwischen das Flag prüfen. + for _ in 0..10 { + if !state.running_watchdog.load(Ordering::Relaxed) { + break; + } + thread::sleep(Duration::from_secs(1)); } let step = state.current_step.lock().unwrap().clone(); diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 7e45991..2a8afce 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -8,6 +8,7 @@ mod politics; mod underground; mod value_recalculation; mod user_character; +mod transport; pub use base::Worker; pub use crate::db::ConnectionPool; @@ -20,4 +21,5 @@ pub use politics::PoliticsWorker; pub use underground::UndergroundWorker; pub use value_recalculation::ValueRecalculationWorker; pub use user_character::UserCharacterWorker; +pub use transport::TransportWorker; diff --git a/src/worker/transport.rs b/src/worker/transport.rs new file mode 100644 index 0000000..48ed1e9 --- /dev/null +++ b/src/worker/transport.rs @@ -0,0 +1,478 @@ +use crate::db::{ConnectionPool, DbError}; +use crate::message_broker::MessageBroker; +use std::cmp::min; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use super::base::{BaseWorker, Worker, WorkerState}; + +#[derive(Debug, Clone)] +struct ArrivedTransport { + id: i32, + branch_id: i32, + product_id: i32, + size: i32, + vehicle_id: i32, + distance: f64, +} + +#[derive(Debug, Clone)] +struct StockInfo { + stock_id: i32, + total_capacity: i32, + filled: i32, +} + +// Ermittelt alle Transporte, die gemäß Distanz und Fahrzeuggeschwindigkeit bereits +// angekommen sein sollten. Die Reisezeit wird hier vereinfacht als +// travel_minutes = distance / speed +// interpretiert, d.h. `speed` gibt die Einheiten "Distanz pro Minute" an. +const QUERY_GET_ARRIVED_TRANSPORTS: &str = r#" + SELECT + t.id, + t.product_id, + t.size, + t.vehicle_id, + b.id AS branch_id, + rd.distance AS distance + FROM falukant_data.transport AS t + JOIN falukant_data.vehicle AS v + ON v.id = t.vehicle_id + JOIN falukant_type.vehicle AS vt + ON vt.id = v.vehicle_type_id + JOIN falukant_data.region_distance AS rd + ON ((rd.source_region_id = t.source_region_id + AND rd.target_region_id = t.target_region_id) + OR (rd.source_region_id = t.target_region_id + AND rd.target_region_id = t.source_region_id)) + AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL) + LEFT JOIN falukant_data.branch AS b + ON b.region_id = t.target_region_id + AND b.falukant_user_id = v.falukant_user_id + WHERE vt.speed > 0 + AND t.created_at + + (rd.distance / vt.speed::double precision) * INTERVAL '1 minute' + <= NOW(); +"#; + +// Verfügbare Lagerplätze in einem Branch, analog zur Logik im ProduceWorker. +const QUERY_GET_AVAILABLE_STOCKS: &str = r#" + SELECT + stock.id, + stock.quantity AS total_capacity, + ( + SELECT COALESCE(SUM(inventory.quantity), 0) + FROM falukant_data.inventory + WHERE inventory.stock_id = stock.id + ) AS filled + FROM falukant_data.stock stock + JOIN falukant_data.branch branch + ON stock.branch_id = branch.id + WHERE branch.id = $1 + ORDER BY total_capacity DESC; +"#; + +const QUERY_INSERT_INVENTORY: &str = r#" + INSERT INTO falukant_data.inventory ( + stock_id, + product_id, + quantity, + quality, + produced_at + ) VALUES ($1, $2, $3, $4, NOW()); +"#; + +const QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT: &str = r#" + UPDATE falukant_data.vehicle + SET region_id = $2, + condition = GREATEST(0, condition - $3::int), + available_from = NOW(), + updated_at = NOW() + WHERE id = $1; +"#; + +const QUERY_DELETE_TRANSPORT: &str = r#" + DELETE FROM falukant_data.transport + WHERE id = $1; +"#; + +/// Notification-Eintrag, analog zur Overproduction-Notification im ProduceWorker. +/// `tr` wird hier als JSON-String mit Übersetzungs-Key und Werten gespeichert. +const QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION: &str = r#" + INSERT INTO falukant_log.notification ( + user_id, + tr, + shown, + created_at, + updated_at + ) VALUES ($1, $2, FALSE, NOW(), NOW()); +"#; + +pub struct TransportWorker { + base: BaseWorker, +} + +impl TransportWorker { + pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { + Self { + base: BaseWorker::new("TransportWorker", pool, broker), + } + } + + fn run_loop(pool: ConnectionPool, _broker: MessageBroker, state: Arc) { + while state.running_worker.load(Ordering::Relaxed) { + if let Err(err) = Self::process_arrived_transports(&pool) { + eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}"); + } + + // Einmal pro Sekunde prüfen + for _ in 0..1 { + if !state.running_worker.load(Ordering::Relaxed) { + break; + } + std::thread::sleep(Duration::from_secs(1)); + } + } + } + + fn process_arrived_transports(pool: &ConnectionPool) -> Result<(), DbError> { + let transports = Self::load_arrived_transports(pool)?; + + for t in transports { + if let Err(err) = Self::handle_arrived_transport(pool, &t) { + eprintln!( + "[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}", + t.id + ); + } + } + + Ok(()) + } + + fn load_arrived_transports(pool: &ConnectionPool) -> Result, DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?; + let rows = conn.execute("get_arrived_transports", &[])?; + + let mut result = Vec::with_capacity(rows.len()); + for row in rows { + let id = parse_i32(&row, "id", -1); + let branch_id = parse_i32(&row, "branch_id", -1); + let product_id = parse_i32(&row, "product_id", -1); + let size = parse_i32(&row, "size", 0); + let vehicle_id = parse_i32(&row, "vehicle_id", -1); + let distance = parse_f64(&row, "distance", 0.0); + + if id >= 0 && branch_id >= 0 && product_id >= 0 && vehicle_id >= 0 && size > 0 { + result.push(ArrivedTransport { + id, + branch_id, + product_id, + size, + vehicle_id, + distance, + }); + } + } + + Ok(result) + } + + fn handle_arrived_transport(pool: &ConnectionPool, t: &ArrivedTransport) -> Result<(), DbError> { + // 1) Waren in das Ziel-Branch-Lager einbuchen – wir erhalten die + // tatsächlich verbleibende Menge im Transportmittel zurück. + let remaining_quantity = + Self::add_to_inventory(pool, t.branch_id, t.product_id, t.size)?; + + let delivered = t.size.saturating_sub(remaining_quantity); + + if remaining_quantity <= 0 { + // Alles konnte eingelagert werden: + // 2) Fahrzeug-Region & Zustand aktualisieren + Self::update_vehicle_after_transport(pool, t.vehicle_id, t.branch_id, t.distance)?; + + // 3) Transport-Eintrag löschen + Self::delete_transport(pool, t.id)?; + } else { + // Es konnte nur ein Teil (oder nichts) eingelagert werden: + // - Transport bleibt in der Tabelle bestehen + // - Größe im Transport anpassen, damit nur der verbliebene Rest + // beim nächsten Durchlauf erneut versucht wird. + if delivered > 0 { + Self::update_transport_size(pool, t.id, remaining_quantity)?; + } + + // Nutzer informieren, dass Ware noch im Transportmittel liegt. + // Wir ermitteln hierzu den Besitzer des Ziel-Branches. + if let Ok(user_id) = Self::get_branch_user_id(pool, t.branch_id) { + if user_id > 0 { + if let Err(err) = Self::insert_transport_waiting_notification( + pool, + user_id, + t.product_id, + remaining_quantity, + ) { + eprintln!( + "[TransportWorker] Fehler beim Schreiben der Transport-Waiting-Notification: {err}" + ); + } + } + } + } + + Ok(()) + } + + fn add_to_inventory( + pool: &ConnectionPool, + branch_id: i32, + product_id: i32, + quantity: i32, + ) -> Result { + let stocks = Self::get_available_stocks(pool, branch_id)?; + + let mut remaining_quantity = quantity; + + for stock in stocks { + if remaining_quantity <= 0 { + break; + } + + let free_capacity = stock.total_capacity - stock.filled; + if free_capacity <= 0 { + continue; + } + + let to_store = min(remaining_quantity, free_capacity); + Self::store_in_stock(pool, stock.stock_id, product_id, to_store)?; + remaining_quantity -= to_store; + } + + Ok(remaining_quantity) + } + + fn get_available_stocks(pool: &ConnectionPool, branch_id: i32) -> Result, DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + conn.prepare("transport_get_stocks", QUERY_GET_AVAILABLE_STOCKS)?; + let rows = conn.execute("transport_get_stocks", &[&branch_id])?; + + let mut result = Vec::with_capacity(rows.len()); + for row in rows { + let stock_id = parse_i32(&row, "id", -1); + let total_capacity = parse_i32(&row, "total_capacity", 0); + let filled = parse_i32(&row, "filled", 0); + + if stock_id >= 0 { + result.push(StockInfo { + stock_id, + total_capacity, + filled, + }); + } + } + + Ok(result) + } + + fn store_in_stock( + pool: &ConnectionPool, + stock_id: i32, + product_id: i32, + quantity: i32, + ) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + conn.prepare("transport_insert_inventory", QUERY_INSERT_INVENTORY)?; + + // Qualität für Transporte zunächst konservativ auf 100 setzen. Eine + // spätere Erweiterung könnte Qualitätseinbußen durch lange Strecken + // oder schlechten Fahrzeugzustand berücksichtigen. + let quality: i32 = 100; + + conn.execute( + "transport_insert_inventory", + &[&stock_id, &product_id, &quantity, &quality], + )?; + + Ok(()) + } + + fn update_vehicle_after_transport( + pool: &ConnectionPool, + vehicle_id: i32, + target_branch_id: i32, + distance: f64, + ) -> Result<(), DbError> { + // Hole die Ziel-Region über den Branch + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + // Region des Branches abrufen + const QUERY_GET_BRANCH_REGION: &str = r#" + SELECT region_id + FROM falukant_data.branch + WHERE id = $1 + LIMIT 1; + "#; + + conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?; + let rows = conn.execute("get_branch_region", &[&target_branch_id])?; + + let region_id = rows + .get(0) + .and_then(|r| r.get("region_id")) + .and_then(|v| v.parse::().ok()) + .unwrap_or(-1); + + if region_id < 0 { + return Err(DbError::new(format!( + "Konnte Region für Branch {} nicht bestimmen", + target_branch_id + ))); + } + + conn.prepare( + "update_vehicle_after_transport", + QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT, + )?; + + let distance_int = distance.round() as i32; + + conn.execute( + "update_vehicle_after_transport", + &[&vehicle_id, ®ion_id, &distance_int], + )?; + + Ok(()) + } + + fn delete_transport(pool: &ConnectionPool, transport_id: i32) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + conn.prepare("delete_transport", QUERY_DELETE_TRANSPORT)?; + conn.execute("delete_transport", &[&transport_id])?; + + Ok(()) + } + + fn update_transport_size( + pool: &ConnectionPool, + transport_id: i32, + new_size: i32, + ) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + const QUERY_UPDATE_TRANSPORT_SIZE: &str = r#" + UPDATE falukant_data.transport + SET size = $2, + updated_at = NOW() + WHERE id = $1; + "#; + + conn.prepare("update_transport_size", QUERY_UPDATE_TRANSPORT_SIZE)?; + conn.execute("update_transport_size", &[&transport_id, &new_size])?; + + Ok(()) + } + + fn get_branch_user_id(pool: &ConnectionPool, branch_id: i32) -> Result { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + const QUERY_GET_BRANCH_USER: &str = r#" + SELECT falukant_user_id + FROM falukant_data.branch + WHERE id = $1 + LIMIT 1; + "#; + + conn.prepare("get_branch_user", QUERY_GET_BRANCH_USER)?; + let rows = conn.execute("get_branch_user", &[&branch_id])?; + + let user_id = rows + .get(0) + .and_then(|r| r.get("falukant_user_id")) + .and_then(|v| v.parse::().ok()) + .unwrap_or(-1); + + Ok(user_id) + } + + fn insert_transport_waiting_notification( + pool: &ConnectionPool, + user_id: i32, + product_id: i32, + remaining_quantity: i32, + ) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + conn.prepare( + "add_transport_waiting_notification", + QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION, + )?; + + let notification = format!( + r#"{{"tr":"transport.waiting","productId":{},"value":{}}}"#, + product_id, remaining_quantity + ); + + conn.execute( + "add_transport_waiting_notification", + &[&user_id, ¬ification], + )?; + + Ok(()) + } +} + +impl Worker for TransportWorker { + fn start_worker_thread(&mut self) { + let pool = self.base.pool.clone(); + let broker = self.base.broker.clone(); + + self.base + .start_worker_with_loop(move |state: Arc| { + TransportWorker::run_loop(pool.clone(), broker.clone(), state); + }); + } + + fn stop_worker_thread(&mut self) { + self.base.stop_worker(); + } + + fn enable_watchdog(&mut self) { + self.base.start_watchdog(); + } +} + +fn parse_i32(row: &crate::db::Row, key: &str, default: i32) -> i32 { + row.get(key) + .and_then(|v| v.parse::().ok()) + .unwrap_or(default) +} + +fn parse_f64(row: &crate::db::Row, key: &str, default: f64) -> f64 { + row.get(key) + .and_then(|v| v.parse::().ok()) + .unwrap_or(default) +} + +