use crate::db::{ConnectionPool, DbError}; use crate::message_broker::MessageBroker; use std::cmp::min; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; use crate::worker::sql::{ QUERY_GET_ARRIVED_TRANSPORTS, QUERY_GET_AVAILABLE_STOCKS, QUERY_INSERT_INVENTORY, QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT, QUERY_DELETE_TRANSPORT, QUERY_GET_BRANCH_REGION, QUERY_UPDATE_TRANSPORT_SIZE, }; #[derive(Debug, Clone)] struct ArrivedTransport { id: i32, source_branch_id: Option, // NULL wenn kein Branch in Source-Region target_branch_id: i32, product_id: Option, // NULL für leere Transporte size: i32, vehicle_id: i32, distance: f64, user_id: i32, // User-ID für Notifications } #[derive(Debug, Clone)] struct StockInfo { stock_id: i32, total_capacity: i32, filled: i32, } pub struct TransportWorker { base: BaseWorker, } impl TransportWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("TransportWorker", pool, broker), } } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { while state.running_worker.load(Ordering::Relaxed) { if let Err(err) = Self::process_arrived_transports(&pool, &broker) { eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}"); } // Einmal pro Sekunde prüfen for _ in 0..1 { if !state.running_worker.load(Ordering::Relaxed) { break; } std::thread::sleep(Duration::from_secs(1)); } } } fn process_arrived_transports( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let transports = Self::load_arrived_transports(pool)?; if !transports.is_empty() { eprintln!( "[TransportWorker] {} angekommene Transport(e) gefunden", transports.len() ); } for t in transports { if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) { eprintln!( "[TransportWorker] Fehler beim Verarbeiten von Transport {} (vehicle_id={}, product_id={:?}, size={}): {}", t.id, t.vehicle_id, t.product_id, t.size, err ); } } Ok(()) } fn load_arrived_transports(pool: &ConnectionPool) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?; let rows = conn.execute("get_arrived_transports", &[])?; if rows.is_empty() { // Nur alle 60 Sekunden loggen, um Log-Flut zu vermeiden static LAST_LOG: Mutex> = Mutex::new(None); let mut last_log = LAST_LOG.lock().unwrap(); let should_log = last_log .map(|t| t.elapsed().as_secs() >= 60) .unwrap_or(true); if should_log { eprintln!("[TransportWorker] Keine angekommenen Transporte gefunden"); *last_log = Some(Instant::now()); } } let mut result = Vec::with_capacity(rows.len()); for row in rows { let id = parse_i32(&row, "id", -1); let target_branch_id = parse_i32(&row, "target_branch_id", -1); // source_branch_id kann NULL sein (wenn kein Branch in Source-Region) let source_branch_id_str = row.get("source_branch_id"); let source_branch_id = source_branch_id_str .and_then(|v| v.parse::().ok()) .filter(|&x| x >= 0); // product_id kann NULL sein (für leere Transporte) let product_id_str = row.get("product_id"); let product_id = product_id_str .and_then(|v| v.parse::().ok()); let size = parse_i32(&row, "size", 0); let vehicle_id = parse_i32(&row, "vehicle_id", -1); let distance = parse_f64(&row, "distance", 0.0); let user_id = parse_i32(&row, "user_id", -1); // Für normale Transporte: product_id muss vorhanden sein und size > 0 // Für leere Transporte: product_id ist NULL und size = 0 let is_valid = if product_id.is_some() { // Normaler Transport mit Produkt id >= 0 && target_branch_id >= 0 && vehicle_id >= 0 && size > 0 && user_id >= 0 } else { // Leerer Transport (ohne Produkt) id >= 0 && target_branch_id >= 0 && vehicle_id >= 0 && size == 0 && user_id >= 0 }; if is_valid { result.push(ArrivedTransport { id, source_branch_id, target_branch_id, product_id, size, vehicle_id, distance, user_id, }); } } Ok(result) } fn handle_arrived_transport( pool: &ConnectionPool, broker: &MessageBroker, t: &ArrivedTransport, ) -> Result<(), DbError> { // Leere Transporte (ohne Produkt) werden anders behandelt if t.product_id.is_none() { // Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen // keine Info-Logs Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?; Self::delete_transport(pool, t.id)?; // Notifications für beide Branches if t.user_id > 0 { // Target-Branch: Transport angekommen let target_message = format!( r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"empty":true}}"#, t.target_branch_id, t.user_id ); broker.publish(target_message); // Source-Branch: Transport entfernt (falls vorhanden) if let Some(source_branch_id) = t.source_branch_id { let source_message = format!( r#"{{"event":"transport_removed","branch_id":{},"user_id":{}}}"#, source_branch_id, t.user_id ); broker.publish(source_message); } } return Ok(()); } // Normaler Transport mit Produkt: // 1) Waren in das Ziel-Branch-Lager einbuchen – wir erhalten die // tatsächlich verbleibende Menge im Transportmittel zurück. let product_id = t.product_id.unwrap(); let remaining_quantity = Self::add_to_inventory(pool, t.target_branch_id, product_id, t.size)?; let delivered = t.size.saturating_sub(remaining_quantity); if remaining_quantity <= 0 { // Alles konnte eingelagert werden: // 2) Fahrzeug-Region & Zustand aktualisieren Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?; // 3) Transport-Eintrag löschen Self::delete_transport(pool, t.id)?; // Notifications für beide Branches if t.user_id > 0 { // Target-Branch: Transport angekommen und Inventar aktualisiert let target_transport_message = format!( r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"product_id":{},"quantity":{}}}"#, t.target_branch_id, t.user_id, product_id, delivered ); broker.publish(target_transport_message); let target_inventory_message = format!( r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#, t.target_branch_id, t.user_id ); broker.publish(target_inventory_message); // Source-Branch: Transport entfernt (falls vorhanden) if let Some(source_branch_id) = t.source_branch_id { let source_message = format!( r#"{{"event":"transport_removed","branch_id":{},"user_id":{}}}"#, source_branch_id, t.user_id ); broker.publish(source_message); } } } else { // Es konnte nur ein Teil (oder nichts) eingelagert werden: // - Transport bleibt in der Tabelle bestehen // - Größe im Transport anpassen, damit nur der verbliebene Rest // beim nächsten Durchlauf erneut versucht wird. if delivered > 0 { Self::update_transport_size(pool, t.id, remaining_quantity)?; // Notification: Teilweise eingelagert, Inventar aktualisiert if t.user_id > 0 { let target_inventory_message = format!( r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#, t.target_branch_id, t.user_id ); broker.publish(target_inventory_message); } } else { // Nichts konnte eingelagert werden - Transport bleibt unverändert // Logge dies, damit wir sehen, dass der Transport wartet eprintln!( "[TransportWorker] Transport {} wartet: Kein Lagerplatz verfügbar (branch_id={}, product_id={}, size={})", t.id, t.target_branch_id, product_id, t.size ); } // Keine Notification für wartende Transporte, um Notification-System zu entlasten. } Ok(()) } fn add_to_inventory( pool: &ConnectionPool, branch_id: i32, product_id: i32, quantity: i32, ) -> Result { let stocks = Self::get_available_stocks(pool, branch_id)?; let mut remaining_quantity = quantity; for stock in stocks { if remaining_quantity <= 0 { break; } let free_capacity = stock.total_capacity - stock.filled; if free_capacity <= 0 { continue; } let to_store = min(remaining_quantity, free_capacity); Self::store_in_stock(pool, stock.stock_id, product_id, to_store)?; remaining_quantity -= to_store; } Ok(remaining_quantity) } fn get_available_stocks(pool: &ConnectionPool, branch_id: i32) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("transport_get_stocks", QUERY_GET_AVAILABLE_STOCKS)?; let rows = conn.execute("transport_get_stocks", &[&branch_id])?; let mut result = Vec::with_capacity(rows.len()); for row in rows { let stock_id = parse_i32(&row, "id", -1); let total_capacity = parse_i32(&row, "total_capacity", 0); let filled = parse_i32(&row, "filled", 0); if stock_id >= 0 { result.push(StockInfo { stock_id, total_capacity, filled, }); } } Ok(result) } fn store_in_stock( pool: &ConnectionPool, stock_id: i32, product_id: i32, quantity: i32, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("transport_insert_inventory", QUERY_INSERT_INVENTORY)?; // Qualität für Transporte zunächst konservativ auf 100 setzen. Eine // spätere Erweiterung könnte Qualitätseinbußen durch lange Strecken // oder schlechten Fahrzeugzustand berücksichtigen. let quality: i32 = 100; conn.execute( "transport_insert_inventory", &[&stock_id, &product_id, &quantity, &quality], )?; Ok(()) } fn update_vehicle_after_transport( pool: &ConnectionPool, vehicle_id: i32, target_branch_id: i32, distance: f64, ) -> Result<(), DbError> { // Hole die Ziel-Region über den Branch let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; // Region des Branches abrufen conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?; let rows = conn.execute("get_branch_region", &[&target_branch_id])?; let region_id = rows .first() .and_then(|r| r.get("region_id")) .and_then(|v| v.parse::().ok()) .unwrap_or(-1); if region_id < 0 { return Err(DbError::new(format!( "Konnte Region für Branch {} nicht bestimmen", target_branch_id ))); } conn.prepare( "update_vehicle_after_transport", QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT, )?; let distance_int = distance.round() as i32; conn.execute( "update_vehicle_after_transport", &[&vehicle_id, ®ion_id, &distance_int], )?; Ok(()) } fn delete_transport(pool: &ConnectionPool, transport_id: i32) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("delete_transport", QUERY_DELETE_TRANSPORT)?; conn.execute("delete_transport", &[&transport_id])?; Ok(()) } fn update_transport_size( pool: &ConnectionPool, transport_id: i32, new_size: i32, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("update_transport_size", QUERY_UPDATE_TRANSPORT_SIZE)?; conn.execute("update_transport_size", &[&transport_id, &new_size])?; Ok(()) } } impl Worker for TransportWorker { fn start_worker_thread(&mut self) { let pool = self.base.pool.clone(); let broker = self.base.broker.clone(); self.base .start_worker_with_loop(move |state: Arc| { TransportWorker::run_loop(pool.clone(), broker.clone(), state); }); } fn stop_worker_thread(&mut self) { self.base.stop_worker(); } fn enable_watchdog(&mut self) { self.base.start_watchdog(); } } fn parse_i32(row: &crate::db::Row, key: &str, default: i32) -> i32 { row.get(key) .and_then(|v| v.parse::().ok()) .unwrap_or(default) } fn parse_f64(row: &crate::db::Row, key: &str, default: f64) -> f64 { row.get(key) .and_then(|v| v.parse::().ok()) .unwrap_or(default) }