use crate::db::{ConnectionPool, DbError}; use crate::message_broker::MessageBroker; use std::cmp::min; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use super::base::{BaseWorker, Worker, WorkerState}; #[derive(Debug, Clone)] struct ArrivedTransport { id: i32, source_branch_id: Option, // NULL wenn kein Branch in Source-Region target_branch_id: i32, product_id: Option, // NULL für leere Transporte size: i32, vehicle_id: i32, distance: f64, user_id: i32, // User-ID für Notifications } #[derive(Debug, Clone)] struct StockInfo { stock_id: i32, total_capacity: i32, filled: i32, } // Ermittelt alle Transporte, die gemäß Distanz und Fahrzeuggeschwindigkeit bereits // angekommen sein sollten. Die Reisezeit wird hier vereinfacht als // travel_minutes = distance / speed // interpretiert, d.h. `speed` gibt die Einheiten "Distanz pro Minute" an. const QUERY_GET_ARRIVED_TRANSPORTS: &str = r#" SELECT t.id, t.product_id, t.size, t.vehicle_id, t.source_region_id, t.target_region_id, b_target.id AS target_branch_id, b_source.id AS source_branch_id, rd.distance AS distance, v.falukant_user_id AS user_id FROM falukant_data.transport AS t JOIN falukant_data.vehicle AS v ON v.id = t.vehicle_id JOIN falukant_type.vehicle AS vt ON vt.id = v.vehicle_type_id JOIN falukant_data.region_distance AS rd ON ((rd.source_region_id = t.source_region_id AND rd.target_region_id = t.target_region_id) OR (rd.source_region_id = t.target_region_id AND rd.target_region_id = t.source_region_id)) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL) LEFT JOIN falukant_data.branch AS b_target ON b_target.region_id = t.target_region_id AND b_target.falukant_user_id = v.falukant_user_id LEFT JOIN falukant_data.branch AS b_source ON b_source.region_id = t.source_region_id AND b_source.falukant_user_id = v.falukant_user_id WHERE vt.speed > 0 AND t.created_at + (rd.distance / vt.speed::double precision) * INTERVAL '1 minute' <= NOW(); "#; // Verfügbare Lagerplätze in einem Branch, analog zur Logik im ProduceWorker. const QUERY_GET_AVAILABLE_STOCKS: &str = r#" SELECT stock.id, stock.quantity AS total_capacity, ( SELECT COALESCE(SUM(inventory.quantity), 0) FROM falukant_data.inventory WHERE inventory.stock_id = stock.id ) AS filled FROM falukant_data.stock stock JOIN falukant_data.branch branch ON stock.branch_id = branch.id WHERE branch.id = $1 ORDER BY total_capacity DESC; "#; const QUERY_INSERT_INVENTORY: &str = r#" INSERT INTO falukant_data.inventory ( stock_id, product_id, quantity, quality, produced_at ) VALUES ($1, $2, $3, $4, NOW()); "#; const QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT: &str = r#" UPDATE falukant_data.vehicle SET region_id = $2, condition = GREATEST(0, condition - $3::int), available_from = NOW(), updated_at = NOW() WHERE id = $1; "#; const QUERY_DELETE_TRANSPORT: &str = r#" DELETE FROM falukant_data.transport WHERE id = $1; "#; /// Notification-Eintrag, analog zur Overproduction-Notification im ProduceWorker. /// `tr` wird hier als JSON-String mit Übersetzungs-Key und Werten gespeichert. const QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION: &str = r#" INSERT INTO falukant_log.notification ( user_id, tr, shown, created_at, updated_at ) VALUES ($1, $2, FALSE, NOW(), NOW()); "#; pub struct TransportWorker { base: BaseWorker, } impl TransportWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("TransportWorker", pool, broker), } } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { while state.running_worker.load(Ordering::Relaxed) { if let Err(err) = Self::process_arrived_transports(&pool, &broker) { eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}"); } // Einmal pro Sekunde prüfen for _ in 0..1 { if !state.running_worker.load(Ordering::Relaxed) { break; } std::thread::sleep(Duration::from_secs(1)); } } } fn process_arrived_transports( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let transports = Self::load_arrived_transports(pool)?; for t in transports { if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) { eprintln!( "[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}", t.id ); } } Ok(()) } fn load_arrived_transports(pool: &ConnectionPool) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?; let rows = conn.execute("get_arrived_transports", &[])?; let mut result = Vec::with_capacity(rows.len()); for row in rows { let id = parse_i32(&row, "id", -1); let target_branch_id = parse_i32(&row, "target_branch_id", -1); // source_branch_id kann NULL sein (wenn kein Branch in Source-Region) let source_branch_id_str = row.get("source_branch_id"); let source_branch_id = source_branch_id_str .and_then(|v| v.parse::().ok()) .filter(|&x| x >= 0); // product_id kann NULL sein (für leere Transporte) let product_id_str = row.get("product_id"); let product_id = product_id_str .and_then(|v| v.parse::().ok()); let size = parse_i32(&row, "size", 0); let vehicle_id = parse_i32(&row, "vehicle_id", -1); let distance = parse_f64(&row, "distance", 0.0); let user_id = parse_i32(&row, "user_id", -1); // Für normale Transporte: product_id muss vorhanden sein und size > 0 // Für leere Transporte: product_id ist NULL und size = 0 let is_valid = if product_id.is_some() { // Normaler Transport mit Produkt id >= 0 && target_branch_id >= 0 && vehicle_id >= 0 && size > 0 && user_id >= 0 } else { // Leerer Transport (ohne Produkt) id >= 0 && target_branch_id >= 0 && vehicle_id >= 0 && size == 0 && user_id >= 0 }; if is_valid { result.push(ArrivedTransport { id, source_branch_id, target_branch_id, product_id, size, vehicle_id, distance, user_id, }); } } Ok(result) } fn handle_arrived_transport( pool: &ConnectionPool, broker: &MessageBroker, t: &ArrivedTransport, ) -> Result<(), DbError> { // Leere Transporte (ohne Produkt) werden anders behandelt if t.product_id.is_none() { // Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen eprintln!( "[TransportWorker] Leerer Transport {} angekommen: Fahrzeug {} zurückgeholt", t.id, t.vehicle_id ); Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?; Self::delete_transport(pool, t.id)?; // Notifications für beide Branches if t.user_id > 0 { // Target-Branch: Transport angekommen let target_message = format!( r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"empty":true}}"#, t.target_branch_id, t.user_id ); broker.publish(target_message); // Source-Branch: Transport entfernt (falls vorhanden) if let Some(source_branch_id) = t.source_branch_id { let source_message = format!( r#"{{"event":"transport_removed","branch_id":{},"user_id":{}}}"#, source_branch_id, t.user_id ); broker.publish(source_message); } } return Ok(()); } // Normaler Transport mit Produkt: // 1) Waren in das Ziel-Branch-Lager einbuchen – wir erhalten die // tatsächlich verbleibende Menge im Transportmittel zurück. let product_id = t.product_id.unwrap(); let remaining_quantity = Self::add_to_inventory(pool, t.target_branch_id, product_id, t.size)?; let delivered = t.size.saturating_sub(remaining_quantity); if remaining_quantity <= 0 { // Alles konnte eingelagert werden: // 2) Fahrzeug-Region & Zustand aktualisieren Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?; // 3) Transport-Eintrag löschen Self::delete_transport(pool, t.id)?; // Notifications für beide Branches if t.user_id > 0 { // Target-Branch: Transport angekommen und Inventar aktualisiert let target_transport_message = format!( r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"product_id":{},"quantity":{}}}"#, t.target_branch_id, t.user_id, product_id, delivered ); broker.publish(target_transport_message); let target_inventory_message = format!( r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#, t.target_branch_id, t.user_id ); broker.publish(target_inventory_message); // Source-Branch: Transport entfernt (falls vorhanden) if let Some(source_branch_id) = t.source_branch_id { let source_message = format!( r#"{{"event":"transport_removed","branch_id":{},"user_id":{}}}"#, source_branch_id, t.user_id ); broker.publish(source_message); } } } else { // Es konnte nur ein Teil (oder nichts) eingelagert werden: // - Transport bleibt in der Tabelle bestehen // - Größe im Transport anpassen, damit nur der verbliebene Rest // beim nächsten Durchlauf erneut versucht wird. if delivered > 0 { Self::update_transport_size(pool, t.id, remaining_quantity)?; // Notification: Teilweise eingelagert, Inventar aktualisiert if t.user_id > 0 { let target_inventory_message = format!( r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#, t.target_branch_id, t.user_id ); broker.publish(target_inventory_message); } } // Nutzer informieren, dass Ware noch im Transportmittel liegt. if t.user_id > 0 { if let Err(err) = Self::insert_transport_waiting_notification( pool, t.user_id, product_id, remaining_quantity, ) { eprintln!( "[TransportWorker] Fehler beim Schreiben der Transport-Waiting-Notification: {err}" ); } } } Ok(()) } fn add_to_inventory( pool: &ConnectionPool, branch_id: i32, product_id: i32, quantity: i32, ) -> Result { let stocks = Self::get_available_stocks(pool, branch_id)?; let mut remaining_quantity = quantity; for stock in stocks { if remaining_quantity <= 0 { break; } let free_capacity = stock.total_capacity - stock.filled; if free_capacity <= 0 { continue; } let to_store = min(remaining_quantity, free_capacity); Self::store_in_stock(pool, stock.stock_id, product_id, to_store)?; remaining_quantity -= to_store; } Ok(remaining_quantity) } fn get_available_stocks(pool: &ConnectionPool, branch_id: i32) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("transport_get_stocks", QUERY_GET_AVAILABLE_STOCKS)?; let rows = conn.execute("transport_get_stocks", &[&branch_id])?; let mut result = Vec::with_capacity(rows.len()); for row in rows { let stock_id = parse_i32(&row, "id", -1); let total_capacity = parse_i32(&row, "total_capacity", 0); let filled = parse_i32(&row, "filled", 0); if stock_id >= 0 { result.push(StockInfo { stock_id, total_capacity, filled, }); } } Ok(result) } fn store_in_stock( pool: &ConnectionPool, stock_id: i32, product_id: i32, quantity: i32, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("transport_insert_inventory", QUERY_INSERT_INVENTORY)?; // Qualität für Transporte zunächst konservativ auf 100 setzen. Eine // spätere Erweiterung könnte Qualitätseinbußen durch lange Strecken // oder schlechten Fahrzeugzustand berücksichtigen. let quality: i32 = 100; conn.execute( "transport_insert_inventory", &[&stock_id, &product_id, &quantity, &quality], )?; Ok(()) } fn update_vehicle_after_transport( pool: &ConnectionPool, vehicle_id: i32, target_branch_id: i32, distance: f64, ) -> Result<(), DbError> { // Hole die Ziel-Region über den Branch let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; // Region des Branches abrufen const QUERY_GET_BRANCH_REGION: &str = r#" SELECT region_id FROM falukant_data.branch WHERE id = $1 LIMIT 1; "#; conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?; let rows = conn.execute("get_branch_region", &[&target_branch_id])?; let region_id = rows .get(0) .and_then(|r| r.get("region_id")) .and_then(|v| v.parse::().ok()) .unwrap_or(-1); if region_id < 0 { return Err(DbError::new(format!( "Konnte Region für Branch {} nicht bestimmen", target_branch_id ))); } conn.prepare( "update_vehicle_after_transport", QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT, )?; let distance_int = distance.round() as i32; conn.execute( "update_vehicle_after_transport", &[&vehicle_id, ®ion_id, &distance_int], )?; Ok(()) } fn delete_transport(pool: &ConnectionPool, transport_id: i32) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("delete_transport", QUERY_DELETE_TRANSPORT)?; conn.execute("delete_transport", &[&transport_id])?; Ok(()) } fn update_transport_size( pool: &ConnectionPool, transport_id: i32, new_size: i32, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; const QUERY_UPDATE_TRANSPORT_SIZE: &str = r#" UPDATE falukant_data.transport SET size = $2, updated_at = NOW() WHERE id = $1; "#; conn.prepare("update_transport_size", QUERY_UPDATE_TRANSPORT_SIZE)?; conn.execute("update_transport_size", &[&transport_id, &new_size])?; Ok(()) } fn get_branch_user_id(pool: &ConnectionPool, branch_id: i32) -> Result { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; const QUERY_GET_BRANCH_USER: &str = r#" SELECT falukant_user_id FROM falukant_data.branch WHERE id = $1 LIMIT 1; "#; conn.prepare("get_branch_user", QUERY_GET_BRANCH_USER)?; let rows = conn.execute("get_branch_user", &[&branch_id])?; let user_id = rows .get(0) .and_then(|r| r.get("falukant_user_id")) .and_then(|v| v.parse::().ok()) .unwrap_or(-1); Ok(user_id) } fn insert_transport_waiting_notification( pool: &ConnectionPool, user_id: i32, product_id: i32, remaining_quantity: i32, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "add_transport_waiting_notification", QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION, )?; let notification = format!( r#"{{"tr":"transport.waiting","productId":{},"value":{}}}"#, product_id, remaining_quantity ); conn.execute( "add_transport_waiting_notification", &[&user_id, ¬ification], )?; Ok(()) } } impl Worker for TransportWorker { fn start_worker_thread(&mut self) { let pool = self.base.pool.clone(); let broker = self.base.broker.clone(); self.base .start_worker_with_loop(move |state: Arc| { TransportWorker::run_loop(pool.clone(), broker.clone(), state); }); } fn stop_worker_thread(&mut self) { self.base.stop_worker(); } fn enable_watchdog(&mut self) { self.base.start_watchdog(); } } fn parse_i32(row: &crate::db::Row, key: &str, default: i32) -> i32 { row.get(key) .and_then(|v| v.parse::().ok()) .unwrap_or(default) } fn parse_f64(row: &crate::db::Row, key: &str, default: f64) -> f64 { row.get(key) .and_then(|v| v.parse::().ok()) .unwrap_or(default) }