use crate::db::{ConnectionPool, DbError, Row}; use crate::message_broker::MessageBroker; use std::collections::HashSet; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; use crate::worker::sql::{ QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER, QUERY_DELETE_OLD_PRODUCTIONS, QUERY_GET_PRODUCERS_LAST_DAY, QUERY_UPDATE_REGION_SELL_PRICE, QUERY_DELETE_REGION_SELL_PRICE, QUERY_GET_SELL_REGIONS, QUERY_HOURLY_PRICE_RECALCULATION, QUERY_SET_MARRIAGES_BY_PARTY, QUERY_GET_STUDYINGS_TO_EXECUTE, QUERY_GET_OWN_CHARACTER_ID, QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE, QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE, QUERY_SET_LEARNING_DONE, }; pub struct ValueRecalculationWorker { base: BaseWorker, } impl ValueRecalculationWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("ValueRecalculationWorker", pool, broker), } } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { // Wir nutzen hier einfach Intervall-Logik (täglich / halbtäglich), // statt exakte Uhrzeiten nachzubilden – Verhalten ist funktional ähnlich. let mut last_product = None; let mut last_sell_price = None; let mut last_hourly_price_recalc = None; loop { if !state.running_worker.load(Ordering::Relaxed) { break; } let now = Instant::now(); // Produktwissen einmal täglich if should_run_interval(last_product, now, Duration::from_secs(24 * 3600)) { if let Err(err) = Self::calculate_product_knowledge_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateProductKnowledge: {err}"); } last_product = Some(now); } // Regionale Verkaufspreise einmal täglich (gegen Mittag) if should_run_interval(last_sell_price, now, Duration::from_secs(24 * 3600)) { if let Err(err) = Self::calculate_regional_sell_price_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateRegionalSellPrice: {err}"); } last_sell_price = Some(now); } // Stündliche Preisneuberechnung basierend auf Verkäufen der letzten Stunde if should_run_interval(last_hourly_price_recalc, now, Duration::from_secs(3600)) { if let Err(err) = Self::calculate_hourly_price_recalculation_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateHourlyPriceRecalculation: {err}"); } last_hourly_price_recalc = Some(now); } // Ehen & Studium bei jedem Durchlauf if let Err(err) = Self::calculate_marriages_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateMarriages: {err}"); } if let Err(err) = Self::calculate_studying_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateStudying: {err}"); } // 60-Sekunden-Wartezeit in kurze Scheiben aufteilen, damit ein Shutdown // (running_worker = false) schnell greift. const SLICE_MS: u64 = 500; let total_ms = 60_000; let mut slept = 0; while slept < total_ms { if !state.running_worker.load(Ordering::Relaxed) { break; } let remaining = total_ms - slept; let slice = SLICE_MS.min(remaining); std::thread::sleep(Duration::from_millis(slice)); slept += slice; } } } fn calculate_product_knowledge_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "update_product_knowledge_user", QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER, )?; conn.execute("update_product_knowledge_user", &[])?; conn.prepare("get_producers_last_day", QUERY_GET_PRODUCERS_LAST_DAY)?; let users = conn.execute("get_producers_last_day", &[])?; for row in users { if let Some(user_id) = row.get("producer_id").and_then(|v| v.parse::().ok()) { let message = format!(r#"{{"event":"price_update","user_id":{}}}"#, user_id); broker.publish(message); } } conn.prepare("delete_old_productions", QUERY_DELETE_OLD_PRODUCTIONS)?; conn.execute("delete_old_productions", &[])?; Ok(()) } fn calculate_regional_sell_price_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("update_region_sell_price", QUERY_UPDATE_REGION_SELL_PRICE)?; conn.execute("update_region_sell_price", &[])?; conn.prepare("get_sell_regions", QUERY_GET_SELL_REGIONS)?; let regions = conn.execute("get_sell_regions", &[])?; for row in regions { if let Some(region_id) = row.get("region_id").and_then(|v| v.parse::().ok()) { let message = format!(r#"{{"event":"price_update","region_id":{}}}"#, region_id); broker.publish(message); } } conn.prepare("delete_region_sell_price", QUERY_DELETE_REGION_SELL_PRICE)?; conn.execute("delete_region_sell_price", &[])?; Ok(()) } fn calculate_hourly_price_recalculation_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("hourly_price_recalculation", QUERY_HOURLY_PRICE_RECALCULATION)?; let _updated_rows = conn.execute("hourly_price_recalculation", &[])?; // Sammle alle betroffenen Regionen für Event-Benachrichtigungen let mut affected_regions = HashSet::new(); // Da die Query bereits die Updates durchführt, müssen wir die betroffenen Regionen // separat abfragen. Alternativ können wir auch einfach alle Regionen benachrichtigen, // die in der letzten Stunde Verkäufe hatten. conn.prepare("get_sell_regions_hourly", r#" SELECT DISTINCT region_id FROM falukant_log.sell WHERE sell_timestamp >= NOW() - INTERVAL '1 hour' "#)?; let regions = conn.execute("get_sell_regions_hourly", &[])?; for row in regions { if let Some(region_id) = row.get("region_id").and_then(|v| v.parse::().ok()) { affected_regions.insert(region_id); } } // Speichere die Anzahl vor der Schleife, da affected_regions in der Schleife bewegt wird let affected_count = affected_regions.len(); // Benachrichtige alle betroffenen Regionen über Preisänderungen for region_id in affected_regions { let message = format!(r#"{{"event":"price_update","region_id":{}}}"#, region_id); broker.publish(message); } eprintln!( "[ValueRecalculationWorker] Stündliche Preisneuberechnung abgeschlossen. {} Regionen aktualisiert.", affected_count ); Ok(()) } fn calculate_marriages_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("set_marriages_by_party", QUERY_SET_MARRIAGES_BY_PARTY)?; let rows = conn.execute("set_marriages_by_party", &[])?; for row in rows { if let Some(uid) = row.get("character1_user").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"relationship_changed","user_id":{}}}"#, uid); broker.publish(msg); } if let Some(uid) = row.get("character2_user").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"relationship_changed","user_id":{}}}"#, uid); broker.publish(msg); } } Ok(()) } fn calculate_studying_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "get_studyings_to_execute", QUERY_GET_STUDYINGS_TO_EXECUTE, )?; conn.prepare("set_learning_done", QUERY_SET_LEARNING_DONE)?; let studies = conn.execute("get_studyings_to_execute", &[])?; for study in studies { let tr = study.get("tr").cloned().unwrap_or_default(); match tr.as_str() { "self" => Self::calculate_studying_self(pool, broker, &study)?, "children" | "director" => { Self::calculate_studying_for_associated_character( pool, broker, &study, )? } _ => {} } if let Some(id) = study.get("id").and_then(|v| v.parse::().ok()) { conn.execute("set_learning_done", &[&id])?; } } Ok(()) } fn calculate_studying_self( pool: &ConnectionPool, broker: &MessageBroker, entry: &Row, ) -> Result<(), DbError> { let falukant_user_id = match entry .get("associated_falukant_user_id") .and_then(|v| v.parse::().ok()) { Some(id) => id, None => return Ok(()), }; let (learn_all, product_id) = study_scope(entry); let character_id = Self::get_own_character_id(pool, falukant_user_id)?; if let Some(cid) = character_id { Self::calculate_studying_character( pool, broker, cid, learn_all, product_id, parse_i32(entry, "learning_recipient_id", -1), )?; } Ok(()) } fn calculate_studying_for_associated_character( pool: &ConnectionPool, broker: &MessageBroker, entry: &Row, ) -> Result<(), DbError> { let character_id = parse_i32(entry, "associated_learning_character_id", -1); if character_id < 0 { return Ok(()); } let (learn_all, product_id) = study_scope(entry); let recipient_id = parse_i32(entry, "learning_recipient_id", -1); Self::calculate_studying_character( pool, broker, character_id, learn_all, product_id, recipient_id, ) } fn get_own_character_id( pool: &ConnectionPool, falukant_user_id: i32, ) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_own_character_id", QUERY_GET_OWN_CHARACTER_ID)?; let rows = conn.execute("get_own_character_id", &[&falukant_user_id])?; Ok(rows .first() .and_then(|r| r.get("id")) .and_then(|v| v.parse::().ok())) } fn calculate_studying_character( pool: &ConnectionPool, broker: &MessageBroker, character_id: i32, learn_all: bool, product_id: Option, falukant_user_id: i32, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; if learn_all { conn.prepare( "increase_all_products_knowledge", QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE, )?; conn.execute( "increase_all_products_knowledge", &[&1_i32, &character_id], )?; } else if let Some(pid) = product_id { conn.prepare( "increase_one_product_knowledge", QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE, )?; conn.execute( "increase_one_product_knowledge", &[&5_i32, &character_id, &pid], )?; } let message = format!(r#"{{"event":"knowledge_updated","user_id":{}}}"#, falukant_user_id); broker.publish(message); Ok(()) } } impl Worker for ValueRecalculationWorker { fn start_worker_thread(&mut self) { let pool = self.base.pool.clone(); let broker = self.base.broker.clone(); self.base .start_worker_with_loop(move |state: Arc| { ValueRecalculationWorker::run_loop(pool.clone(), broker.clone(), state); }); } fn stop_worker_thread(&mut self) { self.base.stop_worker(); } fn enable_watchdog(&mut self) { self.base.start_watchdog(); } } fn should_run_interval( last_run: Option, now: Instant, interval: Duration, ) -> bool { match last_run { None => true, Some(prev) => now.saturating_duration_since(prev) >= interval, } } fn parse_i32(row: &Row, key: &str, default: i32) -> i32 { row.get(key) .and_then(|v| v.parse::().ok()) .unwrap_or(default) } fn study_scope(entry: &Row) -> (bool, Option) { let learn_all_flag = entry.get("learn_all_products").map(|v| v == "t").unwrap_or(false); let product_id_str = entry.get("product_id").cloned().unwrap_or_default(); if learn_all_flag || product_id_str.is_empty() { (true, None) } else { let pid = product_id_str.parse::().ok(); match pid { Some(id) => (false, Some(id)), None => (true, None), } } }