use crate::db::{ConnectionPool, DbError}; use crate::message_broker::MessageBroker; use rand::distributions::{Distribution, Uniform}; use rand::rngs::StdRng; use rand::SeedableRng; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; use crate::worker::sql::{ QUERY_GET_TOWNS, QUERY_INSERT_STOCK, QUERY_CLEANUP_STOCK, QUERY_GET_REGION_USERS, }; pub struct StockageManager { base: BaseWorker, } impl StockageManager { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("StockageManager", pool, broker), } } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { let mut last_add_run: Option = None; let mut last_cleanup_run: Option = None; while state.running_worker.load(Ordering::Relaxed) { let now = Instant::now(); // Entspricht addLocalStocks: alle 60 Sekunden prüfen & ggf. Stocks hinzufügen let should_add = match last_add_run { None => true, Some(last) => now.saturating_duration_since(last) >= Duration::from_secs(60), }; if should_add { if let Err(err) = Self::add_local_stocks(&pool, &broker) { eprintln!("[StockageManager] Fehler in addLocalStocks: {err}"); } last_add_run = Some(now); } // Cleanup regelmäßig ausführen (z.B. ebenfalls im 60s-Rhythmus) let should_cleanup = match last_cleanup_run { None => true, Some(last) => now.saturating_duration_since(last) >= Duration::from_secs(60), }; if should_cleanup { if let Err(err) = Self::cleanup_buyable_stock(&pool) { eprintln!("[StockageManager] Fehler bei stock cleanup: {err}"); } last_cleanup_run = Some(now); } std::thread::sleep(Duration::from_secs(1)); } } fn add_local_stocks(pool: &ConnectionPool, broker: &MessageBroker) -> Result<(), DbError> { let mut rng = StdRng::from_entropy(); let dist = Uniform::from(0.0..1.0); let town_ids = Self::get_town_ids(pool)?; for town_id in town_ids { // Wahrscheinlichkeit analog: round(dist * 2160) <= 1 let roll: f64 = dist.sample(&mut rng) * 2160.0_f64; let chance = roll.round(); if chance <= 1.0 { Self::add_stock_for_town(pool, broker, town_id)?; } } Ok(()) } fn get_town_ids(pool: &ConnectionPool) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("[StockageManager] DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_towns", QUERY_GET_TOWNS)?; let towns = conn.execute("get_towns", &[])?; let mut ids = Vec::with_capacity(towns.len()); for row in towns { if let Some(id) = row.get("id").and_then(|v| v.parse::().ok()) { ids.push(id); } } Ok(ids) } fn add_stock_for_town( pool: &ConnectionPool, broker: &MessageBroker, town_id: i32, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("[StockageManager] DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("add_stock", QUERY_INSERT_STOCK)?; conn.execute("add_stock", &[&town_id])?; // Benachrichtige alle User in der Region let users = Self::get_region_users(&mut conn, town_id)?; for user_id in users { let message = format!( r#"{{"event":"stock_change","user_id":{},"branch":{}}}"#, user_id, town_id ); broker.publish(message); } Ok(()) } fn get_region_users(conn: &mut crate::db::DbConnection, region_id: i32) -> Result, DbError> { conn.prepare("get_region_users", QUERY_GET_REGION_USERS)?; let rows = conn.execute("get_region_users", &[®ion_id])?; let mut result = Vec::with_capacity(rows.len()); for row in rows { if let Some(uid) = row.get("user_id").and_then(|v| v.parse::().ok()) { result.push(uid); } } Ok(result) } fn cleanup_buyable_stock(pool: &ConnectionPool) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("[StockageManager] DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("cleanup_stock", QUERY_CLEANUP_STOCK)?; conn.execute("cleanup_stock", &[])?; Ok(()) } } impl Worker for StockageManager { fn start_worker_thread(&mut self) { let pool = self.base.pool.clone(); let broker = self.base.broker.clone(); self.base .start_worker_with_loop(move |state: Arc| { StockageManager::run_loop(pool.clone(), broker.clone(), state); }); } fn stop_worker_thread(&mut self) { self.base.stop_worker(); } fn enable_watchdog(&mut self) { self.base.start_watchdog(); } }