use crate::db::{Row, Rows}; use crate::message_broker::MessageBroker; use std::cmp::min; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use crate::db::ConnectionPool; use super::base::{BaseWorker, Worker, WorkerState}; /// Abbildet eine abgeschlossene Produktion aus der Datenbank. #[derive(Debug, Clone)] struct FinishedProduction { production_id: i32, branch_id: i32, product_id: i32, quantity: i32, quality: i32, user_id: i32, region_id: i32, } /// Abbildet ein Lager (Stock) mit Kapazität. #[derive(Debug, Clone)] struct StockInfo { stock_id: i32, total_capacity: i32, filled: i32, } // SQL-Queries analog zur C++-Implementierung // Wichtig: Pro `production.id` darf hier **genau eine Zeile** zurückkommen. // Durch die Joins auf Director/Knowledge kann es sonst zu Mehrfachzeilen mit // unterschiedlicher berechneter Qualität kommen. Deshalb wird die Qualität // über MAX() aggregiert und nach `production_id` gruppiert. const QUERY_GET_FINISHED_PRODUCTIONS: &str = r#" SELECT p.id AS production_id, p.branch_id, p.product_id, p.quantity, p.start_timestamp, pr.production_time, -- Aggregierte Qualitätsbewertung pro Produktion MAX( CASE WHEN k2.id IS NOT NULL THEN (k.knowledge * 2 + k2.knowledge) / 3 ELSE k.knowledge END ) AS quality, br.region_id, br.falukant_user_id AS user_id FROM falukant_data.production p JOIN falukant_type.product pr ON p.product_id = pr.id JOIN falukant_data.branch br ON p.branch_id = br.id JOIN falukant_data.character c ON c.user_id = br.falukant_user_id JOIN falukant_data.knowledge k ON p.product_id = k.product_id AND k.character_id = c.id JOIN falukant_data.stock s ON s.branch_id = br.id LEFT JOIN falukant_data.director d ON d.employer_user_id = c.user_id LEFT JOIN falukant_data.knowledge k2 ON k2.character_id = d.director_character_id AND k2.product_id = p.product_id WHERE p.start_timestamp + INTERVAL '1 minute' * pr.production_time <= NOW() GROUP BY p.id, p.branch_id, p.product_id, p.quantity, p.start_timestamp, pr.production_time, br.region_id, br.falukant_user_id ORDER BY p.start_timestamp; "#; const QUERY_GET_AVAILABLE_STOCKS: &str = r#" SELECT stock.id, stock.quantity AS total_capacity, ( SELECT COALESCE(SUM(inventory.quantity), 0) FROM falukant_data.inventory WHERE inventory.stock_id = stock.id ) AS filled, stock.branch_id FROM falukant_data.stock stock JOIN falukant_data.branch branch ON stock.branch_id = branch.id WHERE branch.id = $1 ORDER BY total_capacity DESC; "#; const QUERY_DELETE_PRODUCTION: &str = r#" DELETE FROM falukant_data.production WHERE id = $1; "#; const QUERY_INSERT_INVENTORY: &str = r#" INSERT INTO falukant_data.inventory ( stock_id, product_id, quantity, quality, produced_at ) VALUES ($1, $2, $3, $4, NOW()); "#; const QUERY_INSERT_UPDATE_PRODUCTION_LOG: &str = r#" INSERT INTO falukant_log.production ( region_id, product_id, quantity, producer_id, production_date ) VALUES ($1, $2, $3, $4, CURRENT_DATE) ON CONFLICT (producer_id, product_id, region_id, production_date) DO UPDATE SET quantity = falukant_log.production.quantity + EXCLUDED.quantity; "#; const QUERY_ADD_OVERPRODUCTION_NOTIFICATION: &str = r#" INSERT INTO falukant_log.notification ( user_id, tr, shown, created_at, updated_at ) VALUES ($1, $2, FALSE, NOW(), NOW()); "#; pub struct ProduceWorker { base: BaseWorker, last_iteration: Option, } impl ProduceWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("ProduceWorker", pool, broker), last_iteration: None, } } fn run_iteration(&mut self, state: &WorkerState) { self.base .set_current_step("Check runningWorker Variable"); if !state.running_worker.load(Ordering::Relaxed) { return; } let sleep_duration = self.time_until_next_iteration(); self.sleep_with_shutdown_check(sleep_duration, state); if !state.running_worker.load(Ordering::Relaxed) { return; } self.base.set_current_step("Process Productions"); self.process_productions(); self.base.set_current_step("Signal Activity"); // TODO: Später Analogie zu signalActivity() aus der C++-Basisklasse herstellen. self.base.set_current_step("Loop Done"); } fn time_until_next_iteration(&mut self) -> Duration { const MIN_INTERVAL_MS: u64 = 200; let now = Instant::now(); match self.last_iteration { None => { self.last_iteration = Some(now); Duration::from_millis(0) } Some(last) => { let elapsed = now.saturating_duration_since(last); if elapsed >= Duration::from_millis(MIN_INTERVAL_MS) { self.last_iteration = Some(now); Duration::from_millis(0) } else { let remaining = Duration::from_millis(MIN_INTERVAL_MS) - elapsed; self.last_iteration = Some(now); remaining } } } } fn sleep_with_shutdown_check(&self, duration: Duration, state: &WorkerState) { const SLICE_MS: u64 = 10; let total_ms = duration.as_millis() as u64; let mut slept = 0; while slept < total_ms { if !state.running_worker.load(Ordering::Relaxed) { break; } let remaining = total_ms - slept; let slice = min(remaining, SLICE_MS); std::thread::sleep(Duration::from_millis(slice)); slept += slice; } } fn process_productions(&mut self) { self.base .set_current_step("Fetch Finished Productions"); let finished_productions = match self.get_finished_productions() { Ok(rows) => rows, Err(err) => { eprintln!("[ProduceWorker] Fehler in getFinishedProductions: {err}"); Vec::new() } }; self.base .set_current_step("Process Finished Productions"); for production in finished_productions { self.handle_finished_production(&production); } } fn get_finished_productions(&self) -> Result, crate::db::DbError> { let rows = self.load_finished_productions()?; Ok(rows .into_iter() .filter_map(Self::map_row_to_finished_production) .collect()) } fn handle_finished_production(&mut self, production: &FinishedProduction) { let FinishedProduction { branch_id, product_id, quantity, quality, user_id, region_id, production_id, } = *production; if self.add_to_inventory(branch_id, product_id, quantity, quality, user_id) { self.delete_production(production_id); self.add_production_to_log(region_id, user_id, product_id, quantity); } } fn add_to_inventory( &mut self, branch_id: i32, product_id: i32, quantity: i32, quality: i32, user_id: i32, ) -> bool { let mut remaining_quantity = quantity; let stocks = match self.get_available_stocks(branch_id) { Ok(rows) => rows, Err(err) => { eprintln!("[ProduceWorker] Fehler in getAvailableStocks: {err}"); Vec::new() } }; for stock in stocks { if remaining_quantity <= 0 { break; } let free_capacity = stock.total_capacity - stock.filled; if free_capacity <= 0 { continue; } let to_store = min(remaining_quantity, free_capacity); if !self.store_in_stock(stock.stock_id, product_id, to_store, quality) { return false; } remaining_quantity -= to_store; } if remaining_quantity == 0 { self.send_production_ready_event(user_id, product_id, quantity, quality, branch_id); true } else { // Es konnten nicht alle produzierten Einheiten eingelagert werden – // Überproduktion für diesen Branch melden. self.handle_overproduction(user_id, branch_id, remaining_quantity); true } } fn get_available_stocks(&self, branch_id: i32) -> Result, crate::db::DbError> { let rows = self.load_available_stocks(branch_id)?; Ok(rows .into_iter() .filter_map(Self::map_row_to_stock_info) .collect()) } fn store_in_stock( &self, stock_id: i32, product_id: i32, quantity: i32, quality: i32, ) -> bool { if let Err(err) = self.insert_inventory(stock_id, product_id, quantity, quality) { eprintln!("[ProduceWorker] Fehler in storeInStock: {err}"); return false; } true } fn delete_production(&self, production_id: i32) { if let Err(err) = self.remove_production(production_id) { eprintln!("[ProduceWorker] Fehler beim Löschen der Produktion: {err}"); } } fn add_production_to_log( &self, region_id: i32, user_id: i32, product_id: i32, quantity: i32, ) { if let Err(err) = self.insert_or_update_production_log(region_id, user_id, product_id, quantity) { eprintln!("[ProduceWorker] Fehler in addProductionToLog: {err}"); } } fn send_production_ready_event( &self, user_id: i32, product_id: i32, quantity: i32, quality: i32, branch_id: i32, ) { // JSON als String aufbauen, um externe Dependencies zu vermeiden. let message = format!( r#"{{"event":"production_ready","user_id":{user_id},"product_id":{product_id},"quantity":{quantity},"quality":{quality},"branch_id":{branch_id}}}"# ); self.base.broker.publish(message); } fn handle_overproduction(&self, user_id: i32, branch_id: i32, remaining_quantity: i32) { if let Err(err) = self.insert_overproduction_notification(user_id, branch_id, remaining_quantity) { eprintln!( "[ProduceWorker] Fehler beim Schreiben der Overproduction-Notification: {err}" ); } let update_status = format!(r#"{{"event":"falukantUpdateStatus","user_id":{user_id}}}"#); self.base.broker.publish(update_status); } fn load_finished_productions(&self) -> Result { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_finished_productions", QUERY_GET_FINISHED_PRODUCTIONS)?; conn.execute("get_finished_productions", &[]) } fn load_available_stocks(&self, branch_id: i32) -> Result { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_stocks", QUERY_GET_AVAILABLE_STOCKS)?; conn.execute("get_stocks", &[&branch_id]) } fn insert_inventory( &self, stock_id: i32, product_id: i32, quantity: i32, quality: i32, ) -> Result<(), crate::db::DbError> { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("insert_inventory", QUERY_INSERT_INVENTORY)?; conn.execute("insert_inventory", &[&stock_id, &product_id, &quantity, &quality])?; Ok(()) } fn remove_production(&self, production_id: i32) -> Result<(), crate::db::DbError> { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("delete_production", QUERY_DELETE_PRODUCTION)?; conn.execute("delete_production", &[&production_id])?; Ok(()) } fn insert_or_update_production_log( &self, region_id: i32, user_id: i32, product_id: i32, quantity: i32, ) -> Result<(), crate::db::DbError> { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "insert_update_production_log", QUERY_INSERT_UPDATE_PRODUCTION_LOG, )?; conn.execute( "insert_update_production_log", &[®ion_id, &product_id, &quantity, &user_id], )?; Ok(()) } fn insert_overproduction_notification( &self, user_id: i32, branch_id: i32, remaining_quantity: i32, ) -> Result<(), crate::db::DbError> { let mut conn = self .base .pool .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "add_overproduction_notification", QUERY_ADD_OVERPRODUCTION_NOTIFICATION, )?; // Zusätzlich zur Menge die Branch-ID in der Payload mitschicken, damit // das Frontend die Überproduktion einem konkreten Branch zuordnen kann. let notification = format!( r#"{{"tr":"production.overproduction","value":{},"branch_id":{}}}"#, remaining_quantity, branch_id ); conn.execute( "add_overproduction_notification", &[&user_id, ¬ification], )?; Ok(()) } fn map_row_to_finished_production(row: Row) -> Option { Some(FinishedProduction { production_id: row.get("production_id")?.parse().ok()?, branch_id: row.get("branch_id")?.parse().ok()?, product_id: row.get("product_id")?.parse().ok()?, quantity: row.get("quantity")?.parse().ok()?, quality: row.get("quality")?.parse().ok()?, user_id: row.get("user_id")?.parse().ok()?, region_id: row.get("region_id")?.parse().ok()?, }) } fn map_row_to_stock_info(row: Row) -> Option { Some(StockInfo { stock_id: row.get("id")?.parse().ok()?, total_capacity: row.get("total_capacity")?.parse().ok()?, filled: row.get("filled")?.parse().ok()?, }) } } impl Worker for ProduceWorker { fn start_worker_thread(&mut self) { let pool = self.base.pool.clone(); let broker = self.base.broker.clone(); self.base .start_worker_with_loop(move |state: Arc| { let mut worker = ProduceWorker::new(pool.clone(), broker.clone()); while state.running_worker.load(Ordering::Relaxed) { worker.run_iteration(&state); } }); } fn stop_worker_thread(&mut self) { self.base.stop_worker(); } fn enable_watchdog(&mut self) { self.base.start_watchdog(); } }