use crate::db::{Row, Rows}; use crate::message_broker::MessageBroker; use std::cmp::min; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use crate::db::ConnectionPool; use super::base::{BaseWorker, Worker, WorkerState}; use crate::worker::sql::{ QUERY_GET_FINISHED_PRODUCTIONS, QUERY_GET_AVAILABLE_STOCKS, QUERY_DELETE_PRODUCTION, QUERY_INSERT_INVENTORY, QUERY_INSERT_UPDATE_PRODUCTION_LOG, QUERY_ADD_OVERPRODUCTION_NOTIFICATION, }; /// Abbildet eine abgeschlossene Produktion aus der Datenbank. #[derive(Debug, Clone)] struct FinishedProduction { production_id: i32, branch_id: i32, product_id: i32, quantity: i32, quality: i32, user_id: i32, region_id: i32, } /// Abbildet ein Lager (Stock) mit Kapazität. #[derive(Debug, Clone)] struct StockInfo { stock_id: i32, total_capacity: i32, filled: i32, } pub struct ProduceWorker { base: BaseWorker, last_iteration: Option, } impl ProduceWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("ProduceWorker", pool, broker), last_iteration: None, } } fn run_iteration(&mut self, state: &WorkerState) { self.base .set_current_step("Check runningWorker Variable"); if !state.running_worker.load(Ordering::Relaxed) { return; } let sleep_duration = self.time_until_next_iteration(); self.sleep_with_shutdown_check(sleep_duration, state); if !state.running_worker.load(Ordering::Relaxed) { return; } self.base.set_current_step("Process Productions"); self.process_productions(); self.base.set_current_step("Signal Activity"); // TODO: Später Analogie zu signalActivity() aus der C++-Basisklasse herstellen. self.base.set_current_step("Loop Done"); } fn time_until_next_iteration(&mut self) -> Duration { const MIN_INTERVAL_MS: u64 = 200; let now = Instant::now(); match self.last_iteration { None => { self.last_iteration = Some(now); Duration::from_millis(0) } Some(last) => { let elapsed = now.saturating_duration_since(last); if elapsed >= Duration::from_millis(MIN_INTERVAL_MS) { self.last_iteration = Some(now); Duration::from_millis(0) } else { let remaining = Duration::from_millis(MIN_INTERVAL_MS) - elapsed; self.last_iteration = Some(now); remaining } } } } fn sleep_with_shutdown_check(&self, duration: Duration, state: &WorkerState) { const SLICE_MS: u64 = 10; let total_ms = duration.as_millis() as u64; let mut slept = 0; while slept < total_ms { if !state.running_worker.load(Ordering::Relaxed) { break; } let remaining = total_ms - slept; let slice = min(remaining, SLICE_MS); std::thread::sleep(Duration::from_millis(slice)); slept += slice; } } fn process_productions(&mut self) { self.base .set_current_step("Fetch Finished Productions"); let finished_productions = match self.get_finished_productions() { Ok(rows) => rows, Err(err) => { eprintln!("[ProduceWorker] Fehler in getFinishedProductions: {err}"); Vec::new() } }; self.base .set_current_step("Process Finished Productions"); // Nur Fehler loggen; keine Debug-Infos for production in finished_productions { self.handle_finished_production(&production); } } fn get_finished_productions(&self) -> Result, crate::db::DbError> { let rows = self.load_finished_productions()?; Ok(rows .into_iter() .filter_map(Self::map_row_to_finished_production) .collect()) } fn handle_finished_production(&mut self, production: &FinishedProduction) { let FinishedProduction { branch_id, product_id, quantity, quality, user_id, region_id, production_id, } = *production; if self.add_to_inventory(branch_id, product_id, quantity, quality, user_id) { self.delete_production(production_id); self.add_production_to_log(region_id, user_id, product_id, quantity); } } fn add_to_inventory( &mut self, branch_id: i32, product_id: i32, quantity: i32, quality: i32, user_id: i32, ) -> bool { let mut remaining_quantity = quantity; let stocks = match self.get_available_stocks(branch_id) { Ok(rows) => rows, Err(err) => { eprintln!("[ProduceWorker] Fehler in getAvailableStocks: {err}"); Vec::new() } }; // Ruhemodus: keine Info-Logs, nur Fehler for stock in stocks { if remaining_quantity <= 0 { break; } let free_capacity = stock.total_capacity - stock.filled; // keine Debug-Ausgabe if free_capacity <= 0 { continue; } let to_store = min(remaining_quantity, free_capacity); // keine Debug-Ausgabe if !self.store_in_stock(stock.stock_id, product_id, to_store, quality) { return false; } remaining_quantity -= to_store; // keine Debug-Ausgabe } if remaining_quantity == 0 { self.send_production_ready_event(user_id, product_id, quantity, quality, branch_id); true } else { // Es konnten nicht alle produzierten Einheiten eingelagert werden – // Überproduktion für diesen Branch melden. self.handle_overproduction(user_id, branch_id, remaining_quantity); true } } fn get_available_stocks(&self, branch_id: i32) -> Result, crate::db::DbError> { let rows = self.load_available_stocks(branch_id)?; Ok(rows .into_iter() .filter_map(Self::map_row_to_stock_info) .collect()) } fn store_in_stock( &self, stock_id: i32, product_id: i32, quantity: i32, quality: i32, ) -> bool { // Versuch: vorhandenen Inventory-Posten für (stock, product) erhöhen match self.update_inventory_by_stock_product(stock_id, product_id, quantity, quality) { Ok(updated) => { if updated { return true; } // Wenn kein Update stattfand, Insert versuchen if let Err(err) = self.insert_inventory(stock_id, product_id, quantity, quality) { eprintln!("[ProduceWorker] Fehler beim Insert in storeInStock: {err}"); return false; } true } Err(err) => { eprintln!("[ProduceWorker] Fehler beim Update in storeInStock: {err}"); false } } } fn delete_production(&self, production_id: i32) { if let Err(err) = self.remove_production(production_id) { eprintln!("[ProduceWorker] Fehler beim Löschen der Produktion: {err}"); } } fn add_production_to_log( &self, region_id: i32, user_id: i32, product_id: i32, quantity: i32, ) { if let Err(err) = self.insert_or_update_production_log(region_id, user_id, product_id, quantity) { eprintln!("[ProduceWorker] Fehler in addProductionToLog: {err}"); } } fn send_production_ready_event( &self, user_id: i32, product_id: i32, quantity: i32, quality: i32, branch_id: i32, ) { // JSON als String aufbauen, um externe Dependencies zu vermeiden. let message = format!( r#"{{"event":"production_ready","user_id":{user_id},"product_id":{product_id},"quantity":{quantity},"quality":{quality},"branch_id":{branch_id}}}"# ); self.base.broker.publish(message); } fn handle_overproduction(&self, user_id: i32, branch_id: i32, remaining_quantity: i32) { if let Err(err) = self.insert_overproduction_notification(user_id, branch_id, remaining_quantity) { eprintln!( "[ProduceWorker] Fehler beim Schreiben der Overproduction-Notification: {err}" ); } let update_status = format!(r#"{{"event":"falukantUpdateStatus","user_id":{user_id}}}"#); self.base.broker.publish(update_status); } fn load_finished_productions(&self) -> Result { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_finished_productions", QUERY_GET_FINISHED_PRODUCTIONS)?; conn.execute("get_finished_productions", &[]) } fn load_available_stocks(&self, branch_id: i32) -> Result { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_stocks", QUERY_GET_AVAILABLE_STOCKS)?; conn.execute("get_stocks", &[&branch_id]) } fn insert_inventory( &self, stock_id: i32, product_id: i32, quantity: i32, quality: i32, ) -> Result<(), crate::db::DbError> { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("insert_inventory", QUERY_INSERT_INVENTORY)?; let _rows = conn.execute("insert_inventory", &[&stock_id, &product_id, &quantity, &quality])?; Ok(()) } fn update_inventory_by_stock_product( &self, stock_id: i32, product_id: i32, quantity: i32, quality: i32, ) -> Result { use crate::worker::sql::QUERY_UPDATE_INVENTORY_BY_STOCK_PRODUCT; let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("update_inventory_by_stock_product", QUERY_UPDATE_INVENTORY_BY_STOCK_PRODUCT)?; let rows = conn.execute( "update_inventory_by_stock_product", &[&stock_id, &product_id, &quantity, &quality], )?; Ok(!rows.is_empty()) } fn remove_production(&self, production_id: i32) -> Result<(), crate::db::DbError> { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("delete_production", QUERY_DELETE_PRODUCTION)?; conn.execute("delete_production", &[&production_id])?; Ok(()) } fn insert_or_update_production_log( &self, region_id: i32, user_id: i32, product_id: i32, quantity: i32, ) -> Result<(), crate::db::DbError> { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "insert_update_production_log", QUERY_INSERT_UPDATE_PRODUCTION_LOG, )?; conn.execute( "insert_update_production_log", &[®ion_id, &product_id, &quantity, &user_id], )?; Ok(()) } fn insert_overproduction_notification( &self, user_id: i32, branch_id: i32, remaining_quantity: i32, ) -> Result<(), crate::db::DbError> { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "add_overproduction_notification", QUERY_ADD_OVERPRODUCTION_NOTIFICATION, )?; // Zusätzlich zur Menge die Branch-ID in der Payload mitschicken, damit // das Frontend die Überproduktion einem konkreten Branch zuordnen kann. let notification = format!( r#"{{"tr":"production.overproduction","value":{},"branch_id":{}}}"#, remaining_quantity, branch_id ); conn.execute( "add_overproduction_notification", &[&user_id, ¬ification], )?; Ok(()) } fn map_row_to_finished_production(row: Row) -> Option { Some(FinishedProduction { production_id: row.get("production_id")?.parse().ok()?, branch_id: row.get("branch_id")?.parse().ok()?, product_id: row.get("product_id")?.parse().ok()?, quantity: row.get("quantity")?.parse().ok()?, quality: row.get("quality")?.parse().ok()?, user_id: row.get("user_id")?.parse().ok()?, region_id: row.get("region_id")?.parse().ok()?, }) } fn map_row_to_stock_info(row: Row) -> Option { Some(StockInfo { stock_id: row.get("id")?.parse().ok()?, total_capacity: row.get("total_capacity")?.parse().ok()?, filled: row.get("filled")?.parse().ok()?, }) } } impl Worker for ProduceWorker { fn start_worker_thread(&mut self) { let pool = self.base.pool.clone(); let broker = self.base.broker.clone(); self.base .start_worker_with_loop(move |state: Arc| { let mut worker = ProduceWorker::new(pool.clone(), broker.clone()); while state.running_worker.load(Ordering::Relaxed) { worker.run_iteration(&state); } }); } fn stop_worker_thread(&mut self) { self.base.stop_worker(); } fn enable_watchdog(&mut self) { self.base.start_watchdog(); } }