use crate::db::{ConnectionPool, DbError, Row}; use crate::message_broker::MessageBroker; use std::collections::HashSet; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; pub struct ValueRecalculationWorker { base: BaseWorker, } // Produktwissen / Produktions-Logs const QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER: &str = r#" UPDATE falukant_data.knowledge k SET knowledge = LEAST(100, k.knowledge + 1) FROM falukant_data.character c JOIN falukant_log.production p ON DATE(p.production_timestamp) = CURRENT_DATE - INTERVAL '1 day' WHERE c.id = k.character_id AND c.user_id = 18 AND k.product_id = 10; "#; const QUERY_DELETE_OLD_PRODUCTIONS: &str = r#" DELETE FROM falukant_log.production flp WHERE DATE(flp.production_timestamp) < CURRENT_DATE; "#; const QUERY_GET_PRODUCERS_LAST_DAY: &str = r#" SELECT p.producer_id FROM falukant_log.production p WHERE DATE(p.production_timestamp) = CURRENT_DATE - INTERVAL '1 day' GROUP BY producer_id; "#; // Regionale Verkaufspreise const QUERY_UPDATE_REGION_SELL_PRICE: &str = r#" UPDATE falukant_data.town_product_worth tpw SET worth_percent = GREATEST( 0, LEAST( CASE WHEN s.quantity > avg_sells THEN tpw.worth_percent - 1 WHEN s.quantity < avg_sells THEN tpw.worth_percent + 1 ELSE tpw.worth_percent END, 100 ) ) FROM ( SELECT region_id, product_id, quantity, (SELECT AVG(quantity) FROM falukant_log.sell avs WHERE avs.product_id = s.product_id) AS avg_sells FROM falukant_log.sell s WHERE DATE(s.sell_timestamp) = CURRENT_DATE - INTERVAL '1 day' ) s WHERE tpw.region_id = s.region_id AND tpw.product_id = s.product_id; "#; const QUERY_DELETE_REGION_SELL_PRICE: &str = r#" DELETE FROM falukant_log.sell s WHERE DATE(s.sell_timestamp) < CURRENT_DATE; "#; const QUERY_GET_SELL_REGIONS: &str = r#" SELECT s.region_id FROM falukant_log.sell s WHERE DATE(s.sell_timestamp) = CURRENT_DATE - INTERVAL '1 day' GROUP BY region_id; "#; // Stündliche Preisneuberechnung basierend auf Verkäufen der letzten Stunde // Zwei Ebenen der Preisberechnung: // 1. Weltweit: Vergleich Stadt-Verkäufe vs. weltweiter Durchschnitt // - ±5% Toleranz: Preis bleibt gleich // - Mehr Verkäufe (>5% über Durchschnitt): Preis +10% // - Weniger Verkäufe (<5% unter Durchschnitt): Preis -10% // 2. Parent-Region: Vergleich Stadt-Verkäufe vs. Durchschnitt der parent-region // - ±5% Toleranz: Preis bleibt gleich // - Abweichung >±5%: Preis ±5% const QUERY_HOURLY_PRICE_RECALCULATION: &str = r#" WITH city_sales AS ( SELECT s.region_id, s.product_id, SUM(s.quantity) AS total_sold FROM falukant_log.sell s WHERE s.sell_timestamp >= NOW() - INTERVAL '1 hour' GROUP BY s.region_id, s.product_id ), world_avg_sales AS ( SELECT product_id, AVG(total_sold) AS avg_sold FROM city_sales GROUP BY product_id ), parent_region_sales AS ( SELECT r.parent_region_id, cs.product_id, AVG(cs.total_sold) AS avg_sold FROM city_sales cs JOIN falukant_data.region r ON r.id = cs.region_id WHERE r.parent_region_id IS NOT NULL GROUP BY r.parent_region_id, cs.product_id ), price_updates_world AS ( SELECT cs.region_id, cs.product_id, cs.total_sold, COALESCE(wa.avg_sold, 0) AS world_avg, tpw.worth_percent AS current_price, CASE -- Mehr als 5% über dem weltweiten Durchschnitt: 10% teurer WHEN cs.total_sold > COALESCE(wa.avg_sold, 0) * 1.05 THEN tpw.worth_percent * 1.1 -- Weniger als 5% unter dem weltweiten Durchschnitt: 10% billiger WHEN cs.total_sold < COALESCE(wa.avg_sold, 0) * 0.95 THEN tpw.worth_percent * 0.9 -- Innerhalb ±5%: Preis bleibt gleich ELSE tpw.worth_percent END AS price_after_world FROM city_sales cs JOIN world_avg_sales wa ON wa.product_id = cs.product_id JOIN falukant_data.town_product_worth tpw ON tpw.region_id = cs.region_id AND tpw.product_id = cs.product_id -- Nur updaten wenn es eine Änderung gibt (außerhalb der ±5% Toleranz) WHERE cs.total_sold > COALESCE(wa.avg_sold, 0) * 1.05 OR cs.total_sold < COALESCE(wa.avg_sold, 0) * 0.95 ), all_cities_with_prices AS ( SELECT cs.region_id, cs.product_id, cs.total_sold, r.parent_region_id, tpw.worth_percent AS original_price, COALESCE(puw.price_after_world, tpw.worth_percent) AS price_after_world FROM city_sales cs JOIN falukant_data.region r ON r.id = cs.region_id JOIN falukant_data.town_product_worth tpw ON tpw.region_id = cs.region_id AND tpw.product_id = cs.product_id LEFT JOIN price_updates_world puw ON puw.region_id = cs.region_id AND puw.product_id = cs.product_id ), price_updates_parent AS ( SELECT acwp.region_id, acwp.product_id, acwp.total_sold, acwp.parent_region_id, COALESCE(prs.avg_sold, 0) AS parent_avg, acwp.price_after_world AS current_price, CASE -- Mehr als 5% über dem parent-region Durchschnitt: 5% teurer WHEN acwp.total_sold > COALESCE(prs.avg_sold, 0) * 1.05 THEN acwp.price_after_world * 1.05 -- Weniger als 5% unter dem parent-region Durchschnitt: 5% billiger WHEN acwp.total_sold < COALESCE(prs.avg_sold, 0) * 0.95 THEN acwp.price_after_world * 0.95 -- Innerhalb ±5%: Preis bleibt gleich (vom world-update) ELSE acwp.price_after_world END AS new_price FROM all_cities_with_prices acwp LEFT JOIN parent_region_sales prs ON prs.parent_region_id = acwp.parent_region_id AND prs.product_id = acwp.product_id WHERE acwp.parent_region_id IS NOT NULL AND ( acwp.total_sold > COALESCE(prs.avg_sold, 0) * 1.05 OR acwp.total_sold < COALESCE(prs.avg_sold, 0) * 0.95 ) ), final_price_updates AS ( SELECT COALESCE(pup.region_id, puw.region_id) AS region_id, COALESCE(pup.product_id, puw.product_id) AS product_id, COALESCE(pup.new_price, puw.price_after_world, acwp.original_price) AS final_price FROM all_cities_with_prices acwp LEFT JOIN price_updates_world puw ON puw.region_id = acwp.region_id AND puw.product_id = acwp.product_id LEFT JOIN price_updates_parent pup ON pup.region_id = acwp.region_id AND pup.product_id = acwp.product_id WHERE puw.region_id IS NOT NULL OR pup.region_id IS NOT NULL ) UPDATE falukant_data.town_product_worth tpw SET worth_percent = GREATEST( 0, LEAST( 100, fpu.final_price ) ) FROM final_price_updates fpu WHERE tpw.region_id = fpu.region_id AND tpw.product_id = fpu.product_id; "#; // Ehen / Beziehungen const QUERY_SET_MARRIAGES_BY_PARTY: &str = r#" WITH updated_relations AS ( UPDATE falukant_data.relationship AS rel SET relationship_type_id = ( SELECT id FROM falukant_type.relationship AS rt WHERE rt.tr = 'married' ) WHERE rel.id IN ( SELECT rel2.id FROM falukant_data.party AS p JOIN falukant_type.party AS pt ON pt.id = p.party_type_id AND pt.tr = 'wedding' JOIN falukant_data.falukant_user AS fu ON fu.id = p.falukant_user_id JOIN falukant_data.character AS c ON c.user_id = fu.id JOIN falukant_data.relationship AS rel2 ON rel2.character1_id = c.id OR rel2.character2_id = c.id JOIN falukant_type.relationship AS rt2 ON rt2.id = rel2.relationship_type_id AND rt2.tr = 'engaged' WHERE p.created_at <= NOW() - INTERVAL '1 day' ) RETURNING character1_id, character2_id ) SELECT c1.user_id AS character1_user, c2.user_id AS character2_user FROM updated_relations AS ur JOIN falukant_data.character AS c1 ON c1.id = ur.character1_id JOIN falukant_data.character AS c2 ON c2.id = ur.character2_id; "#; // Lernen / Studium const QUERY_GET_STUDYINGS_TO_EXECUTE: &str = r#" SELECT l.id, l.associated_falukant_user_id, l.associated_learning_character_id, l.learn_all_products, l.learning_recipient_id, l.product_id, lr.tr FROM falukant_data.learning l JOIN falukant_type.learn_recipient lr ON lr.id = l.learning_recipient_id WHERE l.learning_is_executed = FALSE AND l.created_at + INTERVAL '1 day' < NOW(); "#; const QUERY_GET_OWN_CHARACTER_ID: &str = r#" SELECT id FROM falukant_data.character c WHERE c.user_id = $1; "#; const QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE: &str = r#" UPDATE falukant_data.knowledge k SET knowledge = LEAST(100, k.knowledge + $1) WHERE k.character_id = $2 AND k.product_id = $3; "#; const QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE: &str = r#" UPDATE falukant_data.knowledge k SET knowledge = LEAST(100, k.knowledge + $1) WHERE k.character_id = $2; "#; const QUERY_SET_LEARNING_DONE: &str = r#" UPDATE falukant_data.learning SET learning_is_executed = TRUE WHERE id = $1; "#; impl ValueRecalculationWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("ValueRecalculationWorker", pool, broker), } } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { // Wir nutzen hier einfach Intervall-Logik (täglich / halbtäglich), // statt exakte Uhrzeiten nachzubilden – Verhalten ist funktional ähnlich. let mut last_product = None; let mut last_sell_price = None; let mut last_hourly_price_recalc = None; loop { if !state.running_worker.load(Ordering::Relaxed) { break; } let now = Instant::now(); // Produktwissen einmal täglich if should_run_interval(last_product, now, Duration::from_secs(24 * 3600)) { if let Err(err) = Self::calculate_product_knowledge_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateProductKnowledge: {err}"); } last_product = Some(now); } // Regionale Verkaufspreise einmal täglich (gegen Mittag) if should_run_interval(last_sell_price, now, Duration::from_secs(24 * 3600)) { if let Err(err) = Self::calculate_regional_sell_price_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateRegionalSellPrice: {err}"); } last_sell_price = Some(now); } // Stündliche Preisneuberechnung basierend auf Verkäufen der letzten Stunde if should_run_interval(last_hourly_price_recalc, now, Duration::from_secs(3600)) { if let Err(err) = Self::calculate_hourly_price_recalculation_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateHourlyPriceRecalculation: {err}"); } last_hourly_price_recalc = Some(now); } // Ehen & Studium bei jedem Durchlauf if let Err(err) = Self::calculate_marriages_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateMarriages: {err}"); } if let Err(err) = Self::calculate_studying_inner(&pool, &broker) { eprintln!("[ValueRecalculationWorker] Fehler in calculateStudying: {err}"); } // 60-Sekunden-Wartezeit in kurze Scheiben aufteilen, damit ein Shutdown // (running_worker = false) schnell greift. const SLICE_MS: u64 = 500; let total_ms = 60_000; let mut slept = 0; while slept < total_ms { if !state.running_worker.load(Ordering::Relaxed) { break; } let remaining = total_ms - slept; let slice = SLICE_MS.min(remaining); std::thread::sleep(Duration::from_millis(slice)); slept += slice; } } } fn calculate_product_knowledge_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "update_product_knowledge_user", QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER, )?; conn.execute("update_product_knowledge_user", &[])?; conn.prepare("get_producers_last_day", QUERY_GET_PRODUCERS_LAST_DAY)?; let users = conn.execute("get_producers_last_day", &[])?; for row in users { if let Some(user_id) = row.get("producer_id").and_then(|v| v.parse::().ok()) { let message = format!(r#"{{"event":"price_update","user_id":{}}}"#, user_id); broker.publish(message); } } conn.prepare("delete_old_productions", QUERY_DELETE_OLD_PRODUCTIONS)?; conn.execute("delete_old_productions", &[])?; Ok(()) } fn calculate_regional_sell_price_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("update_region_sell_price", QUERY_UPDATE_REGION_SELL_PRICE)?; conn.execute("update_region_sell_price", &[])?; conn.prepare("get_sell_regions", QUERY_GET_SELL_REGIONS)?; let regions = conn.execute("get_sell_regions", &[])?; for row in regions { if let Some(region_id) = row.get("region_id").and_then(|v| v.parse::().ok()) { let message = format!(r#"{{"event":"price_update","region_id":{}}}"#, region_id); broker.publish(message); } } conn.prepare("delete_region_sell_price", QUERY_DELETE_REGION_SELL_PRICE)?; conn.execute("delete_region_sell_price", &[])?; Ok(()) } fn calculate_hourly_price_recalculation_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("hourly_price_recalculation", QUERY_HOURLY_PRICE_RECALCULATION)?; let _updated_rows = conn.execute("hourly_price_recalculation", &[])?; // Sammle alle betroffenen Regionen für Event-Benachrichtigungen let mut affected_regions = HashSet::new(); // Da die Query bereits die Updates durchführt, müssen wir die betroffenen Regionen // separat abfragen. Alternativ können wir auch einfach alle Regionen benachrichtigen, // die in der letzten Stunde Verkäufe hatten. conn.prepare("get_sell_regions_hourly", r#" SELECT DISTINCT region_id FROM falukant_log.sell WHERE sell_timestamp >= NOW() - INTERVAL '1 hour' "#)?; let regions = conn.execute("get_sell_regions_hourly", &[])?; for row in regions { if let Some(region_id) = row.get("region_id").and_then(|v| v.parse::().ok()) { affected_regions.insert(region_id); } } // Speichere die Anzahl vor der Schleife, da affected_regions in der Schleife bewegt wird let affected_count = affected_regions.len(); // Benachrichtige alle betroffenen Regionen über Preisänderungen for region_id in affected_regions { let message = format!(r#"{{"event":"price_update","region_id":{}}}"#, region_id); broker.publish(message); } eprintln!( "[ValueRecalculationWorker] Stündliche Preisneuberechnung abgeschlossen. {} Regionen aktualisiert.", affected_count ); Ok(()) } fn calculate_marriages_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("set_marriages_by_party", QUERY_SET_MARRIAGES_BY_PARTY)?; let rows = conn.execute("set_marriages_by_party", &[])?; for row in rows { if let Some(uid) = row.get("character1_user").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"relationship_changed","user_id":{}}}"#, uid); broker.publish(msg); } if let Some(uid) = row.get("character2_user").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"relationship_changed","user_id":{}}}"#, uid); broker.publish(msg); } } Ok(()) } fn calculate_studying_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "get_studyings_to_execute", QUERY_GET_STUDYINGS_TO_EXECUTE, )?; conn.prepare("set_learning_done", QUERY_SET_LEARNING_DONE)?; let studies = conn.execute("get_studyings_to_execute", &[])?; for study in studies { let tr = study.get("tr").cloned().unwrap_or_default(); match tr.as_str() { "self" => Self::calculate_studying_self(pool, broker, &study)?, "children" | "director" => { Self::calculate_studying_for_associated_character( pool, broker, &study, )? } _ => {} } if let Some(id) = study.get("id").and_then(|v| v.parse::().ok()) { conn.execute("set_learning_done", &[&id])?; } } Ok(()) } fn calculate_studying_self( pool: &ConnectionPool, broker: &MessageBroker, entry: &Row, ) -> Result<(), DbError> { let falukant_user_id = match entry .get("associated_falukant_user_id") .and_then(|v| v.parse::().ok()) { Some(id) => id, None => return Ok(()), }; let (learn_all, product_id) = study_scope(entry); let character_id = Self::get_own_character_id(pool, falukant_user_id)?; if let Some(cid) = character_id { Self::calculate_studying_character( pool, broker, cid, learn_all, product_id, parse_i32(entry, "learning_recipient_id", -1), )?; } Ok(()) } fn calculate_studying_for_associated_character( pool: &ConnectionPool, broker: &MessageBroker, entry: &Row, ) -> Result<(), DbError> { let character_id = parse_i32(entry, "associated_learning_character_id", -1); if character_id < 0 { return Ok(()); } let (learn_all, product_id) = study_scope(entry); let recipient_id = parse_i32(entry, "learning_recipient_id", -1); Self::calculate_studying_character( pool, broker, character_id, learn_all, product_id, recipient_id, ) } fn get_own_character_id( pool: &ConnectionPool, falukant_user_id: i32, ) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_own_character_id", QUERY_GET_OWN_CHARACTER_ID)?; let rows = conn.execute("get_own_character_id", &[&falukant_user_id])?; Ok(rows .get(0) .and_then(|r| r.get("id")) .and_then(|v| v.parse::().ok())) } fn calculate_studying_character( pool: &ConnectionPool, broker: &MessageBroker, character_id: i32, learn_all: bool, product_id: Option, falukant_user_id: i32, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; if learn_all { conn.prepare( "increase_all_products_knowledge", QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE, )?; conn.execute( "increase_all_products_knowledge", &[&1_i32, &character_id], )?; } else if let Some(pid) = product_id { conn.prepare( "increase_one_product_knowledge", QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE, )?; conn.execute( "increase_one_product_knowledge", &[&5_i32, &character_id, &pid], )?; } let message = format!(r#"{{"event":"knowledge_updated","user_id":{}}}"#, falukant_user_id); broker.publish(message); Ok(()) } } impl Worker for ValueRecalculationWorker { fn start_worker_thread(&mut self) { let pool = self.base.pool.clone(); let broker = self.base.broker.clone(); self.base .start_worker_with_loop(move |state: Arc| { ValueRecalculationWorker::run_loop(pool.clone(), broker.clone(), state); }); } fn stop_worker_thread(&mut self) { self.base.stop_worker(); } fn enable_watchdog(&mut self) { self.base.start_watchdog(); } } fn should_run_interval( last_run: Option, now: Instant, interval: Duration, ) -> bool { match last_run { None => true, Some(prev) => now.saturating_duration_since(prev) >= interval, } } fn parse_i32(row: &Row, key: &str, default: i32) -> i32 { row.get(key) .and_then(|v| v.parse::().ok()) .unwrap_or(default) } fn study_scope(entry: &Row) -> (bool, Option) { let learn_all_flag = entry.get("learn_all_products").map(|v| v == "t").unwrap_or(false); let product_id_str = entry.get("product_id").cloned().unwrap_or_default(); if learn_all_flag || product_id_str.is_empty() { (true, None) } else { let pid = product_id_str.parse::().ok(); match pid { Some(id) => (false, Some(id)), None => (true, None), } } }