From c9e0781b617f0dd5977c133159701e1ab1424e21 Mon Sep 17 00:00:00 2001 From: "Torsten Schulz (local)" Date: Wed, 28 Jan 2026 14:21:28 +0100 Subject: [PATCH] Update dependencies and enhance WebSocket server logging: Add 'chrono' and 'android_system_properties' to Cargo.lock, improve error handling and logging in websocket_server.rs, and streamline character creation notifications in worker modules for better clarity and maintainability. --- Cargo.lock | 115 +++++ Cargo.toml | 1 + src/websocket_server.rs | 122 ++--- src/worker/character_creation.rs | 60 +-- src/worker/director.rs | 237 ++------- src/worker/events.rs | 833 ++++++++----------------------- src/worker/house.rs | 63 +-- src/worker/mod.rs | 2 - src/worker/politics.rs | 473 ++++++++++++++++++ src/worker/produce.rs | 233 +-------- src/worker/sql.rs | 615 ++++++++++------------- src/worker/transport.rs | 137 +---- src/worker/user_character.rs | 89 +--- src/worker/weather.rs | 8 +- 14 files changed, 1174 insertions(+), 1814 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4073758..d8a31d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6,6 +6,7 @@ version = 4 name = "YpDaemon" version = "0.1.0" dependencies = [ + "chrono", "ctrlc", "futures-util", "libsystemd", @@ -19,6 +20,15 @@ dependencies = [ "tokio-tungstenite", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -106,6 +116,25 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chrono" +version = "0.4.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fac4744fb15ae8337dc853fee7fb3f4e48c0fbaa23d0afe49c447b4fab126118" +dependencies = [ + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "cpufeatures" version = "0.2.17" @@ -296,6 +325,30 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "iana-time-zone" +version = "0.1.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "itoa" version = "1.0.15" @@ -432,6 +485,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "objc2" version = "0.6.3" @@ -1169,12 +1231,65 @@ dependencies = [ 
"web-sys", ] +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 7e990a9..5097bfa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,4 @@ ctrlc = "3" tokio-rustls = "0.25" rustls-pemfile = "2" libsystemd = "0.7" +chrono = "0.4" diff --git a/src/websocket_server.rs b/src/websocket_server.rs index 8b7f0f4..6435ee8 100644 --- a/src/websocket_server.rs +++ b/src/websocket_server.rs @@ -20,10 +20,8 @@ use tokio_rustls::rustls::{self, ServerConfig}; use tokio_rustls::TlsAcceptor; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::accept_async; -use rustls_pemfile::{certs, ec_private_keys, pkcs8_private_keys, rsa_private_keys}; -use rustls::pki_types::{ - CertificateDer, PrivateKeyDer, PrivatePkcs1KeyDer, PrivatePkcs8KeyDer, PrivateSec1KeyDer, -}; +use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys}; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs1KeyDer, PrivatePkcs8KeyDer}; /// Einfacher WebSocket-Server auf Basis von Tokio + tokio-tungstenite. /// @@ -134,7 +132,7 @@ fn create_tls_acceptor( let key_file = File::open(key_path)?; let mut key_reader = BufReader::new(key_file); - // Versuche zuerst PKCS8, dann ggf. RSA-Key, dann SEC1 (EC PRIVATE KEY). + // Versuche zuerst PKCS8, dann ggf. 
RSA-Key let mut keys: Vec> = pkcs8_private_keys(&mut key_reader) .map(|res: Result, _>| res.map(PrivateKeyDer::Pkcs8)) .collect::>()?; @@ -149,16 +147,7 @@ fn create_tls_acceptor( } if keys.is_empty() { - // Leser zurücksetzen und SEC1 (EC PRIVATE KEY) versuchen - let key_file = File::open(key_path)?; - let mut key_reader = BufReader::new(key_file); - keys = ec_private_keys(&mut key_reader) - .map(|res: Result, _>| res.map(PrivateKeyDer::Sec1)) - .collect::>()?; - } - - if keys.is_empty() { - return Err("Key-Datei enthält keinen privaten Schlüssel (PKCS8, RSA oder SEC1)".into()); + return Err("Key-Datei enthält keinen privaten Schlüssel (PKCS8 oder RSA)".into()); } let private_key = keys.remove(0); @@ -210,6 +199,7 @@ impl WebSocketServer { "Starte WebSocket-Server auf Port {} mit SSL (cert: {:?}, key: {:?})", self.port, self.cert_path, self.key_path ); + // Hinweis: SSL-Unterstützung ist noch nicht implementiert. } else { println!("Starte WebSocket-Server auf Port {} (ohne SSL)", self.port); } @@ -238,22 +228,15 @@ impl WebSocketServer { self.cert_path.as_deref(), self.key_path.as_deref(), ) { - Ok(acc) => { - println!("[WebSocketServer] TLS erfolgreich initialisiert. Server akzeptiert wss:// Verbindungen."); - Some(acc) - } + Ok(acc) => Some(acc), Err(err) => { eprintln!( "[WebSocketServer] TLS-Initialisierung fehlgeschlagen, starte ohne SSL: {err}" ); - eprintln!( - "[WebSocketServer] ACHTUNG: WEBSOCKET_SSL_ENABLED=true, aber TLS ist nicht aktiv. Clients müssen dann ws:// statt wss:// verwenden." - ); None } } } else { - println!("[WebSocketServer] TLS deaktiviert. Server akzeptiert nur ws:// Verbindungen (nicht wss://)."); None }; @@ -304,76 +287,48 @@ async fn run_accept_loop( tls_acceptor: Option, ) { let listener = match TcpListener::bind(&addr).await { - Ok(l) => { - println!("[WebSocketServer] Erfolgreich gebunden an {}", addr); - l - } + Ok(l) => l, Err(e) => { - eprintln!("[WebSocketServer] FEHLER: Konnte nicht an {} binden: {}", addr, e); - eprintln!("[WebSocketServer] Mögliche Ursachen: Port bereits belegt, keine Berechtigung, oder Firewall blockiert."); + eprintln!("[WebSocketServer] Fehler beim Binden an {}: {}", addr, e); running.store(false, Ordering::SeqCst); return; } }; - println!("[WebSocketServer] Lauscht auf {} (TLS: {})", addr, tls_acceptor.is_some()); - - // Heartbeat: Logge regelmäßig, dass der Server läuft - let mut heartbeat_interval = interval(TokioDuration::from_secs(300)); // Alle 5 Minuten - let server_start = std::time::Instant::now(); + println!("[WebSocketServer] Lauscht auf {}", addr); while running.load(Ordering::SeqCst) { - tokio::select! 
{ - // Heartbeat: Logge alle 5 Minuten, dass der Server läuft - _ = heartbeat_interval.tick() => { - let uptime = server_start.elapsed().as_secs(); - println!("[WebSocketServer] Heartbeat: Server läuft seit {} Sekunden auf {} (TLS: {})", - uptime, addr, tls_acceptor.is_some()); + let (stream, peer) = match listener.accept().await { + Ok(v) => v, + Err(e) => { + eprintln!("[WebSocketServer] accept() fehlgeschlagen: {}", e); + continue; } - // Neue Verbindung akzeptieren - result = listener.accept() => { - match result { - Ok((stream, peer)) => { - let peer_addr = peer; - println!("[WebSocketServer] Client-Verbindungsversuch von {} (TCP-Verbindung etabliert)", peer_addr); - - let rx = tx.subscribe(); - let registry_clone = registry.clone(); - let ws_log_clone = ws_log.clone(); - let tls_acceptor_clone = tls_acceptor.clone(); + }; - tokio::spawn(async move { - if let Some(acc) = tls_acceptor_clone { - println!("[WebSocketServer] Starte TLS-Handshake für {}...", peer_addr); - match acc.accept(stream).await { - Ok(tls_stream) => { - println!("[WebSocketServer] TLS-Handshake erfolgreich für {}", peer_addr); - handle_connection(tls_stream, peer_addr, rx, registry_clone, ws_log_clone) - .await - } - Err(err) => { - eprintln!( - "[WebSocketServer] TLS-Handshake fehlgeschlagen ({peer_addr}): {err}" - ); - eprintln!( - "[WebSocketServer] Mögliche Ursachen: Ungültiges Zertifikat, unsupported Cipher, oder Client verwendet ws:// statt wss://" - ); - } - } - } else { - println!("[WebSocketServer] Verarbeite Verbindung von {} ohne TLS (Client sollte ws:// verwenden, nicht wss://)", peer_addr); - handle_connection(stream, peer_addr, rx, registry_clone, ws_log_clone).await; - } - }); + let peer_addr = peer; + let rx = tx.subscribe(); + let registry_clone = registry.clone(); + let ws_log_clone = ws_log.clone(); + let tls_acceptor_clone = tls_acceptor.clone(); + + tokio::spawn(async move { + if let Some(acc) = tls_acceptor_clone { + match acc.accept(stream).await { + Ok(tls_stream) => { + handle_connection(tls_stream, peer_addr, rx, registry_clone, ws_log_clone) + .await } - Err(e) => { - eprintln!("[WebSocketServer] Fehler beim Akzeptieren einer Verbindung: {}", e); - // Kurz warten, um CPU-Last zu reduzieren bei wiederholten Fehlern - tokio::time::sleep(TokioDuration::from_millis(100)).await; + Err(err) => { + eprintln!( + "[WebSocketServer] TLS-Handshake fehlgeschlagen ({peer_addr}): {err}" + ); } } + } else { + handle_connection(stream, peer_addr, rx, registry_clone, ws_log_clone).await; } - } + }); } } @@ -386,19 +341,16 @@ async fn handle_connection( ) where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - println!("[WebSocketServer] Versuche WebSocket-Handshake für {}...", peer_addr); let ws_stream = match accept_async(stream).await { - Ok(ws) => { - println!("[WebSocketServer] WebSocket-Handshake erfolgreich für {} - Verbindung etabliert", peer_addr); - ws - } + Ok(ws) => ws, Err(e) => { eprintln!("[WebSocketServer] WebSocket-Handshake fehlgeschlagen ({peer_addr}): {e}"); - eprintln!("[WebSocketServer] Mögliche Ursachen: Client sendet kein WebSocket-Request, falscher Upgrade-Header, oder Protokoll-Mismatch"); return; } }; + println!("[WebSocketServer] Neue Verbindung von {}", peer_addr); + let (mut ws_sender, mut ws_receiver) = ws_stream.split(); // Kanal für Antworten direkt an diesen Client (z.B. 
getConnections) diff --git a/src/worker/character_creation.rs b/src/worker/character_creation.rs index 96ac47b..704338a 100644 --- a/src/worker/character_creation.rs +++ b/src/worker/character_creation.rs @@ -20,9 +20,9 @@ use crate::worker::sql::{ QUERY_DELETE_DIRECTOR, QUERY_DELETE_RELATIONSHIP, QUERY_DELETE_CHILD_RELATION, + QUERY_INSERT_NOTIFICATION, QUERY_MARK_CHARACTER_DECEASED, }; -use crate::worker::{insert_notification, publish_update_status}; pub struct CharacterCreationWorker { pub(crate) base: BaseWorker, @@ -417,48 +417,12 @@ impl CharacterCreationWorker { // 2) Relationships löschen und betroffene User benachrichtigen conn.prepare("delete_relationship", QUERY_DELETE_RELATIONSHIP)?; let rel_result = conn.execute("delete_relationship", &[&character_id])?; - - // Logging: Anzahl gelöschter Relationships - let deleted_count = rel_result.len(); - if deleted_count > 0 { - eprintln!( - "[CharacterCreationWorker] {} Relationship(s) gelöscht für character_id={}", - deleted_count, character_id - ); - } - for row in rel_result { - let related_user_id = row + if let Some(related_user_id) = row .get("related_user_id") - .and_then(|v| v.parse::().ok()); - let related_character_id = row - .get("related_character_id") - .and_then(|v| v.parse::().ok()); - let relationship_type_tr = row - .get("relationship_type_tr") - .map(|s| s.to_string()); - - // Logging: Relationship wurde gelöscht - eprintln!( - "[CharacterCreationWorker] Relationship gelöscht: character_id={}, related_character_id={:?}, related_user_id={:?}, relationship_type={:?}", - character_id, - related_character_id, - related_user_id, - relationship_type_tr - ); - - if let Some(uid) = related_user_id { - // Spezielle Notification für Verlobungen - if relationship_type_tr.as_deref() == Some("engaged") { - use crate::worker::insert_notification; - let notification_json = serde_json::json!({ - "tr": "relationship.engaged_character_death", - "character_id": related_character_id - }); - insert_notification(pool, uid, ¬ification_json.to_string(), related_character_id)?; - } else { - Self::notify_user(pool, broker, uid, "relationship_death")?; - } + .and_then(|v| v.parse::().ok()) + { + Self::notify_user(pool, broker, related_user_id, "relationship_death")?; } } @@ -492,11 +456,17 @@ impl CharacterCreationWorker { user_id: i32, event_type: &str, ) -> Result<(), DbError> { - // DB-Notification (zentralisiert). Historisch wird hier als `tr` der event_type-String gespeichert. 
- insert_notification(pool, user_id, event_type, None)?; + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - // Frontend-Update (zentralisiert) - publish_update_status(broker, user_id); + conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?; + conn.execute("insert_notification", &[&user_id])?; + + // falukantUpdateStatus + let update_message = + format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); + broker.publish(update_message); // ursprüngliche Benachrichtigung let message = diff --git a/src/worker/director.rs b/src/worker/director.rs index 2b03998..dd4fc51 100644 --- a/src/worker/director.rs +++ b/src/worker/director.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use crate::message_broker::MessageBroker; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::Mutex; use std::time::{Duration, Instant}; use crate::db::ConnectionPool; @@ -25,6 +24,7 @@ use crate::worker::sql::{ QUERY_GET_SALARY_TO_PAY, QUERY_SET_SALARY_PAYED, QUERY_UPDATE_SATISFACTION, + QUERY_GET_DIRECTOR_USER, QUERY_COUNT_VEHICLES_IN_BRANCH_REGION, QUERY_COUNT_VEHICLES_IN_REGION, QUERY_CHECK_ROUTE, @@ -35,20 +35,15 @@ use crate::worker::sql::{ QUERY_GET_USER_OFFICES, QUERY_CUMULATIVE_TAX_NO_EXEMPT, QUERY_CUMULATIVE_TAX_WITH_EXEMPT, - QUERY_GET_VEHICLES_TO_REPAIR_IN_REGION, - QUERY_REPAIR_VEHICLE, }; -use crate::worker::publish_update_status; #[derive(Debug, Clone)] struct Director { id: i32, branch_id: i32, - falukant_user_id: i32, may_produce: bool, may_sell: bool, may_start_transport: bool, - may_repair_vehicles: bool, } #[derive(Debug, Clone)] @@ -162,14 +157,6 @@ impl DirectorWorker { } for director in directors { - if director.may_repair_vehicles { - if let Err(err) = self.repair_vehicles(&director) { - eprintln!( - "[DirectorWorker] Fehler bei repair_vehicles für Director {}: {err}", - director.id - ); - } - } if director.may_produce { eprintln!( "[DirectorWorker] Starte Produktionsprüfung für Director {} (branch_id={})", @@ -205,106 +192,15 @@ impl DirectorWorker { Some(Director { id: row.get("id")?.parse().ok()?, branch_id: row.get("branch_id")?.parse().ok()?, - falukant_user_id: row.get("falukant_user_id")?.parse().ok()?, may_produce: row.get("may_produce").map(|v| v == "t" || v == "true").unwrap_or(false), may_sell: row.get("may_sell").map(|v| v == "t" || v == "true").unwrap_or(false), may_start_transport: row .get("may_start_transport") .map(|v| v == "t" || v == "true") .unwrap_or(false), - may_repair_vehicles: row - .get("may_repair_vehicles") - .map(|v| v == "t" || v == "true") - .unwrap_or(false), }) } - fn repair_vehicles(&mut self, director: &Director) -> Result<(), DbError> { - self.base - .set_current_step("DirectorWorker: repair_vehicles"); - - let mut conn = self - .base - .pool - .get() - .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - - // Region des Directors/Branches bestimmen - conn.prepare("get_branch_region_for_repair", QUERY_GET_BRANCH_REGION)?; - let rows = conn.execute("get_branch_region_for_repair", &[&director.branch_id])?; - let region_id: i32 = rows - .first() - .and_then(|r| r.get("region_id")) - .and_then(|v| v.parse::().ok()) - .unwrap_or(-1); - if region_id < 0 { - return Ok(()); - } - - conn.prepare( - "get_vehicles_to_repair", - QUERY_GET_VEHICLES_TO_REPAIR_IN_REGION, - )?; - conn.prepare("repair_vehicle", QUERY_REPAIR_VEHICLE)?; - - let candidates = - conn.execute("get_vehicles_to_repair", &[&director.falukant_user_id, ®ion_id])?; - if 
candidates.is_empty() { - return Ok(()); - } - - let mut repaired = 0usize; - for row in candidates { - let vehicle_id = row - .get("vehicle_id") - .and_then(|v| v.parse::().ok()) - .unwrap_or(-1); - let condition = row - .get("condition") - .and_then(|v| v.parse::().ok()) - .unwrap_or(100.0); - let type_cost = row - .get("cost") - .and_then(|v| v.parse::().ok()) - .unwrap_or(0.0); - - if vehicle_id < 0 || type_cost <= 0.0 { - continue; - } - - // repairCost = round(type.cost * 0.8 * (100 - condition) / 100) - let repair_cost = (type_cost * 0.8 * (100.0 - condition) / 100.0).round(); - if repair_cost <= 0.0 { - continue; - } - - // Preconditions (wie Backend): - // - sufficientFunds: wir versuchen abzubuchen und überspringen bei Fehler - if let Err(_err) = self.base.change_falukant_user_money( - director.falukant_user_id, - -repair_cost, - &format!("repair vehicle {}", vehicle_id), - ) { - continue; - } - - // Fahrzeug auf 100 setzen + available_from in Zukunft (build_time_minutes) - let _ = conn.execute("repair_vehicle", &[&vehicle_id])?; - repaired += 1; - } - - if repaired > 0 { - eprintln!( - "[DirectorWorker] {} Fahrzeug(e) automatisch zur Reparatur gestartet (director_id={}, region_id={})", - repaired, director.id, region_id - ); - // Frontend: Branches/Status neu laden - publish_update_status(&self.base.broker, director.falukant_user_id); - } - - Ok(()) - } - fn start_productions(&mut self, director: &Director) -> Result<(), DbError> { self.base .set_current_step("DirectorWorker: start_productions"); @@ -319,36 +215,10 @@ impl DirectorWorker { conn.prepare("get_to_produce", QUERY_GET_BEST_PRODUCTION)?; let rows = conn.execute("get_to_produce", &[&director.id, &director.branch_id])?; if rows.is_empty() { - // Debug: SQL-Vorschau nur gedrosselt loggen, damit wir die Query testen können - // ohne Log-Flut. - static LAST_EMPTY_PROD_LOG: Mutex> = Mutex::new(None); - let mut last = LAST_EMPTY_PROD_LOG.lock().unwrap(); - let should_log = last - .map(|t| t.elapsed().as_secs() >= 60) - .unwrap_or(true); - if should_log { - // SQL ggf. kürzen, um Log-Flut zu vermeiden - let mut sql_preview = QUERY_GET_BEST_PRODUCTION.to_string(); - const MAX_SQL_PREVIEW: usize = 1200; - if sql_preview.len() > MAX_SQL_PREVIEW { - sql_preview.truncate(MAX_SQL_PREVIEW); - sql_preview.push_str(" …"); - } - eprintln!( - "[DirectorWorker] Keine Produktionskandidaten für Director {} (branch_id={}). Query (get_to_produce): {} | params: director_id={}, branch_id={}", - director.id, - director.branch_id, - sql_preview, - director.id, - director.branch_id - ); - *last = Some(Instant::now()); - } else { - eprintln!( - "[DirectorWorker] Keine Produktionskandidaten für Director {} gefunden.", - director.id - ); - } + eprintln!( + "[DirectorWorker] Keine Produktionskandidaten für Director {} gefunden.", + director.id + ); return Ok(()); } @@ -432,32 +302,12 @@ impl DirectorWorker { base_plan.running_productions_quantity = running_productions_quantity; // Eine neue Produktion starten (max. 100 Stück) - let produced_quantity = match self.create_single_production(&mut conn, &base_plan) { - Ok(qty) => qty, - Err(err) => { - eprintln!( - "[DirectorWorker] Fehler beim Starten einer Produktion: {err}" - ); - break; - } - }; - - // WICHTIG: Wenn wir die gesamte verfügbare Kapazität verwendet haben, breche ab. - // Sonst könnte die nächste Iteration fälschlicherweise noch Platz sehen, wenn - // die gerade gestartete Produktion noch nicht in running_productions_quantity enthalten ist. 
- if produced_quantity >= free_capacity { + if let Err(err) = self.create_single_production(&mut conn, &base_plan) { eprintln!( - "[DirectorWorker] Produktion mit {} Stück gestartet, was die gesamte freie Kapazität ({}) ausnutzt. Breche ab.", - produced_quantity, free_capacity + "[DirectorWorker] Fehler beim Starten einer Produktion: {err}" ); break; } - - // Wenn wir weniger als 10% der freien Kapazität produziert haben, könnte es sein, - // dass wir noch mehr Platz haben. Aber sicherheitshalber brechen wir nach einer - // Produktion ab, um Race Conditions zu vermeiden. - // Die nächste Iteration (beim nächsten Director-Check) wird dann wieder prüfen. - break; } Ok(()) @@ -519,15 +369,12 @@ impl DirectorWorker { &mut self, conn: &mut DbConnection, plan: &ProductionPlan, - ) -> Result { - // WICHTIG: Kapazität direkt aus dem Plan berechnen (wurde gerade in der Schleife aktualisiert) + ) -> Result<(), DbError> { let free_capacity = Self::calc_free_capacity(plan); let one_piece_cost = Self::calc_one_piece_cost(plan); let max_money_production = Self::calc_max_money_production(plan, one_piece_cost); - // to_produce darf NIE größer sein als free_capacity, sonst passt es nicht ins Lager - // Zusätzlich: max. 100 Stück pro Produktion, und max. was das Geld erlaubt - let to_produce = free_capacity.min(max_money_production).min(100).max(0); + let to_produce = (free_capacity.min(max_money_production)).clamp(0, 100); eprintln!( "[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}", @@ -546,16 +393,7 @@ impl DirectorWorker { plan.running_productions, plan.running_productions_quantity ); - return Ok(0); - } - - // Sicherheitsprüfung: to_produce darf niemals größer sein als free_capacity - if to_produce > free_capacity { - eprintln!( - "[DirectorWorker] FEHLER: to_produce ({}) > free_capacity ({})! Das sollte nicht passieren. Breche Produktion ab.", - to_produce, free_capacity - ); - return Ok(0); + return Ok(()); } let production_cost = to_produce as f64 * one_piece_cost; @@ -596,8 +434,7 @@ impl DirectorWorker { ); self.base.broker.publish(message); - // Rückgabe der produzierten Menge, damit die Schleife entscheiden kann, ob sie weiterläuft - Ok(to_produce) + Ok(()) } fn calc_free_capacity(plan: &ProductionPlan) -> i32 { @@ -648,8 +485,14 @@ impl DirectorWorker { // Für alle Items dieses Directors sollten die user_id-Felder identisch // sein (Arbeitgeber des Directors). let falukant_user_id = if items.is_empty() { - // User-ID ist bereits über QUERY_GET_DIRECTORS geladen. - director.falukant_user_id + // Wenn keine Items vorhanden sind, müssen wir die user_id anders ermitteln + conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?; + let user_rows = conn.execute("get_director_user", &[&director.id])?; + user_rows + .into_iter() + .next() + .and_then(|row| row.get("employer_user_id").and_then(|v| v.parse::().ok())) + .ok_or_else(|| DbError::new("Konnte employer_user_id nicht ermitteln"))? 
} else { items[0].user_id }; @@ -665,7 +508,7 @@ impl DirectorWorker { let vehicles_in_branch = vehicle_count_rows .into_iter() .next() - .and_then(|row| row.get("cnt").and_then(|v| v.parse::().ok())) + .and_then(|row| row.get("count").and_then(|v| v.parse::().ok())) .unwrap_or(0); // Falls es nichts zu transportieren gibt, prüfe auf leere Transporte @@ -718,7 +561,7 @@ impl DirectorWorker { let vehicles_in_branch_after = vehicle_count_rows_after .into_iter() .next() - .and_then(|row| row.get("cnt").and_then(|v| v.parse::().ok())) + .and_then(|row| row.get("count").and_then(|v| v.parse::().ok())) .unwrap_or(0); if vehicles_in_branch_after == 0 { @@ -768,7 +611,7 @@ impl DirectorWorker { let vehicles_in_branch_final = vehicle_count_rows_final .into_iter() .next() - .and_then(|row| row.get("cnt").and_then(|v| v.parse::().ok())) + .and_then(|row| row.get("count").and_then(|v| v.parse::().ok())) .unwrap_or(0); if vehicles_in_branch_final == 0 { @@ -963,38 +806,24 @@ impl DirectorWorker { return Ok(()); } - // compute piece price and full sell price - let piece_price = Self::compute_piece_sell_price(item); - let revenue = piece_price * item.quantity as f64; + // compute piece price and full sell price + let piece_price = Self::compute_piece_sell_price(item); + let sell_price = piece_price * item.quantity as f64; let one_piece_cost = Self::resolve_one_piece_cost(conn, item.product_id, item.sell_cost)?; - let cumulative_tax_percent = - Self::get_cumulative_tax_percent(conn, item.branch_id, item.user_id)?; + let cumulative_tax_percent = Self::get_cumulative_tax_percent(conn, item.branch_id, item.user_id)?; - let revenue_cents = (revenue * 100.0).round() as i64; - let cost = one_piece_cost * item.quantity as f64; - let cost_cents = (cost * 100.0).round() as i64; + let revenue_cents = (sell_price * 100.0).round() as i64; + let cost_cents = (one_piece_cost * item.quantity as f64 * 100.0).round() as i64; let profit_cents = (revenue_cents - cost_cents).max(0); - - // Steuer wird vom Gewinn abgezogen (nicht „zugerechnet“) let tax_cents = ((profit_cents as f64) * cumulative_tax_percent / 100.0).round() as i64; let payout_cents = revenue_cents - tax_cents; - eprintln!("[DirectorWorker] sell: revenue={:.2}, cost={:.2}, profit_cents={}, tax%={:.2}, tax_cents={}, payout_cents={}", revenue, cost, profit_cents, cumulative_tax_percent, tax_cents, payout_cents); - - // Treasury-User-ID optional per ENV, fallback auf DEFAULT_TREASURY_USER_ID - let treasury_user_id: i32 = std::env::var("TREASURY_FALUKANT_USER_ID") - .ok() - .and_then(|v| v.parse::().ok()) - .unwrap_or(DEFAULT_TREASURY_USER_ID); + eprintln!("[DirectorWorker] sell: revenue={:.2}, cost={:.2}, profit_cents={}, tax%={:.2}, tax_cents={}, payout_cents={}", sell_price, one_piece_cost * item.quantity as f64, profit_cents, cumulative_tax_percent, tax_cents, payout_cents); if tax_cents > 0 { let tax_amount = (tax_cents as f64) / 100.0; - if let Err(err) = self.base.change_falukant_user_money( - treasury_user_id, - tax_amount, - &format!("tax from sale product {}", item.product_id), - ) { + if let Err(err) = self.base.change_falukant_user_money(DEFAULT_TREASURY_USER_ID, tax_amount, &format!("tax from sale product {}", item.product_id)) { eprintln!("[DirectorWorker] Fehler bei change_falukant_user_money (tax): {err}"); } } @@ -1008,7 +837,7 @@ impl DirectorWorker { eprintln!( "[DirectorWorker] sell: user_id={}, revenue={:.2}, tax={:.2}, payout={:.2}, product_id={}", item.user_id, - revenue, + sell_price, (tax_cents as f64) / 100.0, payout_amount, 
item.product_id @@ -1221,7 +1050,7 @@ impl DirectorWorker { let vehicle_count = vehicle_count_rows .into_iter() .next() - .and_then(|row| row.get("cnt").and_then(|v| v.parse::().ok())) + .and_then(|row| row.get("count").and_then(|v| v.parse::().ok())) .unwrap_or(0); eprintln!( @@ -1239,7 +1068,7 @@ impl DirectorWorker { let route_exists = route_rows .into_iter() .next() - .and_then(|row| row.get("1").and_then(|v| v.parse::().ok())) + .and_then(|row| row.get("count").and_then(|v| v.parse::().ok())) .unwrap_or(0) > 0; eprintln!( diff --git a/src/worker/events.rs b/src/worker/events.rs index 23fdc8b..b86f9b3 100644 --- a/src/worker/events.rs +++ b/src/worker/events.rs @@ -5,7 +5,6 @@ use rand::seq::SliceRandom; use serde_json::json; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::Mutex; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; @@ -14,6 +13,7 @@ use crate::worker::sql::{ QUERY_GET_RANDOM_INFANT, QUERY_GET_RANDOM_CITY, QUERY_GET_AFFECTED_USERS, + QUERY_INSERT_NOTIFICATION, QUERY_GET_MONEY, QUERY_UPDATE_MONEY, QUERY_GET_REGION_STOCKS, @@ -45,9 +45,7 @@ use crate::worker::sql::{ QUERY_DELETE_CHILD_RELATION, QUERY_DELETE_CHARACTER, QUERY_GET_HEIR, - QUERY_GET_RANDOM_HEIR_FROM_REGION, QUERY_SET_CHARACTER_USER, - QUERY_CLEAR_CHARACTER_USER, QUERY_GET_CURRENT_MONEY, QUERY_GET_HOUSE_VALUE, QUERY_GET_SETTLEMENT_VALUE, @@ -55,7 +53,6 @@ use crate::worker::sql::{ QUERY_GET_CREDIT_DEBT, QUERY_COUNT_CHILDREN, }; -use crate::worker::{insert_notification, publish_update_status}; /// Typisierung von Ereignissen #[derive(Debug, Clone, Copy, PartialEq)] @@ -101,13 +98,6 @@ pub enum EventEffect { HouseQualityChange { probability: f64, min_change: i32, max_change: i32 }, } -/// Hilfsstruktur für Character-Informationen aus Effekten -struct CharacterInfo { - character_id: Option, - first_name: Option, - last_name: Option, -} - /// Definition eines zufälligen Ereignisses #[derive(Debug, Clone)] pub struct RandomEvent { @@ -169,7 +159,7 @@ impl EventsWorker { vec![ RandomEvent { id: "windfall".to_string(), - probability_per_minute: 0.0025, // 0.25% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.01, // 1% pro Minute event_type: EventType::Personal, title: "Unerwarteter Geldsegen".to_string(), description: "Du findest eine vergessene Geldbörse auf der Straße.".to_string(), @@ -183,7 +173,7 @@ impl EventsWorker { }, RandomEvent { id: "theft".to_string(), - probability_per_minute: 0.002, // 0.2% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.008, // 0.8% pro Minute event_type: EventType::Personal, title: "Diebstahl".to_string(), description: "Ein Dieb hat einen Teil deines Geldes gestohlen.".to_string(), @@ -197,7 +187,7 @@ impl EventsWorker { }, RandomEvent { id: "regional_storm".to_string(), - probability_per_minute: 0.00125, // 0.125% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.005, // 0.5% pro Minute event_type: EventType::Regional, title: "Sturm in der Region".to_string(), description: "Ein schwerer Sturm hat die Region getroffen.".to_string(), @@ -241,7 +231,7 @@ impl EventsWorker { }, RandomEvent { id: "regional_festival".to_string(), - probability_per_minute: 0.00075, // 0.075% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.003, // 0.3% pro Minute event_type: EventType::Regional, title: "Regionales Fest".to_string(), description: "Ein großes Fest findet in der Region statt.".to_string(), @@ -260,7 
+250,7 @@ impl EventsWorker { }, RandomEvent { id: "warehouse_fire".to_string(), - probability_per_minute: 0.0005, // 0.05% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.002, // 0.2% pro Minute event_type: EventType::Personal, title: "Lagerbrand".to_string(), description: "Ein Feuer hat Teile deines Lagers beschädigt.".to_string(), @@ -293,22 +283,21 @@ impl EventsWorker { }, RandomEvent { id: "character_illness".to_string(), - probability_per_minute: 0.0025, // 0.25% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.01, // 1% pro Minute event_type: EventType::Personal, title: "Krankheit".to_string(), description: "Ein Charakter ist erkrankt und hat an Gesundheit verloren.".to_string(), effects: vec![ EventEffect::CharacterHealthChange { probability: 1.0, - // Soll nur leicht reduzieren: maximal -12 (User-Wunsch) - min_change: -12, - max_change: -3, + min_change: -20, + max_change: -5, }, ], }, RandomEvent { id: "character_recovery".to_string(), - probability_per_minute: 0.002, // 0.2% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.008, // 0.8% pro Minute event_type: EventType::Personal, title: "Genesung".to_string(), description: "Ein Charakter hat sich von einer Krankheit erholt.".to_string(), @@ -320,37 +309,9 @@ impl EventsWorker { }, ], }, - RandomEvent { - id: "character_rest".to_string(), - probability_per_minute: 0.0015, // 0.15% pro Minute (reduziert) - event_type: EventType::Personal, - title: "Erholung".to_string(), - description: "Ein Charakter hat sich gut ausgeruht und gewinnt Gesundheit zurück.".to_string(), - effects: vec![ - EventEffect::CharacterHealthChange { - probability: 1.0, - min_change: 3, - max_change: 10, - }, - ], - }, - RandomEvent { - id: "character_healer".to_string(), - probability_per_minute: 0.001, // 0.10% pro Minute (reduziert) - event_type: EventType::Personal, - title: "Heilerbesuch".to_string(), - description: "Ein Heiler behandelt einen Charakter. Die Gesundheit steigt.".to_string(), - effects: vec![ - EventEffect::CharacterHealthChange { - probability: 1.0, - min_change: 8, - max_change: 20, - }, - ], - }, RandomEvent { id: "character_accident".to_string(), - probability_per_minute: 0.00075, // 0.075% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.003, // 0.3% pro Minute event_type: EventType::Personal, title: "Unfall".to_string(), description: "Ein schwerer Unfall hat einen Charakter schwer verletzt.".to_string(), @@ -367,7 +328,7 @@ impl EventsWorker { }, RandomEvent { id: "regional_epidemic".to_string(), - probability_per_minute: 0.0005, // 0.05% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.002, // 0.2% pro Minute event_type: EventType::Regional, title: "Epidemie".to_string(), description: "Eine Seuche hat die Region erfasst.".to_string(), @@ -384,11 +345,11 @@ impl EventsWorker { }, RandomEvent { id: "sudden_infant_death".to_string(), - // Wahrscheinlichkeit pro Minute: 0.000125 (0.0125%) - reduziert auf 25% der ursprünglichen Rate + // Wahrscheinlichkeit pro Minute: 0.0005 (0.05%) // Im Mittelalter starben etwa 30-40% der Kinder vor dem 2. 
Geburtstag // Diese Wahrscheinlichkeit führt bei regelmäßiger Prüfung zu einer // realistischen mittelalterlichen Säuglingssterblichkeit - probability_per_minute: 0.000125, + probability_per_minute: 0.0005, event_type: EventType::Personal, title: "Plötzlicher Kindstod".to_string(), description: "Ein Kleinkind ist plötzlich verstorben.".to_string(), @@ -400,7 +361,7 @@ impl EventsWorker { }, RandomEvent { id: "earthquake".to_string(), - probability_per_minute: 0.00025, // 0.025% pro Minute (reduziert auf 25% der ursprünglichen Rate) + probability_per_minute: 0.001, // 0.1% pro Minute (sehr selten) event_type: EventType::Regional, title: "Erdbeben".to_string(), description: "Ein Erdbeben hat die Region erschüttert.".to_string(), @@ -430,10 +391,8 @@ impl EventsWorker { fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { let mut last_event_check = None; - let mut last_notification_cleanup: Option = None; let mut rng = rand::thread_rng(); let events = Self::initialize_events(); - eprintln!("[EventsWorker] Worker-Thread gestartet"); loop { if !state.running_worker.load(Ordering::Relaxed) { @@ -442,19 +401,6 @@ impl EventsWorker { let now = Instant::now(); - // Heartbeat im Run-Loop (unabhängig davon, ob Events triggern) - static LAST_RUNLOOP_HEARTBEAT: Mutex> = Mutex::new(None); - { - let mut last = LAST_RUNLOOP_HEARTBEAT.lock().unwrap(); - let should_log = last - .map(|t| t.elapsed().as_secs() >= 3600) - .unwrap_or(true); - if should_log { - eprintln!("[EventsWorker] RunLoop Heartbeat: alive"); - *last = Some(Instant::now()); - } - } - // Ereignisse einmal pro Minute prüfen if should_run_interval(last_event_check, now, Duration::from_secs(60)) { if let Err(err) = Self::check_and_trigger_events_inner( @@ -469,13 +415,8 @@ impl EventsWorker { last_event_check = Some(now); } - // Alte Notifications einmal täglich aufräumen (alle 24 Stunden) - if should_run_interval(last_notification_cleanup, now, Duration::from_secs(86400)) { - Self::cleanup_old_notifications(&pool); - last_notification_cleanup = Some(now); - } - - // 10-Sekunden-Wartezeit in kurze Scheiben aufteilen + // 10-Sekunden-Wartezeit in kurze Scheiben aufteilen, damit ein Shutdown + // (running_worker = false) schnell greift. const SLICE_MS: u64 = 500; let total_ms = 10_000; let mut slept = 0; @@ -491,34 +432,9 @@ impl EventsWorker { } } - fn cleanup_old_notifications(pool: &ConnectionPool) { - use crate::worker::sql::QUERY_DELETE_OLD_NOTIFICATIONS; - - let result = (|| -> Result<(), DbError> { - let mut conn = pool - .get() - .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - - conn.prepare("delete_old_notifications", QUERY_DELETE_OLD_NOTIFICATIONS)?; - conn.execute("delete_old_notifications", &[])?; - Ok(()) - })(); - - match result { - Ok(()) => eprintln!("[EventsWorker] Alte Notifications (>30 Tage) aufgeräumt"), - Err(err) => eprintln!("[EventsWorker] Fehler beim Aufräumen alter Notifications: {err}"), - } - } - - // Globaler Skalierungsfaktor für Ereignisfrequenz. - // Default: 1.0 (unverändert). Optional per ENV `EVENT_RATE_SCALE` konfigurierbar. - fn event_rate_scale() -> f64 { - std::env::var("EVENT_RATE_SCALE") - .ok() - .and_then(|v| v.parse::().ok()) - .filter(|v| v.is_finite() && *v >= 0.0) - .unwrap_or(1.0) - } + // Globaler Skalierungsfaktor für Ereignisfrequenz (1.0 = unverändert). + // Setze auf 0.05, um Ereignisse auf 1/20 der ursprünglichen Häufigkeit zu reduzieren. 
+ const EVENT_RATE_SCALE: f64 = 0.05; fn check_and_trigger_events_inner( pool: &ConnectionPool, @@ -527,38 +443,12 @@ impl EventsWorker { rng: &mut impl Rng, events: &[RandomEvent], ) -> Result<(), DbError> { - let rate_scale = Self::event_rate_scale(); - // Einmalige/gedrosselte Debug-Ausgaben, damit sichtbar ist, dass der Worker läuft - // und welche Skalierung aktiv ist. - static LAST_HEARTBEAT: Mutex> = Mutex::new(None); - // optional counter; keep it underscore-prefixed to avoid unused warnings - let mut _triggered = 0usize; - - { - let mut last = LAST_HEARTBEAT.lock().unwrap(); - let should_log = last - .map(|t| t.elapsed().as_secs() >= 3600) - .unwrap_or(true); - if should_log { - if (rate_scale - 1.0).abs() > f64::EPSILON { - eprintln!( - "[EventsWorker] Heartbeat: EVENT_RATE_SCALE={} (ENV `EVENT_RATE_SCALE` gesetzt?)", - rate_scale - ); - } else { - eprintln!("[EventsWorker] Heartbeat: EVENT_RATE_SCALE=1.0"); - } - *last = Some(Instant::now()); - } - } - // Prüfe jedes mögliche Ereignis for event in events { // Zufällige Prüfung basierend auf Wahrscheinlichkeit let roll = rng.gen_range(0.0..=1.0); - let effective_prob = event.probability_per_minute * rate_scale; + let effective_prob = event.probability_per_minute * Self::EVENT_RATE_SCALE; if roll < effective_prob { - _triggered += 1; eprintln!( "[EventsWorker] Ereignis '{}' wurde ausgelöst (Wahrscheinlichkeit: {:.4}% -> skaliert {:.4}%)", event.id, @@ -577,9 +467,6 @@ impl EventsWorker { } } - // Wenn über längere Zeit kein Ereignis triggert, sieht man sonst keinerlei Logs. - // Das Heartbeat-Log oben zeigt Liveness; hier loggen wir bewusst nicht pro Minute. - Ok(()) } @@ -589,11 +476,24 @@ impl EventsWorker { event: &RandomEvent, rng: &mut impl Rng, ) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + // Spezielle Behandlung für plötzlichen Kindstod: Finde ein zufälliges Kind unter 2 Jahren if event.id == "sudden_infant_death" { return Self::trigger_sudden_infant_death(pool, broker, event, rng); } - let user_id = match Self::get_random_user_id(pool)? 
{ + conn.prepare("get_random_user", QUERY_GET_RANDOM_USER)?; + let rows = conn.execute("get_random_user", &[])?; + + let user_id: Option = rows + .first() + .and_then(|r| r.get("id")) + .and_then(|v| v.parse::().ok()); + + let user_id = match user_id { Some(id) => id, None => { eprintln!("[EventsWorker] Kein Spieler gefunden für persönliches Ereignis"); @@ -601,279 +501,119 @@ impl EventsWorker { } }; - let mut conn = pool - .get() - .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - - let effect_results = Self::apply_personal_effects(&mut conn, pool, broker, user_id, event, rng)?; - - if effect_results.is_empty() { - eprintln!( - "[EventsWorker] Persönliches Ereignis '{}' für Spieler {} übersprungen (keine Effekte)", - event.id, user_id - ); - return Ok(()); - } - - let char_info = Self::extract_character_info_from_effects(&effect_results); - Self::send_personal_event_notifications(pool, broker, user_id, event, &effect_results, &char_info)?; - - eprintln!( - "[EventsWorker] Persönliches Ereignis '{}' für Spieler {} verarbeitet", - event.id, user_id - ); - - Ok(()) - } - - fn get_random_user_id(pool: &ConnectionPool) -> Result, DbError> { - let mut conn = pool - .get() - .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - - conn.prepare("get_random_user", QUERY_GET_RANDOM_USER)?; - let rows = conn.execute("get_random_user", &[])?; - - Ok(rows - .first() - .and_then(|r| r.get("id")) - .and_then(|v| v.parse::().ok())) - } - - fn apply_personal_effects( - conn: &mut DbConnection, - pool: &ConnectionPool, - broker: &MessageBroker, - user_id: i32, - event: &RandomEvent, - rng: &mut impl Rng, - ) -> Result, DbError> { + // Wende Effekte an let mut effect_results = Vec::new(); - for effect in &event.effects { let effect_roll = rng.gen_range(0.0..=1.0); - if let Some(result) = Self::apply_single_personal_effect(conn, pool, broker, user_id, effect, effect_roll, rng)? 
{ - effect_results.extend(result); + match effect { + EventEffect::MoneyChange { + probability, + min_percent, + max_percent, + } => { + if effect_roll < *probability { + let percent_change = rng.gen_range(*min_percent..=*max_percent); + if let Ok(absolute_change) = Self::apply_money_change(&mut conn, user_id, percent_change) { + effect_results.push(json!({ + "type": "money_change", + "percent": percent_change, + "absolute": absolute_change + })); + } + } + } + EventEffect::StorageCapacityChange { + probability, + min_percent, + max_percent, + } => { + if effect_roll < *probability { + let percent_change = rng.gen_range(*min_percent..=*max_percent); + Self::apply_storage_capacity_change(&mut conn, user_id, percent_change)?; + effect_results.push(json!({ + "type": "storage_capacity_change", + "percent": percent_change + })); + } + } + EventEffect::CharacterHealthChange { + probability, + min_change, + max_change, + } => { + if effect_roll < *probability + && let Ok((character_id, health_change)) = Self::apply_character_health_change( + &mut conn, + user_id, + *min_change, + *max_change, + rng, + ) + { + effect_results.push(json!({ + "type": "character_health_change", + "character_id": character_id, + "change": health_change + })); + } + } + EventEffect::CharacterDeath { probability } => { + if effect_roll < *probability + && let Ok(character_id) = Self::apply_character_death(&mut conn, user_id, pool, broker) + { + effect_results.push(json!({ + "type": "character_death", + "character_id": character_id + })); + } + } + EventEffect::StorageDamage { + probability, + stock_type_label, + inventory_damage_min_percent, + inventory_damage_max_percent, + storage_destruction_min_percent, + storage_destruction_max_percent, + } => { + if effect_roll < *probability + && let Ok(damage_info) = Self::apply_personal_storage_damage( + &mut conn, + PersonalStorageDamageParams { + user_id, + stock_type_label, + inventory_damage_min_percent: *inventory_damage_min_percent, + inventory_damage_max_percent: *inventory_damage_max_percent, + storage_destruction_min_percent: *storage_destruction_min_percent, + storage_destruction_max_percent: *storage_destruction_max_percent, + }, + rng, + ) + { + effect_results.push(json!({ + "type": "storage_damage", + "stock_type": stock_type_label, + "inventory_damage_percent": damage_info.inventory_damage_percent, + "storage_destruction_percent": damage_info.storage_destruction_percent, + "affected_stocks": damage_info.affected_stocks, + "destroyed_stocks": damage_info.destroyed_stocks, + })); + } + } + _ => { + eprintln!( + "[EventsWorker] Effekt {:?} wird für persönliche Ereignisse noch nicht unterstützt", + effect + ); + } } } - Ok(effect_results) - } - - fn apply_single_personal_effect( - conn: &mut DbConnection, - pool: &ConnectionPool, - broker: &MessageBroker, - user_id: i32, - effect: &EventEffect, - effect_roll: f64, - rng: &mut impl Rng, - ) -> Result>, DbError> { - match effect { - EventEffect::MoneyChange { probability, min_percent, max_percent } => { - Self::handle_money_change_effect(conn, user_id, effect_roll, *probability, *min_percent, *max_percent, rng) - } - EventEffect::StorageCapacityChange { probability, min_percent, max_percent } => { - Self::handle_storage_capacity_effect(conn, user_id, effect_roll, *probability, *min_percent, *max_percent, rng) - } - EventEffect::CharacterHealthChange { probability, min_change, max_change } => { - Self::handle_character_health_effect(conn, pool, broker, user_id, effect_roll, *probability, *min_change, *max_change, rng) - } 
- EventEffect::CharacterDeath { probability } => { - Self::handle_character_death_effect(conn, pool, broker, user_id, effect_roll, *probability) - } - EventEffect::StorageDamage { probability, stock_type_label, inventory_damage_min_percent, inventory_damage_max_percent, storage_destruction_min_percent, storage_destruction_max_percent } => { - Self::handle_storage_damage_effect(conn, user_id, effect_roll, *probability, stock_type_label, *inventory_damage_min_percent, *inventory_damage_max_percent, *storage_destruction_min_percent, *storage_destruction_max_percent, rng) - } - _ => { - eprintln!("[EventsWorker] Effekt {:?} wird für persönliche Ereignisse noch nicht unterstützt", effect); - Ok(None) - } - } - } - - fn handle_money_change_effect( - conn: &mut DbConnection, - user_id: i32, - effect_roll: f64, - probability: f64, - min_percent: f64, - max_percent: f64, - rng: &mut impl Rng, - ) -> Result>, DbError> { - if effect_roll >= probability { - return Ok(None); - } - let percent_change = rng.gen_range(min_percent..=max_percent); - if let Ok(absolute_change) = Self::apply_money_change(conn, user_id, percent_change) { - Ok(Some(vec![json!({ - "type": "money_change", - "percent": percent_change, - "absolute": absolute_change - })])) - } else { - Ok(None) - } - } - - fn handle_storage_capacity_effect( - conn: &mut DbConnection, - user_id: i32, - effect_roll: f64, - probability: f64, - min_percent: f64, - max_percent: f64, - rng: &mut impl Rng, - ) -> Result>, DbError> { - if effect_roll >= probability { - return Ok(None); - } - let percent_change = rng.gen_range(min_percent..=max_percent); - Self::apply_storage_capacity_change(conn, user_id, percent_change)?; - Ok(Some(vec![json!({ - "type": "storage_capacity_change", - "percent": percent_change - })])) - } - - fn handle_character_health_effect( - conn: &mut DbConnection, - pool: &ConnectionPool, - broker: &MessageBroker, - user_id: i32, - effect_roll: f64, - probability: f64, - min_change: i32, - max_change: i32, - rng: &mut impl Rng, - ) -> Result>, DbError> { - if effect_roll >= probability { - return Ok(None); - } - let Ok((character_id, health_change, died, first_name, last_name)) = - Self::apply_character_health_change(conn, pool, broker, user_id, min_change, max_change, rng) - else { - return Ok(None); - }; - - let mut results = vec![json!({ - "type": "character_health_change", - "character_id": character_id, - "change": health_change, - "character_first_name": first_name, - "character_last_name": last_name - })]; - - if died { - results.push(json!({ - "type": "character_death", - "character_id": character_id, - "character_first_name": first_name, - "character_last_name": last_name - })); - } - - Ok(Some(results)) - } - - fn handle_character_death_effect( - conn: &mut DbConnection, - pool: &ConnectionPool, - broker: &MessageBroker, - user_id: i32, - effect_roll: f64, - probability: f64, - ) -> Result>, DbError> { - if effect_roll >= probability { - return Ok(None); - } - let Ok((character_id, first_name, last_name)) = Self::apply_character_death(conn, user_id, pool, broker) else { - return Ok(None); - }; - Ok(Some(vec![json!({ - "type": "character_death", - "character_id": character_id, - "character_first_name": first_name, - "character_last_name": last_name - })])) - } - - fn handle_storage_damage_effect( - conn: &mut DbConnection, - user_id: i32, - effect_roll: f64, - probability: f64, - stock_type_label: &str, - inventory_damage_min_percent: f64, - inventory_damage_max_percent: f64, - storage_destruction_min_percent: f64, - 
storage_destruction_max_percent: f64, - rng: &mut impl Rng, - ) -> Result>, DbError> { - if effect_roll >= probability { - return Ok(None); - } - let Ok(damage_info) = Self::apply_personal_storage_damage( - conn, - PersonalStorageDamageParams { - user_id, - stock_type_label, - inventory_damage_min_percent, - inventory_damage_max_percent, - storage_destruction_min_percent, - storage_destruction_max_percent, - }, - rng, - ) else { - return Ok(None); - }; - Ok(Some(vec![json!({ - "type": "storage_damage", - "stock_type": stock_type_label, - "inventory_damage_percent": damage_info.inventory_damage_percent, - "storage_destruction_percent": damage_info.storage_destruction_percent, - "affected_stocks": damage_info.affected_stocks, - "destroyed_stocks": damage_info.destroyed_stocks, - })])) - } - - fn extract_character_info_from_effects(effect_results: &[serde_json::Value]) -> CharacterInfo { - let character_id = effect_results.iter().find_map(|eff| { + // Schreibe Benachrichtigung in die Datenbank mit Event-Details + // If any effect contains a character_id, include it at top-level for the notification + let top_character_id = effect_results.iter().find_map(|eff| { eff.get("character_id").and_then(|v| v.as_i64()).map(|n| n as i32) }); - let (first_name, last_name) = if let Some(cid) = character_id { - let eff = effect_results.iter().find(|e| { - e.get("character_id") - .and_then(|v| v.as_i64()) - .map(|n| n as i32) - == Some(cid) - }); - let fn_ = eff - .and_then(|e| e.get("character_first_name")) - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - let ln_ = eff - .and_then(|e| e.get("character_last_name")) - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - (fn_, ln_) - } else { - (None, None) - }; - - CharacterInfo { character_id, first_name, last_name } - } - - fn send_personal_event_notifications( - pool: &ConnectionPool, - broker: &MessageBroker, - user_id: i32, - event: &RandomEvent, - effect_results: &[serde_json::Value], - char_info: &CharacterInfo, - ) -> Result<(), DbError> { - // DB-Benachrichtigung let mut notification_json = serde_json::json!({ "tr": format!("random_event.{}", event.id), "event_id": event.id, @@ -881,20 +621,14 @@ impl EventsWorker { "effects": effect_results }); - if let Some(cid) = char_info.character_id { + if let Some(cid) = top_character_id { notification_json["character_id"] = serde_json::json!(cid); } - if let Some(ref fn_) = char_info.first_name { - notification_json["character_first_name"] = serde_json::json!(fn_); - } - if let Some(ref ln_) = char_info.last_name { - notification_json["character_last_name"] = serde_json::json!(ln_); - } - Self::notify_user(pool, broker, user_id, ¬ification_json.to_string(), char_info.character_id)?; + Self::notify_user(pool, broker, user_id, ¬ification_json.to_string())?; - // WebSocket-Benachrichtigung - let mut ws_notification = json!({ + // Sende Benachrichtigung über WebSocket + let mut notification = json!({ "event": "random_event", "event_id": event.id, "event_type": "personal", @@ -904,11 +638,16 @@ impl EventsWorker { "effects": effect_results }); - if let Some(cid) = char_info.character_id { - ws_notification["character_id"] = json!(cid); + if let Some(cid) = top_character_id { + notification["character_id"] = json!(cid); } - broker.publish(ws_notification.to_string()); + broker.publish(notification.to_string()); + eprintln!( + "[EventsWorker] Persönliches Ereignis '{}' für Spieler {} verarbeitet", + event.id, user_id + ); + Ok(()) } @@ -953,9 +692,6 @@ impl EventsWorker { } }; - let first_name = 
rows.first().and_then(|r| r.get("first_name")).cloned(); - let last_name = rows.first().and_then(|r| r.get("last_name")).cloned(); - // Wende Effekte an (in diesem Fall nur CharacterDeath) let mut effect_results = Vec::new(); for effect in &event.effects { @@ -967,9 +703,7 @@ impl EventsWorker { { effect_results.push(json!({ "type": "character_death", - "character_id": character_id, - "character_first_name": first_name, - "character_last_name": last_name + "character_id": character_id })); } } @@ -985,25 +719,12 @@ impl EventsWorker { // Schreibe Benachrichtigung in die Datenbank mit Event-Details let notification_json = serde_json::json!({ "tr": format!("random_event.{}", event.id), - "value": { - "event_id": event.id, - "event_type": "personal", - "title": event.title, - "description": event.description, - "character_id": character_id, - "character_first_name": first_name, - "character_last_name": last_name, - "effects": effect_results - } + "event_id": event.id, + "event_type": "personal", + "character_id": character_id, + "effects": effect_results }); - // Falls das Kind inzwischen gelöscht wurde, halten wir Name + ID im Payload fest. - Self::notify_user( - pool, - broker, - user_id, - ¬ification_json.to_string(), - Some(character_id), - )?; + Self::notify_user(pool, broker, user_id, ¬ification_json.to_string())?; // Sende Benachrichtigung über WebSocket let notification = json!({ @@ -1113,27 +834,32 @@ impl EventsWorker { min_change, max_change, } => { - // WICHTIG: Bei regionalen Ereignissen ist `probability` als Anteil der - // betroffenen Charaktere gemeint (z.B. 0.8 = 80% betroffen), - // nicht als "Chance, dass der Effekt global auf ALLE greift". - if let Ok((affected_characters, dead_characters)) = - Self::apply_regional_character_health_change( + if effect_roll < *probability { + // Für regionale Ereignisse: Betrifft alle Charaktere in der Region + if let Ok(affected_characters) = Self::apply_regional_character_health_change( &mut conn, - pool, - broker, region_id, - *probability, *min_change, *max_change, rng, - ) - { - effect_results.push(json!({ - "type": "character_health_change", - "affected_count": affected_characters.len(), - "characters": affected_characters - })); - if !dead_characters.is_empty() { + ) { + effect_results.push(json!({ + "type": "character_health_change", + "affected_count": affected_characters.len(), + "characters": affected_characters + })); + } + } + } + EventEffect::CharacterDeath { probability } => { + if effect_roll < *probability { + // Für regionale Ereignisse: Betrifft alle Charaktere in der Region + if let Ok(dead_characters) = Self::apply_regional_character_death( + &mut conn, + region_id, + pool, + broker, + ) { effect_results.push(json!({ "type": "character_death", "dead_count": dead_characters.len(), @@ -1142,25 +868,6 @@ impl EventsWorker { } } } - EventEffect::CharacterDeath { probability } => { - // WICHTIG: Bei regionalen Ereignissen ist `probability` als - // "Chance pro Charakter zu sterben" gemeint (z.B. 0.1 = 10%), - // nicht als "10% Chance, dass alle Charaktere der Region sterben". 
- if let Ok(dead_characters) = Self::apply_regional_character_death( - &mut conn, - region_id, - *probability, - pool, - broker, - rng, - ) { - effect_results.push(json!({ - "type": "character_death", - "dead_count": dead_characters.len(), - "characters": dead_characters - })); - } - } EventEffect::StorageDamage { probability, stock_type_label, @@ -1259,18 +966,12 @@ impl EventsWorker { // Schreibe Benachrichtigung in die Datenbank mit Event-Details let notification_json = serde_json::json!({ "tr": format!("random_event.{}", event.id), - "value": { - "event_id": event.id, - "event_type": "regional", - "title": event.title, - "description": event.description, - "region_id": region_id, - "effects": effect_results - } + "event_id": event.id, + "event_type": "regional", + "region_id": region_id, + "effects": effect_results }); - if let Err(err) = - Self::notify_user(pool, broker, uid, ¬ification_json.to_string(), None) - { + if let Err(err) = Self::notify_user(pool, broker, uid, ¬ification_json.to_string()) { eprintln!("[EventsWorker] Fehler beim Schreiben der Benachrichtigung für User {}: {}", uid, err); } @@ -1324,12 +1025,7 @@ impl EventsWorker { }; let change = current_money * (percent_change / 100.0); - // Action-String für money history: Unterscheide zwischen positiven (Geldsegen) und negativen (Diebstahl) Änderungen - let action = if percent_change > 0.0 { - "Zufallsereignis: Unerwarteter Geldsegen" - } else { - "Zufallsereignis: Diebstahl" - }; + let action = format!("Zufallsereignis: Geldänderung {:.2}%", percent_change); // Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection conn.prepare("update_money_event", QUERY_UPDATE_MONEY)?; @@ -1573,13 +1269,11 @@ impl EventsWorker { fn apply_character_health_change( conn: &mut DbConnection, - pool: &ConnectionPool, - broker: &MessageBroker, user_id: i32, min_change: i32, max_change: i32, rng: &mut impl Rng, - ) -> Result<(i32, i32, bool, Option, Option), DbError> { + ) -> Result<(i32, i32), DbError> { // Hole einen zufälligen Charakter des Spielers conn.prepare("get_random_character", QUERY_GET_RANDOM_CHARACTER)?; let rows = conn.execute("get_random_character", &[&user_id])?; @@ -1603,10 +1297,6 @@ impl EventsWorker { .and_then(|v| v.parse::().ok()) .unwrap_or(100); - // Namen-Snapshot (wichtig, falls der Charakter durch das Event stirbt und gelöscht wird) - let first_name = rows.first().and_then(|r| r.get("first_name")).cloned(); - let last_name = rows.first().and_then(|r| r.get("last_name")).cloned(); - let health_change = rng.gen_range(min_change..=max_change); let new_health = (current_health + health_change).clamp(0, 100); @@ -1614,15 +1304,7 @@ impl EventsWorker { conn.prepare("update_health", QUERY_UPDATE_HEALTH)?; conn.execute("update_health", &[&new_health, &character_id])?; - // Wenn die Gesundheit 0 erreicht, muss der Charakter „sterben“ (Cleanup + Löschen). - // Sonst bleibt er als health=0 „hängen“, weil spätere Queries meist health>0 filtern. - let died = new_health <= 0; - if died { - // Best-effort: Death-Handling kann fehlschlagen (FK/DB), dann loggt es selbst. 
- let _ = Self::handle_character_death(pool, broker, character_id); - } - - Ok((character_id, health_change, died, first_name, last_name)) + Ok((character_id, health_change)) } fn apply_character_death( @@ -1630,7 +1312,7 @@ impl EventsWorker { user_id: i32, pool: &ConnectionPool, broker: &MessageBroker, - ) -> Result<(i32, Option, Option), DbError> { + ) -> Result { // Hole einen zufälligen Charakter des Spielers conn.prepare("get_random_character_death", QUERY_GET_RANDOM_CHARACTER)?; let rows = conn.execute("get_random_character_death", &[&user_id])?; @@ -1648,39 +1330,27 @@ impl EventsWorker { } }; - let first_name = rows.first().and_then(|r| r.get("first_name")).cloned(); - let last_name = rows.first().and_then(|r| r.get("last_name")).cloned(); - // Verwende die existierende Logik zum Löschen von Charakteren // (ähnlich wie in CharacterCreationWorker) Self::handle_character_death(pool, broker, character_id)?; - Ok((character_id, first_name, last_name)) + Ok(character_id) } fn apply_regional_character_health_change( conn: &mut DbConnection, - pool: &ConnectionPool, - broker: &MessageBroker, region_id: i32, - affected_probability: f64, min_change: i32, max_change: i32, rng: &mut impl Rng, - ) -> Result<(Vec<(i32, i32)>, Vec), DbError> { + ) -> Result, DbError> { // Hole alle lebenden Charaktere in der Region conn.prepare("get_region_characters", QUERY_GET_REGION_CHARACTERS)?; let rows = conn.execute("get_region_characters", &[®ion_id])?; let mut affected_characters = Vec::new(); - let mut dead_characters = Vec::new(); for row in rows { - // Nur ein Teil der Charaktere soll betroffen sein (z.B. 80% bei Epidemie). - if rng.gen_range(0.0..=1.0) >= affected_probability { - continue; - } - let character_id: Option = row .get("id") .and_then(|v| v.parse::().ok()); @@ -1703,24 +1373,16 @@ impl EventsWorker { conn.execute("update_health_regional", &[&new_health, &character_id])?; affected_characters.push((character_id, health_change)); - - if new_health <= 0 { - if Self::handle_character_death(pool, broker, character_id).is_ok() { - dead_characters.push(character_id); - } - } } - Ok((affected_characters, dead_characters)) + Ok(affected_characters) } fn apply_regional_character_death( conn: &mut DbConnection, region_id: i32, - death_probability: f64, pool: &ConnectionPool, broker: &MessageBroker, - rng: &mut impl Rng, ) -> Result, DbError> { // Hole alle lebenden Charaktere in der Region conn.prepare("get_region_characters_death", QUERY_GET_REGION_CHARACTERS)?; @@ -1729,11 +1391,6 @@ impl EventsWorker { let mut dead_characters = Vec::new(); for row in rows { - // Chance pro Charakter (z.B. 10% bei Epidemie), nicht global. 
- if rng.gen_range(0.0..=1.0) >= death_probability { - continue; - } - let character_id: Option = row .get("id") .and_then(|v| v.parse::().ok()); @@ -1771,54 +1428,19 @@ impl EventsWorker { .get("employer_user_id") .and_then(|v| v.parse::().ok()) { - Self::notify_user(pool, broker, user_id, "director_death", None)?; + Self::notify_user(pool, broker, user_id, "director_death")?; } } // 2) Relationships löschen und betroffene User benachrichtigen conn.prepare("delete_relationship", QUERY_DELETE_RELATIONSHIP)?; let rel_result = conn.execute("delete_relationship", &[&character_id])?; - - // Logging: Anzahl gelöschter Relationships - let deleted_count = rel_result.len(); - if deleted_count > 0 { - eprintln!( - "[EventsWorker] {} Relationship(s) gelöscht für character_id={}", - deleted_count, character_id - ); - } - for row in rel_result { - let related_user_id = row + if let Some(related_user_id) = row .get("related_user_id") - .and_then(|v| v.parse::().ok()); - let related_character_id = row - .get("related_character_id") - .and_then(|v| v.parse::().ok()); - let relationship_type_tr = row - .get("relationship_type_tr") - .map(|s| s.to_string()); - - // Logging: Relationship wurde gelöscht - eprintln!( - "[EventsWorker] Relationship gelöscht: character_id={}, related_character_id={:?}, related_user_id={:?}, relationship_type={:?}", - character_id, - related_character_id, - related_user_id, - relationship_type_tr - ); - - if let Some(uid) = related_user_id { - // Spezielle Notification für Verlobungen - if relationship_type_tr.as_deref() == Some("engaged") { - let notification_json = serde_json::json!({ - "tr": "relationship.engaged_character_death", - "character_id": related_character_id - }); - Self::notify_user(pool, broker, uid, ¬ification_json.to_string(), related_character_id)?; - } else { - Self::notify_user(pool, broker, uid, "relationship_death", None)?; - } + .and_then(|v| v.parse::().ok()) + { + Self::notify_user(pool, broker, related_user_id, "relationship_death")?; } } @@ -1846,13 +1468,13 @@ impl EventsWorker { .get("father_user_id") .and_then(|v| v.parse::().ok()) { - Self::notify_user(pool, broker, father_user_id, "child_death", None)?; + Self::notify_user(pool, broker, father_user_id, "child_death")?; } if let Some(mother_user_id) = row .get("mother_user_id") .and_then(|v| v.parse::().ok()) { - Self::notify_user(pool, broker, mother_user_id, "child_death", None)?; + Self::notify_user(pool, broker, mother_user_id, "child_death")?; } } @@ -1879,51 +1501,23 @@ impl EventsWorker { .and_then(|r| r.get("child_character_id")) .and_then(|v| v.parse::().ok()); - // Kein Kind als Erbe vorhanden? Dann fallback: zufälliger NPC-Character aus der Region, - // Alter 10–14 Tage. let heir_id = match heir_id { Some(id) if id > 0 => id, _ => { - conn.prepare("random_heir_region", QUERY_GET_RANDOM_HEIR_FROM_REGION)?; - let rows = conn.execute("random_heir_region", &[&deceased_character_id])?; - rows.first() - .and_then(|r| r.get("child_character_id")) - .and_then(|v| v.parse::().ok()) - .unwrap_or(0) + // Kein Erbe gefunden - Vermögen geht verloren + eprintln!( + "[EventsWorker] Kein Erbe für Charakter {} gefunden, Vermögen geht verloren", + deceased_character_id + ); + return Ok(()); } }; - if heir_id <= 0 { - eprintln!( - "[EventsWorker] Kein Erbe für Charakter {} gefunden (weder Kind noch Fallback in Region). User {} hat danach keinen Character.", - deceased_character_id, falukant_user_id - ); - return Ok(()); - } - - // 2) Wichtig: erst die alte User-Zuordnung am verstorbenen Charakter lösen. 
- // Falls es einen Unique-Constraint auf `character.user_id` gibt, würde das - // direkte Setzen am Erben sonst fehlschlagen. - conn.prepare("clear_character_user", QUERY_CLEAR_CHARACTER_USER)?; - if let Err(err) = conn.execute("clear_character_user", &[&deceased_character_id]) { - eprintln!( - "[EventsWorker] Erben-Logik: Konnte user_id vom verstorbenen Charakter {} nicht lösen: {}", - deceased_character_id, err - ); - } - - // 3) Setze den Erben als neuen Spieler-Charakter + // 2) Setze den Erben als neuen Spieler-Charakter conn.prepare("set_character_user", QUERY_SET_CHARACTER_USER)?; - if let Err(err) = conn.execute("set_character_user", &[&falukant_user_id, &heir_id]) { - eprintln!( - "[EventsWorker] Erben-Logik: Konnte user_id={} nicht auf Erben {} setzen (verstorbener Charakter {}): {}", - falukant_user_id, heir_id, deceased_character_id, err - ); - // Abbrechen, damit wir nicht still ohne Character weiterlaufen. - return Err(err); - } + conn.execute("set_character_user", &[&falukant_user_id, &heir_id])?; - // 4) Berechne das neue Vermögen basierend auf dem gesamten Vermögen + // 3) Berechne das neue Vermögen basierend auf dem gesamten Vermögen // Hole alle Vermögenswerte (analog zu UserCharacterWorker::calculate_new_money) conn.prepare("get_current_money", QUERY_GET_CURRENT_MONEY)?; conn.prepare("get_house_value", QUERY_GET_HOUSE_VALUE)?; @@ -2015,13 +1609,18 @@ impl EventsWorker { broker: &MessageBroker, user_id: i32, event_type: &str, - character_id: Option, ) -> Result<(), DbError> { - // DB-Notification (zentralisiert). Historisch wird hier als `tr` der event_type-String gespeichert. - insert_notification(pool, user_id, event_type, character_id)?; + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - // Frontend-Update (zentralisiert) - publish_update_status(broker, user_id); + conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?; + conn.execute("insert_notification", &[&user_id, &event_type])?; + + // falukantUpdateStatus + let update_message = + format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); + broker.publish(update_message); // ursprüngliche Benachrichtigung let message = diff --git a/src/worker/house.rs b/src/worker/house.rs index 988646f..28ec5ab 100644 --- a/src/worker/house.rs +++ b/src/worker/house.rs @@ -10,10 +10,7 @@ use crate::worker::sql::{ QUERY_ADD_NEW_BUYABLE_HOUSE, QUERY_UPDATE_BUYABLE_HOUSE_STATE, QUERY_UPDATE_USER_HOUSE_STATE, - QUERY_DELETE_COLLAPSED_BUYABLE_HOUSES, - QUERY_DELETE_COLLAPSED_USER_HOUSES, }; -use crate::worker::{insert_notification_conn, publish_update_status}; pub struct HouseWorker { base: BaseWorker, @@ -26,7 +23,7 @@ impl HouseWorker { } } - fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { + fn run_loop(pool: ConnectionPool, _broker: MessageBroker, state: Arc) { let mut last_hourly_run: Option = None; let mut last_daily_run: Option = None; @@ -43,9 +40,6 @@ impl HouseWorker { if let Err(err) = Self::perform_task_inner(&pool) { eprintln!("[HouseWorker] Fehler in performTask: {err}"); } - if let Err(err) = Self::cleanup_collapsed_houses(&pool, &broker) { - eprintln!("[HouseWorker] Fehler in cleanup_collapsed_houses: {err}"); - } last_hourly_run = Some(now); } @@ -59,10 +53,6 @@ impl HouseWorker { if let Err(err) = Self::perform_house_state_change_inner(&pool) { eprintln!("[HouseWorker] Fehler in performHouseStateChange: {err}"); } - // Nach der täglichen Verschlechterung direkt zusammengefallene Häuser aufräumen. 
- if let Err(err) = Self::cleanup_collapsed_houses(&pool, &broker) { - eprintln!("[HouseWorker] Fehler in cleanup_collapsed_houses: {err}"); - } last_daily_run = Some(now); } @@ -111,57 +101,6 @@ impl HouseWorker { Ok(()) } - - fn cleanup_collapsed_houses(pool: &ConnectionPool, broker: &MessageBroker) -> Result<(), DbError> { - let mut conn = pool - .get() - .map_err(|e| DbError::new(format!("[HouseWorker] DB-Verbindung fehlgeschlagen: {e}")))?; - - conn.prepare( - "delete_collapsed_buyable_houses", - QUERY_DELETE_COLLAPSED_BUYABLE_HOUSES, - )?; - conn.prepare( - "delete_collapsed_user_houses", - QUERY_DELETE_COLLAPSED_USER_HOUSES, - )?; - - // 1) Kaufbare Häuser (ohne Besitzer) löschen - let deleted_buyable = conn.execute("delete_collapsed_buyable_houses", &[])?; - if !deleted_buyable.is_empty() { - eprintln!( - "[HouseWorker] {} kaufbare(s) Haus/Häuser zusammengefallen und entfernt", - deleted_buyable.len() - ); - } - - // 2) Häuser im Besitz löschen + Owner informieren - let deleted_user_houses = conn.execute("delete_collapsed_user_houses", &[])?; - if deleted_user_houses.is_empty() { - return Ok(()); - } - - for row in deleted_user_houses { - let house_id = row - .get("house_id") - .and_then(|v| v.parse::().ok()) - .unwrap_or(-1); - let user_id = row - .get("user_id") - .and_then(|v| v.parse::().ok()) - .unwrap_or(-1); - if house_id < 0 || user_id < 0 { - continue; - } - - // Notification + Frontend-Update - let tr = format!(r#"{{"tr":"house.collapsed","house_id":{}}}"#, house_id); - let _ = insert_notification_conn(&mut conn, user_id, &tr, None); - publish_update_status(broker, user_id); - } - - Ok(()) - } } impl Worker for HouseWorker { diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 76a02ae..317d6ae 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -12,7 +12,6 @@ mod transport; mod weather; mod events; mod sql; -mod notify; pub use base::Worker; pub use crate::db::ConnectionPool; @@ -28,5 +27,4 @@ pub use user_character::UserCharacterWorker; pub use transport::TransportWorker; pub use weather::WeatherWorker; pub use events::EventsWorker; -pub use notify::{insert_notification, insert_notification_conn, publish_update_status}; diff --git a/src/worker/politics.rs b/src/worker/politics.rs index 5a014c1..75934b9 100644 --- a/src/worker/politics.rs +++ b/src/worker/politics.rs @@ -4,6 +4,7 @@ use std::collections::HashSet; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; +use chrono::{Local, Timelike}; use super::base::{BaseWorker, Worker, WorkerState}; use crate::worker::sql::{ @@ -22,6 +23,15 @@ use crate::worker::sql::{ QUERY_GET_USERS_WITH_FILLED_OFFICES, QUERY_PROCESS_ELECTIONS, QUERY_TRIM_EXCESS_OFFICES_GLOBAL, + QUERY_FIND_AVAILABLE_CHURCH_OFFICES, + QUERY_FIND_CHURCH_SUPERVISOR, + QUERY_GET_CHURCH_OFFICE_REQUIREMENTS, + QUERY_GET_PENDING_CHURCH_APPLICATIONS, + QUERY_CHECK_CHARACTER_ELIGIBILITY, + QUERY_APPROVE_CHURCH_APPLICATION, + QUERY_REJECT_CHURCH_APPLICATION, + QUERY_CREATE_CHURCH_APPLICATION_JOB, + QUERY_GET_CHARACTERS_FOR_CHURCH_OFFICE, }; pub struct PoliticsWorker { @@ -57,6 +67,32 @@ struct Office { region_id: i32, } +#[derive(Debug, Clone)] +struct AvailableChurchOffice { + office_type_id: i32, + seats_per_region: i32, + region_id: i32, + occupied_seats: i32, +} + +#[derive(Debug, Clone)] +struct ChurchSupervisor { + supervisor_character_id: i32, +} + +#[derive(Debug, Clone)] +struct ChurchOfficeRequirement { + prerequisite_office_type_id: Option, + min_title_level: Option, +} + +#[derive(Debug, Clone)] +struct 
ChurchApplication { + application_id: i32, + office_type_id: i32, + applicant_character_id: i32, +} + // --- SQL-Konstanten (1:1 aus politics_worker.h übernommen) ------------------ @@ -69,6 +105,7 @@ impl PoliticsWorker { fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { let mut last_execution: Option = None; + let mut last_church_office_run: Option = None; while state.running_worker.load(Ordering::Relaxed) { let now = Instant::now(); @@ -84,6 +121,25 @@ impl PoliticsWorker { last_execution = Some(now); } + // Church Office Job um 13 Uhr + if Self::is_time_13_00() { + let should_run_church = match last_church_office_run { + None => true, + Some(prev) => { + // Prüfe ob seit letztem Lauf mindestens 23 Stunden vergangen sind + // (um sicherzustellen, dass es nur einmal pro Tag läuft) + now.saturating_duration_since(prev) >= Duration::from_secs(23 * 3600) + } + }; + + if should_run_church { + if let Err(err) = Self::perform_church_office_task(&pool, &broker) { + eprintln!("[PoliticsWorker] Fehler bei performChurchOfficeTask: {err}"); + } + last_church_office_run = Some(now); + } + } + // Entspricht ungefähr der 5-Sekunden-Schleife im C++-Code for _ in 0..5 { if !state.running_worker.load(Ordering::Relaxed) { @@ -94,6 +150,14 @@ impl PoliticsWorker { } } + /// Prüft ob die aktuelle Uhrzeit 13:00 ist (mit Toleranz von ±1 Minute) + fn is_time_13_00() -> bool { + let now = Local::now(); + let hour = now.hour(); + let minute = now.minute(); + hour == 13 && minute <= 1 + } + fn perform_daily_politics_task( pool: &ConnectionPool, broker: &MessageBroker, @@ -553,6 +617,415 @@ impl PoliticsWorker { .filter_map(map_row_to_office) .collect()) } + + /// Verarbeitet Church Office Jobs um 13 Uhr: + /// - Findet verfügbare Positionen + /// - Verarbeitet bestehende Bewerbungen + /// - Erstellt neue Bewerbungen falls nötig + fn perform_church_office_task( + pool: &ConnectionPool, + broker: &MessageBroker, + ) -> Result<(), DbError> { + eprintln!("[PoliticsWorker] Starte Church Office Task um 13 Uhr"); + + // 1) Verfügbare Church Office Positionen finden + let available_offices = Self::find_available_church_offices(pool)?; + eprintln!( + "[PoliticsWorker] Gefunden: {} verfügbare Church Office Positionen", + available_offices.len() + ); + + // 2) Für jede verfügbare Position Bewerbungen verarbeiten + for office in &available_offices { + // Supervisor finden + if let Some(supervisor) = Self::find_church_supervisor(pool, office.region_id, office.office_type_id)? 
{ + // Bestehende Bewerbungen für diesen Supervisor verarbeiten + Self::process_church_applications(pool, broker, supervisor.supervisor_character_id)?; + + // Falls noch Plätze frei sind, neue Bewerbungen erstellen + let remaining_seats = office.seats_per_region - office.occupied_seats; + if remaining_seats > 0 { + Self::create_church_application_jobs( + pool, + office.office_type_id, + office.region_id, + supervisor.supervisor_character_id, + remaining_seats, + )?; + } + } else { + eprintln!( + "[PoliticsWorker] Kein Supervisor gefunden für office_type_id={}, region_id={}", + office.office_type_id, office.region_id + ); + } + } + + eprintln!("[PoliticsWorker] Church Office Task abgeschlossen"); + Ok(()) + } + + fn find_available_church_offices( + pool: &ConnectionPool, + ) -> Result, DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + conn.prepare( + "find_available_church_offices", + QUERY_FIND_AVAILABLE_CHURCH_OFFICES, + ) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] prepare find_available_church_offices: {e}" + )) + })?; + let rows = conn + .execute("find_available_church_offices", &[]) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec find_available_church_offices: {e}" + )) + })?; + + let mut offices = Vec::new(); + for row in rows { + let office_type_id = parse_i32(&row, "office_type_id", -1); + let region_id = parse_i32(&row, "region_id", -1); + let seats_per_region = parse_i32(&row, "seats_per_region", 0); + let occupied_seats = parse_i32(&row, "occupied_seats", 0); + + if office_type_id >= 0 && region_id >= 0 { + offices.push(AvailableChurchOffice { + office_type_id, + seats_per_region, + region_id, + occupied_seats, + }); + } + } + + Ok(offices) + } + + fn find_church_supervisor( + pool: &ConnectionPool, + region_id: i32, + office_type_id: i32, + ) -> Result, DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + conn.prepare("find_church_supervisor", QUERY_FIND_CHURCH_SUPERVISOR) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] prepare find_church_supervisor: {e}" + )) + })?; + let rows = conn + .execute("find_church_supervisor", &[®ion_id, &office_type_id]) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec find_church_supervisor: {e}" + )) + })?; + + for row in rows { + let supervisor_character_id = parse_i32(&row, "supervisor_character_id", -1); + + if supervisor_character_id >= 0 { + return Ok(Some(ChurchSupervisor { + supervisor_character_id, + })); + } + } + + Ok(None) + } + + fn process_church_applications( + pool: &ConnectionPool, + broker: &MessageBroker, + supervisor_id: i32, + ) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + // Bewerbungen für diesen Supervisor abrufen + conn.prepare( + "get_pending_church_applications", + QUERY_GET_PENDING_CHURCH_APPLICATIONS, + ) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] prepare get_pending_church_applications: {e}" + )) + })?; + let rows = conn + .execute("get_pending_church_applications", &[&supervisor_id]) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec get_pending_church_applications: {e}" + )) + })?; + + let mut applications = Vec::new(); + for row in rows { + let application_id = parse_i32(&row, "application_id", -1); + let office_type_id = parse_i32(&row, "office_type_id", -1); + let applicant_character_id 
= parse_i32(&row, "applicant_character_id", -1); + + if application_id >= 0 { + applications.push(ChurchApplication { + application_id, + office_type_id, + applicant_character_id, + }); + } + } + + // Voraussetzungen prüfen und Bewerbungen verarbeiten + conn.prepare( + "get_church_office_requirements", + QUERY_GET_CHURCH_OFFICE_REQUIREMENTS, + ) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] prepare get_church_office_requirements: {e}" + )) + })?; + + conn.prepare( + "check_character_eligibility", + QUERY_CHECK_CHARACTER_ELIGIBILITY, + ) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] prepare check_character_eligibility: {e}" + )) + })?; + + conn.prepare("approve_church_application", QUERY_APPROVE_CHURCH_APPLICATION) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] prepare approve_church_application: {e}" + )) + })?; + + conn.prepare("reject_church_application", QUERY_REJECT_CHURCH_APPLICATION) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] prepare reject_church_application: {e}" + )) + })?; + + for app in &applications { + // Voraussetzungen für dieses Amt abrufen + let req_rows = conn + .execute("get_church_office_requirements", &[&app.office_type_id]) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec get_church_office_requirements: {e}" + )) + })?; + + let mut requirements = Vec::new(); + for req_row in req_rows { + let prerequisite_office_type_id = req_row + .get("prerequisite_office_type_id") + .and_then(|v| v.parse::().ok()); + let min_title_level = req_row + .get("min_title_level") + .and_then(|v| v.parse::().ok()); + + requirements.push(ChurchOfficeRequirement { + prerequisite_office_type_id, + min_title_level, + }); + } + + // Prüfe ob Character die Voraussetzungen erfüllt + let mut eligible = true; + for req in &requirements { + let elig_rows = conn + .execute( + "check_character_eligibility", + &[ + &app.applicant_character_id, + &req.prerequisite_office_type_id, + &req.min_title_level, + ], + ) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec check_character_eligibility: {e}" + )) + })?; + + for elig_row in elig_rows { + let has_prerequisite: bool = elig_row + .get("has_prerequisite") + .and_then(|v| v.parse::().ok()) + .unwrap_or(false); + let meets_title_requirement: bool = elig_row + .get("meets_title_requirement") + .and_then(|v| v.parse::().ok()) + .unwrap_or(false); + + if !has_prerequisite || !meets_title_requirement { + eligible = false; + break; + } + } + } + + // Bewerbung genehmigen oder ablehnen + if eligible { + let approve_rows = conn + .execute("approve_church_application", &[&app.application_id]) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec approve_church_application: {e}" + )) + })?; + + if !approve_rows.is_empty() { + eprintln!( + "[PoliticsWorker] Church Application {} genehmigt (office_type_id={}, character_id={})", + app.application_id, app.office_type_id, app.applicant_character_id + ); + + // Benachrichtigung senden + if let Some(user_id) = Self::get_user_id_for_character(pool, app.applicant_character_id)? 
{ + let msg = format!( + r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, + user_id + ); + broker.publish(msg); + } + } + } else { + conn.execute("reject_church_application", &[&app.application_id]) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec reject_church_application: {e}" + )) + })?; + eprintln!( + "[PoliticsWorker] Church Application {} abgelehnt (Voraussetzungen nicht erfüllt)", + app.application_id + ); + } + } + + Ok(()) + } + + fn create_church_application_jobs( + pool: &ConnectionPool, + office_type_id: i32, + region_id: i32, + supervisor_id: i32, + count: i32, + ) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + // Charaktere für diese Region finden + conn.prepare( + "get_characters_for_church_office", + QUERY_GET_CHARACTERS_FOR_CHURCH_OFFICE, + ) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] prepare get_characters_for_church_office: {e}" + )) + })?; + + let rows = conn + .execute("get_characters_for_church_office", &[®ion_id, &count]) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec get_characters_for_church_office: {e}" + )) + })?; + + conn.prepare( + "create_church_application_job", + QUERY_CREATE_CHURCH_APPLICATION_JOB, + ) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] prepare create_church_application_job: {e}" + )) + })?; + + let mut created = 0; + for row in rows { + if let Some(character_id) = row + .get("character_id") + .and_then(|v| v.parse::().ok()) + { + let app_rows = conn + .execute( + "create_church_application_job", + &[&office_type_id, &character_id, ®ion_id, &supervisor_id], + ) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec create_church_application_job: {e}" + )) + })?; + + if !app_rows.is_empty() { + created += 1; + eprintln!( + "[PoliticsWorker] Church Application Job erstellt: office_type_id={}, character_id={}, region_id={}", + office_type_id, character_id, region_id + ); + } + } + } + + eprintln!( + "[PoliticsWorker] {} Church Application Jobs erstellt für office_type_id={}, region_id={}", + created, office_type_id, region_id + ); + + Ok(()) + } + + fn get_user_id_for_character( + pool: &ConnectionPool, + character_id: i32, + ) -> Result, DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + let query = "SELECT user_id FROM falukant_data.character WHERE id = $1"; + conn.prepare("get_user_id_for_character", query) + .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_user_id_for_character: {e}")))?; + let rows = conn + .execute("get_user_id_for_character", &[&character_id]) + .map_err(|e| { + DbError::new(format!( + "[PoliticsWorker] exec get_user_id_for_character: {e}" + )) + })?; + + for row in rows { + if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::().ok()) { + return Ok(Some(user_id)); + } + } + + Ok(None) + } } impl Worker for PoliticsWorker { diff --git a/src/worker/produce.rs b/src/worker/produce.rs index 7094edb..16c8d24 100644 --- a/src/worker/produce.rs +++ b/src/worker/produce.rs @@ -1,7 +1,6 @@ use crate::db::{Row, Rows}; use crate::message_broker::MessageBroker; use std::cmp::min; -use std::sync::Mutex; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -14,12 +13,8 @@ use crate::worker::sql::{ QUERY_DELETE_PRODUCTION, QUERY_INSERT_INVENTORY, QUERY_INSERT_UPDATE_PRODUCTION_LOG, - QUERY_SET_PRODUCTION_SLEEP, - 
QUERY_GET_SLEEP_PRODUCTIONS, + QUERY_ADD_OVERPRODUCTION_NOTIFICATION, }; -use crate::worker::insert_notification_conn; - -const QUERY_COUNT_OPEN_PRODUCTIONS: &str = r#"SELECT COUNT(*) AS cnt FROM falukant_data.production;"#; /// Abbildet eine abgeschlossene Produktion aus der Datenbank. #[derive(Debug, Clone)] @@ -45,8 +40,6 @@ struct StockInfo { pub struct ProduceWorker { base: BaseWorker, last_iteration: Option, - last_start_fix: Option, - last_sleep_check: Option, } impl ProduceWorker { @@ -54,8 +47,6 @@ impl ProduceWorker { Self { base: BaseWorker::new("ProduceWorker", pool, broker), last_iteration: None, - last_start_fix: None, - last_sleep_check: None, } } @@ -76,8 +67,6 @@ impl ProduceWorker { self.base.set_current_step("Process Productions"); self.process_productions(); - self.base.set_current_step("Process Sleep Productions"); - self.process_sleep_productions(); self.base.set_current_step("Signal Activity"); // TODO: Später Analogie zu signalActivity() aus der C++-Basisklasse herstellen. self.base.set_current_step("Loop Done"); @@ -123,20 +112,6 @@ impl ProduceWorker { } fn process_productions(&mut self) { - // Heartbeat (damit sichtbar ist, dass der Worker läuft), max. 1x/Stunde. - static LAST_HEARTBEAT: Mutex> = Mutex::new(None); - { - let mut last = LAST_HEARTBEAT.lock().unwrap(); - let should_log = last - .map(|t| t.elapsed().as_secs() >= 3600) - .unwrap_or(true); - if should_log { - eprintln!("[ProduceWorker] Heartbeat: alive"); - *last = Some(Instant::now()); - } - } - - self.maybe_fix_null_production_starts(); self.base .set_current_step("Fetch Finished Productions"); @@ -148,50 +123,6 @@ impl ProduceWorker { } }; - // Debug: Gedrosselt loggen, wenn nie etwas "fertig" wird. - if finished_productions.is_empty() { - static LAST_EMPTY_LOG: Mutex> = Mutex::new(None); - let mut last = LAST_EMPTY_LOG.lock().unwrap(); - let should_log = last - .map(|t| t.elapsed().as_secs() >= 60) - .unwrap_or(true); - if should_log { - let mut conn = match self.base.pool.get() { - Ok(c) => c, - Err(e) => { - eprintln!("[ProduceWorker] DB-Verbindung fehlgeschlagen (debug): {e}"); - return; - } - }; - - let open_cnt = (|| -> Result { - conn.prepare("count_open_productions", QUERY_COUNT_OPEN_PRODUCTIONS)?; - let rows = conn.execute("count_open_productions", &[])?; - Ok(rows - .first() - .and_then(|r| r.get("cnt")) - .and_then(|v| v.parse::().ok()) - .unwrap_or(0)) - })() - .unwrap_or(0); - - let mut sql_preview = QUERY_GET_FINISHED_PRODUCTIONS.to_string(); - const MAX_SQL_PREVIEW: usize = 1200; - if sql_preview.len() > MAX_SQL_PREVIEW { - sql_preview.truncate(MAX_SQL_PREVIEW); - sql_preview.push_str(" …"); - } - - eprintln!( - "[ProduceWorker] Keine fertigen Produktionen gefunden. Offene Produktionen in DB: {}. 
Query(get_finished_productions): {}", - open_cnt, - sql_preview - ); - - *last = Some(Instant::now()); - } - } - self.base .set_current_step("Process Finished Productions"); @@ -221,37 +152,12 @@ impl ProduceWorker { production_id, } = *production; - // Prüfe VOR dem Abschluss, ob genug Lagerplatz vorhanden ist - if self.has_enough_storage_capacity(branch_id, quantity) { - // Genug Platz: Produktion abschließen - if self.add_to_inventory(branch_id, product_id, quantity, quality, user_id) { - self.delete_production(production_id); - self.add_production_to_log(region_id, user_id, product_id, quantity); - } - } else { - // Nicht genug Platz: Produktion auf sleep setzen - if let Err(err) = self.set_production_sleep(production_id) { - eprintln!("[ProduceWorker] Fehler beim Setzen von sleep für Produktion {}: {}", production_id, err); - } + if self.add_to_inventory(branch_id, product_id, quantity, quality, user_id) { + self.delete_production(production_id); + self.add_production_to_log(region_id, user_id, product_id, quantity); } } - fn has_enough_storage_capacity(&self, branch_id: i32, required_quantity: i32) -> bool { - let stocks = match self.get_available_stocks(branch_id) { - Ok(rows) => rows, - Err(err) => { - eprintln!("[ProduceWorker] Fehler in getAvailableStocks: {err}"); - return false; - } - }; - - let total_free_capacity: i32 = stocks.iter() - .map(|stock| (stock.total_capacity - stock.filled).max(0)) - .sum(); - - total_free_capacity >= required_quantity - } - fn add_to_inventory( &mut self, branch_id: i32, @@ -395,37 +301,6 @@ impl ProduceWorker { conn.execute("get_finished_productions", &[]) } - fn maybe_fix_null_production_starts(&mut self) { - let now = Instant::now(); - let should_run = self - .last_start_fix - .map(|t| now.saturating_duration_since(t) >= Duration::from_secs(3600)) - .unwrap_or(true); - if !should_run { - return; - } - - self.last_start_fix = Some(now); - - let mut conn = match self.base.pool.get() { - Ok(c) => c, - Err(e) => { - eprintln!("[ProduceWorker] DB-Verbindung fehlgeschlagen (start_fix): {e}"); - return; - } - }; - - // best-effort: nur loggen, wenn es scheitert - use crate::worker::sql::QUERY_FIX_NULL_PRODUCTION_START; - if let Err(err) = conn.prepare("fix_null_production_start", QUERY_FIX_NULL_PRODUCTION_START) { - eprintln!("[ProduceWorker] Fehler prepare fix_null_production_start: {err}"); - return; - } - if let Err(err) = conn.execute("fix_null_production_start", &[]) { - eprintln!("[ProduceWorker] Fehler exec fix_null_production_start: {err}"); - } - } - fn load_available_stocks(&self, branch_id: i32) -> Result { let mut conn = self .base @@ -489,95 +364,6 @@ impl ProduceWorker { Ok(()) } - fn set_production_sleep(&self, production_id: i32) -> Result<(), crate::db::DbError> { - let mut conn = self - .base - .pool - .get() - .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - - conn.prepare("set_production_sleep", QUERY_SET_PRODUCTION_SLEEP)?; - conn.execute("set_production_sleep", &[&production_id])?; - Ok(()) - } - - fn process_sleep_productions(&mut self) { - const SLEEP_CHECK_INTERVAL_SECS: u64 = 300; // 5 Minuten - - let now = Instant::now(); - let should_check = match self.last_sleep_check { - None => { - self.last_sleep_check = Some(now); - true - } - Some(last) => { - if now.saturating_duration_since(last).as_secs() >= SLEEP_CHECK_INTERVAL_SECS { - self.last_sleep_check = Some(now); - true - } else { - false - } - } - }; - - if !should_check { - return; - } - - let sleep_productions = match 
self.get_sleep_productions() { - Ok(rows) => rows, - Err(err) => { - eprintln!("[ProduceWorker] Fehler beim Laden von sleep-Produktionen: {err}"); - Vec::new() - } - }; - - for production in sleep_productions { - self.handle_sleep_production(&production); - } - } - - fn get_sleep_productions(&self) -> Result, crate::db::DbError> { - let rows = self.load_sleep_productions()?; - Ok(rows - .into_iter() - .filter_map(Self::map_row_to_finished_production) - .collect()) - } - - fn load_sleep_productions(&self) -> Result { - let mut conn = self - .base - .pool - .get() - .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - - conn.prepare("get_sleep_productions", QUERY_GET_SLEEP_PRODUCTIONS)?; - conn.execute("get_sleep_productions", &[]) - } - - fn handle_sleep_production(&mut self, production: &FinishedProduction) { - let FinishedProduction { - branch_id, - product_id, - quantity, - quality, - user_id, - region_id, - production_id, - } = *production; - - // Prüfe erneut, ob jetzt genug Lagerplatz vorhanden ist - if self.has_enough_storage_capacity(branch_id, quantity) { - // Jetzt genug Platz: Produktion abschließen - if self.add_to_inventory(branch_id, product_id, quantity, quality, user_id) { - self.delete_production(production_id); - self.add_production_to_log(region_id, user_id, product_id, quantity); - } - } - // Wenn immer noch nicht genug Platz: Produktion bleibt auf sleep - } - fn insert_or_update_production_log( &self, region_id: i32, @@ -616,13 +402,22 @@ impl ProduceWorker { .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + conn.prepare( + "add_overproduction_notification", + QUERY_ADD_OVERPRODUCTION_NOTIFICATION, + )?; + // Zusätzlich zur Menge die Branch-ID in der Payload mitschicken, damit // das Frontend die Überproduktion einem konkreten Branch zuordnen kann. 
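// Example of the resulting payload (sketch, assuming remaining_quantity = 12 and
// branch_id = 7): {"tr":"production.overproduction","value":12,"branch_id":7}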
let notification = format!( r#"{{"tr":"production.overproduction","value":{},"branch_id":{}}}"#, remaining_quantity, branch_id ); - insert_notification_conn(&mut conn, user_id, ¬ification, None)?; + + conn.execute( + "add_overproduction_notification", + &[&user_id, ¬ification], + )?; Ok(()) } diff --git a/src/worker/sql.rs b/src/worker/sql.rs index d84dad8..4ef4004 100644 --- a/src/worker/sql.rs +++ b/src/worker/sql.rs @@ -14,12 +14,7 @@ SELECT id FROM falukant_data.falukant_user ORDER BY RANDOM() LIMIT 1; "#; pub const QUERY_GET_RANDOM_INFANT: &str = r#" -SELECT - c.id AS character_id, - c.user_id, - c.first_name, - c.last_name, - CURRENT_DATE - c.birthdate::date AS age_days +SELECT c.id AS character_id, c.user_id, CURRENT_DATE - c.birthdate::date AS age_days FROM falukant_data."character" c WHERE c.user_id IS NOT NULL AND c.health > 0 AND CURRENT_DATE - c.birthdate::date <= 730 ORDER BY RANDOM() LIMIT 1; @@ -35,62 +30,25 @@ SELECT DISTINCT b.falukant_user_id AS user_id FROM falukant_data.branch b WHERE pub const QUERY_UPDATE_WEATHER: &str = r#" WITH all_regions AS ( - SELECT DISTINCT r.id AS region_id - FROM falukant_data.region r - JOIN falukant_type.region tr ON r.region_type_id = tr.id - WHERE tr.label_tr = 'city' -), -random_weather AS ( - SELECT - ar.region_id, - ( - SELECT wt.id - FROM falukant_type.weather wt - ORDER BY RANDOM() - LIMIT 1 - ) AS weather_type_id - FROM all_regions ar + SELECT DISTINCT r.id AS region_id FROM falukant_data.region r JOIN falukant_type.region tr ON r.region_type_id = tr.id WHERE tr.label_tr = 'city' ) INSERT INTO falukant_data.weather (region_id, weather_type_id) -SELECT rw.region_id, rw.weather_type_id -FROM random_weather rw -ON CONFLICT (region_id) -DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id; +SELECT ar.region_id, (SELECT wt.id FROM falukant_type.weather wt ORDER BY random() + ar.region_id * 0 LIMIT 1) FROM all_regions ar +ON CONFLICT (region_id) DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id; "#; -/// Vollständige Notification-Insert-Query (inkl. `character_id`), damit Trigger/Schemaänderungen -/// nicht an fehlenden Spalten scheitern. 
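// Parameter mapping of the removed full insert below (sketch): $1 = user_id,
// $2 = tr (the JSON payload as text), $3 = character_id (nullable).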
-pub const QUERY_INSERT_NOTIFICATION_FULL: &str = r#" - INSERT INTO falukant_log.notification ( - user_id, - tr, - character_id, - shown, - created_at, - updated_at - ) VALUES ($1, $2, $3, FALSE, NOW(), NOW()); -"#; - -/// Löscht alte Notifications (älter als 30 Tage) -pub const QUERY_DELETE_OLD_NOTIFICATIONS: &str = r#" - DELETE FROM falukant_log.notification - WHERE created_at < NOW() - INTERVAL '30 days'; +pub const QUERY_INSERT_NOTIFICATION: &str = r#" +INSERT INTO falukant_log.notification (user_id, tr, shown, created_at, updated_at) +VALUES ($1, $2, FALSE, NOW(), NOW()); "#; // Product pricing pub const QUERY_GET_PRODUCT_COST: &str = r#" -SELECT sell_cost, sell_cost AS original_sell_cost FROM falukant_type.product WHERE id = $1; +SELECT original_sell_cost, sell_cost FROM falukant_type.product WHERE id = $1; "#; pub const QUERY_GET_DIRECTORS: &str = r#" -SELECT - d.may_produce, - d.may_sell, - d.may_start_transport, - d.may_repair_vehicles, - b.id AS branch_id, - fu.id AS falukant_user_id, - d.id +SELECT d.may_produce, d.may_sell, d.may_start_transport, b.id AS branch_id, fu.id AS falukantUserId, d.id FROM falukant_data.director d JOIN falukant_data.falukant_user fu ON fu.id = d.employer_user_id JOIN falukant_data.character c ON c.id = d.director_character_id @@ -98,116 +56,27 @@ JOIN falukant_data.branch b ON b.region_id = c.region_id AND b.falukant_user_id WHERE current_time BETWEEN '08:00:00' AND '17:00:00'; "#; -// Director: vehicle repair automation -pub const QUERY_GET_VEHICLES_TO_REPAIR_IN_REGION: &str = r#" -SELECT - v.id AS vehicle_id, - v.condition, - vt.cost, - COALESCE(vt.build_time_minutes, 0) AS build_time_minutes -FROM falukant_data.vehicle v -JOIN falukant_type.vehicle vt ON vt.id = v.vehicle_type_id -WHERE v.falukant_user_id = $1 - AND v.region_id = $2 - AND v.condition < 10 - AND v.condition < 100 - AND v.available_from <= NOW() - AND NOT EXISTS ( - SELECT 1 FROM falukant_data.transport t WHERE t.vehicle_id = v.id - ) -ORDER BY v.condition ASC, v.id ASC; -"#; - -pub const QUERY_REPAIR_VEHICLE: &str = r#" -UPDATE falukant_data.vehicle v - SET condition = 100, - available_from = NOW() + (COALESCE(vt.build_time_minutes, 0) * INTERVAL '1 minute'), - updated_at = NOW() - FROM falukant_type.vehicle vt - WHERE vt.id = v.vehicle_type_id - AND v.id = $1 - AND v.condition < 100 - AND v.available_from <= NOW() - AND NOT EXISTS (SELECT 1 FROM falukant_data.transport t WHERE t.vehicle_id = v.id) - RETURNING v.id AS vehicle_id, v.available_from, COALESCE(vt.build_time_minutes, 0) AS build_time_minutes; -"#; - pub const QUERY_GET_BEST_PRODUCTION: &str = r#" -SELECT - fdu.id AS falukant_user_id, - CAST(fdu.money AS text) AS money, - fdu.certificate, - ftp.id AS product_id, - ftp.label_tr, - fdb.region_id, - (SELECT SUM(quantity) FROM falukant_data.stock fds WHERE fds.branch_id = fdb.id) AS stock_size, - COALESCE( - (SELECT SUM(COALESCE(fdi.quantity, 0)) - FROM falukant_data.stock fds - JOIN falukant_data.inventory fdi ON fdi.stock_id = fds.id - WHERE fds.branch_id = fdb.id), - 0 - ) AS used_in_stock, - ( - ftp.sell_cost - * ( - COALESCE(tpw.worth_percent, 100.0) - + ( - (COALESCE(k_char.knowledge, 0) * 2 + COALESCE(k_dir.knowledge, 0))::double precision / 3.0 - ) - ) / 100.0 - - 6 * ftp.category - ) / (300.0 * GREATEST(1, ftp.production_time)) AS worth, - fdb.id AS branch_id, - (SELECT COUNT(id) FROM falukant_data.production WHERE branch_id = fdb.id) AS running_productions, - COALESCE( - (SELECT SUM(COALESCE(fdp.quantity, 0)) - FROM falukant_data.production fdp - WHERE fdp.branch_id 
= fdb.id), - 0 - ) AS running_productions_quantity +SELECT fdu.id falukant_user_id, CAST(fdu.money AS text) AS money, fdu.certificate, ftp.id product_id, ftp.label_tr, fdb.region_id, +(SELECT SUM(quantity) FROM falukant_data.stock fds WHERE fds.branch_id = fdb.id) AS stock_size, +COALESCE((SELECT SUM(COALESCE(fdi.quantity, 0)) FROM falukant_data.stock fds JOIN falukant_data.inventory fdi ON fdi.stock_id = fds.id WHERE fds.branch_id = fdb.id), 0) AS used_in_stock, +(ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category) / (300.0 * ftp.production_time) AS worth, +fdb.id AS branch_id, (SELECT COUNT(id) FROM falukant_data.production WHERE branch_id = fdb.id) AS running_productions, +COALESCE((SELECT SUM(COALESCE(fdp.quantity, 0)) quantity FROM falukant_data.production fdp WHERE fdp.branch_id = fdb.id), 0) AS running_productions_quantity FROM falukant_data.director fdd -JOIN falukant_data."character" fdc - ON fdc.id = fdd.director_character_id -JOIN falukant_data.falukant_user fdu - ON fdd.employer_user_id = fdu.id -LEFT JOIN falukant_data."character" user_character - ON user_character.user_id = fdu.id - AND user_character.health > 0 -JOIN falukant_data.branch fdb - ON fdb.falukant_user_id = fdu.id - AND fdb.region_id = fdc.region_id -JOIN falukant_type.product ftp - ON ftp.category <= fdu.certificate -LEFT JOIN falukant_data.town_product_worth tpw - ON tpw.region_id = fdb.region_id - AND tpw.product_id = ftp.id -LEFT JOIN falukant_data.knowledge k_char - ON k_char.product_id = ftp.id - AND user_character.id IS NOT NULL - AND k_char.character_id = user_character.id -LEFT JOIN falukant_data.knowledge k_dir - ON k_dir.product_id = ftp.id - AND k_dir.character_id = fdd.director_character_id -WHERE fdd.id = $1 - AND fdb.id = $2 -ORDER BY worth DESC -LIMIT 1; +JOIN falukant_data.character fdc ON fdc.id = fdd.director_character_id +JOIN falukant_data.falukant_user fdu ON fdd.employer_user_id = fdu.id +JOIN falukant_data.character user_character ON user_character.user_id = fdu.id +JOIN falukant_data.branch fdb ON fdb.falukant_user_id = fdu.id AND fdb.region_id = fdc.region_id +JOIN falukant_data.town_product_worth fdtpw ON fdtpw.region_id = fdb.region_id +JOIN falukant_data.knowledge fdk_character ON fdk_character.product_id = fdtpw.product_id AND fdk_character.character_id = user_character.id +JOIN falukant_data.knowledge fdk_director ON fdk_director.product_id = fdtpw.product_id AND fdk_director.character_id = fdd.director_character_id +JOIN falukant_type.product ftp ON ftp.id = fdtpw.product_id AND ftp.category <= fdu.certificate +WHERE fdd.id = $1 AND fdb.id = $2 ORDER BY worth DESC LIMIT 1; "#; pub const QUERY_INSERT_PRODUCTION: &str = r#" -INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id, start_timestamp) -VALUES ($1, $2, $3, (SELECT weather_type_id FROM falukant_data.weather WHERE region_id = $4), NOW()); -"#; - -// Reparatur: falls Produktionen ohne start_timestamp angelegt wurden, werden sie so gesetzt, -// dass sie sofort als „fertig“ gelten (start = NOW() - production_time Minuten). 
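// Worked example for the removed repair query below (sketch): with production_time = 90
// the row gets start_timestamp = NOW() - 90 minutes, so the next run of
// get_finished_productions (start + production_time <= NOW()) already treats it as finished.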
-pub const QUERY_FIX_NULL_PRODUCTION_START: &str = r#" - UPDATE falukant_data.production p - SET start_timestamp = NOW() - (INTERVAL '1 minute' * pr.production_time) - FROM falukant_type.product pr - WHERE p.start_timestamp IS NULL - AND pr.id = p.product_id; +INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id) VALUES ($1, $2, $3, (SELECT weather_type_id FROM falukant_data.weather WHERE region_id = $4)); "#; // Character creation related queries (missing from earlier extraction) @@ -321,12 +190,8 @@ DELETE FROM falukant_data.inventory WHERE id = $1; "#; pub const QUERY_ADD_SELL_LOG: &str = r#" -INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id, sell_timestamp) -VALUES ($1, $2, $3, $4, NOW()) -ON CONFLICT (seller_id, product_id, region_id) -DO UPDATE SET - quantity = falukant_log.sell.quantity + EXCLUDED.quantity, - sell_timestamp = COALESCE(EXCLUDED.sell_timestamp, NOW()); +INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id) VALUES ($1, $2, $3, $4) +ON CONFLICT (region_id, product_id, seller_id) DO UPDATE SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity; "#; pub const QUERY_GET_ARRIVED_TRANSPORTS: &str = r#" SELECT @@ -338,37 +203,15 @@ SELECT t.target_region_id, b_target.id AS target_branch_id, b_source.id AS source_branch_id, - COALESCE(rd.distance, 0.0) AS distance, + rd.distance AS distance, v.falukant_user_id AS user_id FROM falukant_data.transport AS t JOIN falukant_data.vehicle AS v ON v.id = t.vehicle_id JOIN falukant_type.vehicle AS vt ON vt.id = v.vehicle_type_id -LEFT JOIN LATERAL ( - SELECT rd.distance - FROM falukant_data.region_distance AS rd - WHERE ( - (rd.source_region_id = t.source_region_id AND rd.target_region_id = t.target_region_id) - OR (rd.source_region_id = t.target_region_id AND rd.target_region_id = t.source_region_id) - ) - AND ( - rd.transport_mode = vt.transport_mode - OR rd.transport_mode IS NULL - OR vt.transport_mode IS NULL - ) - ORDER BY - (rd.transport_mode = vt.transport_mode) DESC, - (rd.transport_mode IS NULL) DESC - LIMIT 1 -) AS rd ON TRUE +JOIN falukant_data.region_distance AS rd ON ((rd.source_region_id = t.source_region_id AND rd.target_region_id = t.target_region_id) OR (rd.source_region_id = t.target_region_id AND rd.target_region_id = t.source_region_id)) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL) LEFT JOIN falukant_data.branch AS b_target ON b_target.region_id = t.target_region_id AND b_target.falukant_user_id = v.falukant_user_id LEFT JOIN falukant_data.branch AS b_source ON b_source.region_id = t.source_region_id AND b_source.falukant_user_id = v.falukant_user_id -WHERE ( - -- Transport ist angekommen basierend auf Distanz und Geschwindigkeit (wenn speed > 0) - (vt.speed > 0 AND rd.distance IS NOT NULL AND t.created_at + (rd.distance / vt.speed::double precision) * INTERVAL '1 minute' <= NOW()) - OR - -- Fallback: Transport ist länger als 24 Stunden unterwegs (falls keine Distanz gefunden wurde oder speed = 0/NULL) - (t.created_at + INTERVAL '24 hours' <= NOW()) - ); +WHERE vt.speed > 0 AND t.created_at + (rd.distance / vt.speed::double precision) * INTERVAL '1 minute' <= NOW(); "#; pub const QUERY_GET_AVAILABLE_STOCKS: &str = r#" @@ -410,25 +253,16 @@ pub const QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT: &str = r#" UPDATE falukant_data.vehicle SET region_id = $2, condition = GREATEST(0, condition - $3::int), available_from = NOW(), updated_at = NOW() WHERE id = $1; "#; -// Vehicle maintenance (TransportWorker) -pub const 
QUERY_GET_BROKEN_VEHICLES: &str = r#" -SELECT id AS vehicle_id, falukant_user_id AS user_id - FROM falukant_data.vehicle - WHERE condition <= 0; -"#; - -pub const QUERY_DELETE_TRANSPORTS_BY_VEHICLE: &str = r#" -DELETE FROM falukant_data.transport WHERE vehicle_id = $1; -"#; - -pub const QUERY_DELETE_VEHICLE: &str = r#" -DELETE FROM falukant_data.vehicle WHERE id = $1; -"#; - pub const QUERY_DELETE_TRANSPORT: &str = r#" DELETE FROM falukant_data.transport WHERE id = $1; "#; +#[allow(dead_code)] +pub const QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION: &str = r#" +INSERT INTO falukant_log.notification (user_id, tr, shown, created_at, updated_at) +VALUES ((SELECT c.user_id FROM falukant_data.character c WHERE c.user_id = $1 LIMIT 1), $2, FALSE, NOW(), NOW()); +"#; + pub const QUERY_UPDATE_TRANSPORT_SIZE: &str = r#" UPDATE falukant_data.transport SET size = $2, @@ -477,13 +311,7 @@ SELECT v.id AS vehicle_id, vt.capacity AS capacity FROM falukant_data.vehicle v JOIN falukant_type.vehicle vt ON vt.id = v.vehicle_type_id JOIN falukant_data.region_distance rd ON ((rd.source_region_id = v.region_id AND rd.target_region_id = $3) OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id)) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL) -WHERE v.falukant_user_id = $1 - AND v.region_id = $2 - AND v.id NOT IN ( - SELECT DISTINCT t.vehicle_id - FROM falukant_data.transport t - WHERE t.vehicle_id IS NOT NULL - ); +WHERE v.falukant_user_id = $1 AND v.region_id = $2; "#; pub const QUERY_INSERT_TRANSPORT: &str = r#" @@ -521,6 +349,10 @@ WITH new_sats AS ( UPDATE falukant_data.director dir SET satisfaction = ns.new_satisfaction FROM new_sats ns WHERE dir.id = ns.id AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction RETURNING dir.employer_user_id; "#; +pub const QUERY_GET_DIRECTOR_USER: &str = r#" +SELECT fu.id AS falukant_user_id FROM falukant_data.director d JOIN falukant_data.falukant_user fu ON fu.id = d.employer_user_id WHERE d.id = $1 LIMIT 1; +"#; + pub const QUERY_COUNT_VEHICLES_IN_BRANCH_REGION: &str = r#" SELECT COUNT(v.id) AS cnt FROM falukant_data.vehicle v WHERE v.falukant_user_id = $1 AND v.region_id = $2; "#; @@ -656,36 +488,6 @@ pub const QUERY_UPDATE_USER_HOUSE_STATE: &str = r#" ); "#; -// Haus-Zusammenfall: Sobald eine Komponente <= 0 ist, wird das Haus entfernt. -// Gilt sowohl für kaufbare Häuser als auch für Häuser im Besitz. 
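// Sketch, not part of the patch: the LEAST(...) <= 0 condition of the removed queries
// below, expressed in Rust – a house collapses as soon as its weakest component hits zero.
fn is_collapsed(roof: i32, floor: i32, wall: i32, window: i32) -> bool {
    [roof, floor, wall, window].into_iter().min().unwrap_or(0) <= 0
}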
-pub const QUERY_DELETE_COLLAPSED_BUYABLE_HOUSES: &str = r#" - WITH deleted AS ( - DELETE FROM falukant_data.buyable_house - WHERE LEAST( - roof_condition, - floor_condition, - wall_condition, - window_condition - ) <= 0 - RETURNING id - ) - SELECT id AS house_id FROM deleted; -"#; - -pub const QUERY_DELETE_COLLAPSED_USER_HOUSES: &str = r#" - WITH deleted AS ( - DELETE FROM falukant_data.user_house - WHERE LEAST( - roof_condition, - floor_condition, - wall_condition, - window_condition - ) <= 0 - RETURNING id, user_id - ) - SELECT id AS house_id, user_id FROM deleted; -"#; - pub const QUERY_UPDATE_HOUSE_QUALITY: &str = r#" UPDATE falukant_data.user_house SET roof_condition = GREATEST(0, LEAST(100, roof_condition + $1)), @@ -704,7 +506,7 @@ WHERE region_id = $1; "#; pub const QUERY_GET_RANDOM_CHARACTER: &str = r#" -SELECT id, health, first_name, last_name +SELECT id, health FROM falukant_data."character" WHERE user_id = $1 AND health > 0 ORDER BY RANDOM() LIMIT 1; @@ -715,7 +517,7 @@ UPDATE falukant_data."character" SET health = $1 WHERE id = $2; "#; pub const QUERY_GET_REGION_CHARACTERS: &str = r#" -SELECT id, health FROM falukant_data."character" WHERE region_id = $1 AND health > 0 AND user_id IS NOT NULL; +SELECT id, health FROM falukant_data."character" WHERE region_id = $1 AND health > 0; "#; pub const QUERY_DELETE_DIRECTOR: &str = r#" @@ -726,18 +528,9 @@ pub const QUERY_DELETE_RELATIONSHIP: &str = r#" WITH deleted AS ( DELETE FROM falukant_data.relationship WHERE character1_id = $1 OR character2_id = $1 - RETURNING - CASE WHEN character1_id = $1 THEN character2_id ELSE character1_id END AS related_character_id, - relationship_type_id + RETURNING CASE WHEN character1_id = $1 THEN character2_id ELSE character1_id END AS related_character_id, relationship_type_id ) -SELECT - c.user_id AS related_user_id, - d.related_character_id, - d.relationship_type_id, - rt.tr AS relationship_type_tr -FROM deleted d -LEFT JOIN falukant_data.character c ON c.id = d.related_character_id -LEFT JOIN falukant_type.relationship rt ON rt.id = d.relationship_type_id; +SELECT c.user_id AS related_user_id FROM deleted d JOIN falukant_data.character c ON c.id = d.related_character_id; "#; pub const QUERY_GET_USER_ID: &str = r#" @@ -763,12 +556,6 @@ pub const QUERY_SET_CHARACTER_USER: &str = r#" UPDATE falukant_data.character SET user_id = $1, updated_at = NOW() WHERE id = $2; "#; -// Erben-Logik: erst alte User-Zuordnung lösen (für Unique-Constraints auf character.user_id), -// dann den Erben übernehmen lassen. 
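// Reasoning sketch for the heir ordering described in the removed comment above: with a
// UNIQUE constraint on character.user_id, assigning the heir first would collide with the
// deceased character, which still holds the same user_id – hence clear first, then assign.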
-pub const QUERY_CLEAR_CHARACTER_USER: &str = r#" -UPDATE falukant_data.character SET user_id = NULL, updated_at = NOW() WHERE id = $1; -"#; - pub const QUERY_GET_CURRENT_MONEY: &str = r#" SELECT money FROM falukant_data.falukant_user WHERE id = $1; "#; @@ -1001,8 +788,7 @@ pub const QUERY_PROCESS_EXPIRED_AND_FILL: &str = r#" dt.office_type_id, dt.region_id, c.character_id, - COUNT(v.id) AS vote_count, - COALESCE(ch.reputation, 0) AS reputation + COUNT(v.id) AS vote_count FROM distinct_types AS dt JOIN falukant_data.election AS e ON e.office_type_id = dt.office_type_id @@ -1011,10 +797,8 @@ pub const QUERY_PROCESS_EXPIRED_AND_FILL: &str = r#" JOIN falukant_data.candidate AS c ON c.election_id = e.id AND c.id = v.candidate_id - JOIN falukant_data."character" AS ch - ON ch.id = c.character_id WHERE e.date >= (NOW() - INTERVAL '30 days') - GROUP BY dt.office_type_id, dt.region_id, c.character_id, ch.reputation + GROUP BY dt.office_type_id, dt.region_id, c.character_id ), ranked_winners AS ( SELECT @@ -1023,7 +807,7 @@ pub const QUERY_PROCESS_EXPIRED_AND_FILL: &str = r#" vpc.character_id, ROW_NUMBER() OVER ( PARTITION BY vpc.office_type_id, vpc.region_id - ORDER BY vpc.vote_count DESC, vpc.reputation DESC, vpc.character_id ASC + ORDER BY vpc.vote_count DESC ) AS rn FROM votes_per_candidate AS vpc ), @@ -1371,31 +1155,6 @@ pub const QUERY_RANDOM_HEIR: &str = r#" chosen.child_character_id; "#; -// Fallback-Erbe: Wenn ein Spieler-Character ohne Kinder stirbt, suchen wir einen zufälligen -// NPC-Character in derselben Region (Alter 10–14 Tage) und übergeben ihm die user_id. -pub const QUERY_GET_RANDOM_HEIR_FROM_REGION: &str = r#" - SELECT ch.id AS child_character_id - FROM falukant_data.character ch - WHERE ch.user_id IS NULL - AND ch.health > 0 - AND ch.id <> $1 - AND ch.region_id = ( - SELECT region_id - FROM falukant_data.character - WHERE id = $1 - ) - -- Alter zwischen 10 und 14 Tagen: birthdate in [now-14d, now-10d] - AND ch.birthdate <= NOW() - INTERVAL '10 days' - AND ch.birthdate >= NOW() - INTERVAL '14 days' - AND ch.title_of_nobility = ( - SELECT id - FROM falukant_type.title - WHERE label_tr = 'noncivil' - ) - ORDER BY RANDOM() - LIMIT 1; -"#; - pub const QUERY_UPDATE_USER_MONEY: &str = r#" UPDATE falukant_data.falukant_user SET money = $1, @@ -1645,16 +1404,8 @@ pub const QUERY_GET_FINISHED_PRODUCTIONS: &str = r#" ON p.product_id = pr.id JOIN falukant_data.branch br ON p.branch_id = br.id - -- Es kann vorkommen, dass ein User temporär keinen zugeordneten Character hat - -- (z.B. nach Tod/Erbe). Produktionen sollen trotzdem abschließen. - -- LATERAL verhindert Duplikate, falls ein User mehrere Characters hat. 
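-- Sketch: the removed LATERAL below picks at most one character per user (newest
-- updated_at wins, LIMIT 1), so a user with several characters cannot duplicate rows.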
- LEFT JOIN LATERAL ( - SELECT c.id, c.user_id - FROM falukant_data.character c - WHERE c.user_id = br.falukant_user_id - ORDER BY c.updated_at DESC NULLS LAST, c.id DESC - LIMIT 1 - ) c ON TRUE + JOIN falukant_data.character c + ON c.user_id = br.falukant_user_id LEFT JOIN falukant_data.knowledge k ON p.product_id = k.product_id AND k.character_id = c.id @@ -1670,7 +1421,6 @@ pub const QUERY_GET_FINISHED_PRODUCTIONS: &str = r#" AND pwe.weather_type_id = w.weather_type_id -- Wetter-Effekte derzeit aus der Qualitätsberechnung entfernt WHERE p.start_timestamp + INTERVAL '1 minute' * pr.production_time <= NOW() - AND COALESCE(p.sleep, FALSE) = FALSE ORDER BY p.start_timestamp; "#; @@ -1679,64 +1429,6 @@ pub const QUERY_DELETE_PRODUCTION: &str = r#" WHERE id = $1; "#; -pub const QUERY_SET_PRODUCTION_SLEEP: &str = r#" - UPDATE falukant_data.production - SET sleep = TRUE - WHERE id = $1; -"#; - -pub const QUERY_GET_SLEEP_PRODUCTIONS: &str = r#" - SELECT - p.id AS production_id, - p.branch_id, - p.product_id, - p.quantity, - p.start_timestamp, - pr.production_time, - br.region_id, - br.falukant_user_id AS user_id, - ROUND( - GREATEST( - 0, - LEAST( - 100, - ( - (COALESCE(k.knowledge, 0) * 0.75 - + COALESCE(k2.knowledge, 0) * 0.25) - * COALESCE(pwe.quality_effect, 100) / 100.0 - ) - ) - ) - )::int AS quality - FROM falukant_data.production p - JOIN falukant_type.product pr - ON p.product_id = pr.id - JOIN falukant_data.branch br - ON p.branch_id = br.id - LEFT JOIN LATERAL ( - SELECT c.id, c.user_id - FROM falukant_data.character c - WHERE c.user_id = br.falukant_user_id - ORDER BY c.updated_at DESC NULLS LAST, c.id DESC - LIMIT 1 - ) c ON TRUE - LEFT JOIN falukant_data.knowledge k - ON p.product_id = k.product_id - AND k.character_id = c.id - LEFT JOIN falukant_data.director d - ON d.employer_user_id = c.user_id - LEFT JOIN falukant_data.knowledge k2 - ON k2.character_id = d.director_character_id - AND k2.product_id = p.product_id - LEFT JOIN falukant_data.weather w - ON w.region_id = br.region_id - LEFT JOIN falukant_type.product_weather_effect pwe - ON pwe.product_id = p.product_id - AND pwe.weather_type_id = w.weather_type_id - WHERE p.sleep = TRUE - ORDER BY p.start_timestamp; -"#; - pub const QUERY_INSERT_UPDATE_PRODUCTION_LOG: &str = r#" INSERT INTO falukant_log.production ( region_id, @@ -1750,6 +1442,16 @@ pub const QUERY_INSERT_UPDATE_PRODUCTION_LOG: &str = r#" SET quantity = falukant_log.production.quantity + EXCLUDED.quantity; "#; +pub const QUERY_ADD_OVERPRODUCTION_NOTIFICATION: &str = r#" + INSERT INTO falukant_log.notification ( + user_id, + tr, + shown, + created_at, + updated_at + ) VALUES ($1, $2, FALSE, NOW(), NOW()); +"#; + // Aliases for personal variants (keeps original prepared statement names used in events.worker) pub const QUERY_REDUCE_INVENTORY_PERSONAL: &str = QUERY_REDUCE_INVENTORY; pub const QUERY_DELETE_INVENTORY_PERSONAL: &str = QUERY_DELETE_INVENTORY; @@ -1932,5 +1634,202 @@ pub const QUERY_SET_LEARNING_DONE: &str = r#" WHERE id = $1; "#; +// Church Office Queries +pub const QUERY_FIND_AVAILABLE_CHURCH_OFFICES: &str = r#" + SELECT + cot.id AS office_type_id, + cot.name AS office_type_name, + cot.seats_per_region, + cot.region_type, + r.id AS region_id, + COUNT(co.id) AS occupied_seats + FROM falukant_type.church_office_type cot + CROSS JOIN falukant_data.region r + JOIN falukant_type.region tr ON r.region_type_id = tr.id + LEFT JOIN falukant_data.church_office co + ON cot.id = co.office_type_id + AND co.region_id = r.id + WHERE tr.label_tr = cot.region_type + GROUP 
BY cot.id, cot.name, cot.seats_per_region, cot.region_type, r.id + HAVING COUNT(co.id) < cot.seats_per_region + ORDER BY cot.hierarchy_level ASC, r.id; +"#; +pub const QUERY_FIND_CHURCH_SUPERVISOR: &str = r#" + SELECT + co.id AS office_id, + co.character_id AS supervisor_character_id, + co.region_id, + cot.hierarchy_level + FROM falukant_data.church_office co + JOIN falukant_type.church_office_type cot ON co.office_type_id = cot.id + WHERE co.region_id = $1 + AND cot.hierarchy_level > ( + SELECT hierarchy_level + FROM falukant_type.church_office_type + WHERE id = $2 + ) + ORDER BY cot.hierarchy_level ASC + LIMIT 1; +"#; + +pub const QUERY_GET_CHURCH_OFFICE_REQUIREMENTS: &str = r#" + SELECT + id, + office_type_id, + prerequisite_office_type_id, + min_title_level + FROM falukant_predefine.church_office_requirement + WHERE office_type_id = $1; +"#; + +pub const QUERY_GET_PENDING_CHURCH_APPLICATIONS: &str = r#" + SELECT + ca.id AS application_id, + ca.office_type_id, + ca.character_id AS applicant_character_id, + ca.region_id, + ca.supervisor_id, + cot.name AS office_type_name, + cot.hierarchy_level + FROM falukant_data.church_application ca + JOIN falukant_type.church_office_type cot ON ca.office_type_id = cot.id + WHERE ca.status = 'pending' + AND ca.supervisor_id = $1 + ORDER BY cot.hierarchy_level ASC, ca.created_at ASC; +"#; + +pub const QUERY_CHECK_CHARACTER_ELIGIBILITY: &str = r#" + WITH character_info AS ( + SELECT + c.id AS character_id, + c.title_of_nobility, + t.level AS title_level, + EXISTS( + SELECT 1 + FROM falukant_data.church_office co2 + WHERE co2.character_id = c.id + ) AS has_office + FROM falukant_data.character c + LEFT JOIN falukant_type.title t ON c.title_of_nobility = t.id + WHERE c.id = $1 + ), + prerequisite_check AS ( + SELECT + CASE + WHEN $2::int IS NULL THEN TRUE + ELSE EXISTS( + SELECT 1 + FROM falukant_data.church_office co + WHERE co.character_id = $1 + AND co.office_type_id = $2::int + ) + END AS has_prerequisite + ) + SELECT + ci.character_id, + ci.title_level, + ci.has_office, + pc.has_prerequisite, + CASE + WHEN $3::int IS NULL THEN TRUE + ELSE COALESCE(ci.title_level, 0) >= $3::int + END AS meets_title_requirement + FROM character_info ci + CROSS JOIN prerequisite_check pc; +"#; + +pub const QUERY_APPROVE_CHURCH_APPLICATION: &str = r#" + WITH updated_application AS ( + UPDATE falukant_data.church_application + SET status = 'approved', + decision_date = NOW(), + updated_at = NOW() + WHERE id = $1 + AND status = 'pending' + RETURNING + office_type_id, + character_id, + region_id, + supervisor_id + ), + inserted_office AS ( + INSERT INTO falukant_data.church_office + (office_type_id, character_id, region_id, supervisor_id, created_at, updated_at) + SELECT + office_type_id, + character_id, + region_id, + supervisor_id, + NOW(), + NOW() + FROM updated_application + WHERE NOT EXISTS( + SELECT 1 + FROM falukant_data.church_office co + WHERE co.office_type_id = updated_application.office_type_id + AND co.region_id = updated_application.region_id + AND co.character_id = updated_application.character_id + ) + RETURNING id, office_type_id, character_id, region_id + ) + SELECT + id AS office_id, + office_type_id, + character_id, + region_id + FROM inserted_office; +"#; + +pub const QUERY_REJECT_CHURCH_APPLICATION: &str = r#" + UPDATE falukant_data.church_application + SET status = 'rejected', + decision_date = NOW(), + updated_at = NOW() + WHERE id = $1 + AND status = 'pending' + RETURNING id; +"#; + +pub const QUERY_CREATE_CHURCH_APPLICATION_JOB: &str = r#" + INSERT 
INTO falukant_data.church_application + (office_type_id, character_id, region_id, supervisor_id, status, created_at, updated_at) + SELECT + $1::int AS office_type_id, + $2::int AS character_id, + $3::int AS region_id, + $4::int AS supervisor_id, + 'pending' AS status, + NOW() AS created_at, + NOW() AS updated_at + WHERE NOT EXISTS( + SELECT 1 + FROM falukant_data.church_application ca + WHERE ca.office_type_id = $1::int + AND ca.character_id = $2::int + AND ca.region_id = $3::int + AND ca.status = 'pending' + ) + RETURNING id; +"#; + +pub const QUERY_GET_CHARACTERS_FOR_CHURCH_OFFICE: &str = r#" + SELECT DISTINCT + c.id AS character_id, + c.user_id, + c.region_id, + c.title_of_nobility, + t.level AS title_level + FROM falukant_data.character c + LEFT JOIN falukant_type.title t ON c.title_of_nobility = t.id + WHERE c.region_id = $1 + AND c.health > 0 + AND NOT EXISTS( + SELECT 1 + FROM falukant_data.church_office co + WHERE co.character_id = c.id + ) + ORDER BY RANDOM() + LIMIT $2; +"#; diff --git a/src/worker/transport.rs b/src/worker/transport.rs index 63e8116..291dbab 100644 --- a/src/worker/transport.rs +++ b/src/worker/transport.rs @@ -1,10 +1,9 @@ use crate::db::{ConnectionPool, DbError}; use crate::message_broker::MessageBroker; use std::cmp::min; -use std::collections::HashSet; use std::sync::atomic::Ordering; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::sync::Arc; +use std::time::Duration; use super::base::{BaseWorker, Worker, WorkerState}; use crate::worker::sql::{ @@ -15,11 +14,7 @@ use crate::worker::sql::{ QUERY_DELETE_TRANSPORT, QUERY_GET_BRANCH_REGION, QUERY_UPDATE_TRANSPORT_SIZE, - QUERY_GET_BROKEN_VEHICLES, - QUERY_DELETE_TRANSPORTS_BY_VEHICLE, - QUERY_DELETE_VEHICLE, }; -use crate::worker::{insert_notification_conn, publish_update_status}; #[derive(Debug, Clone)] struct ArrivedTransport { id: i32, @@ -51,26 +46,13 @@ impl TransportWorker { } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { - let mut last_vehicle_check: Option = None; while state.running_worker.load(Ordering::Relaxed) { if let Err(err) = Self::process_arrived_transports(&pool, &broker) { eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}"); } - // Stündlich Fahrzeuge prüfen (condition==0 => löschen + Notification) - let now = Instant::now(); - let should_vehicle_check = last_vehicle_check - .map(|t| now.saturating_duration_since(t) >= Duration::from_secs(3600)) - .unwrap_or(true); - if should_vehicle_check { - if let Err(err) = Self::cleanup_broken_vehicles(&pool, &broker) { - eprintln!("[TransportWorker] Fehler in cleanup_broken_vehicles: {err}"); - } - last_vehicle_check = Some(now); - } - - // Minütlich prüfen (nicht sekündlich pollen) - for _ in 0..60 { + // Einmal pro Sekunde prüfen + for _ in 0..1 { if !state.running_worker.load(Ordering::Relaxed) { break; } @@ -79,104 +61,17 @@ impl TransportWorker { } } - fn cleanup_broken_vehicles( - pool: &ConnectionPool, - broker: &MessageBroker, - ) -> Result<(), DbError> { - let mut conn = pool - .get() - .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - - conn.prepare("get_broken_vehicles", QUERY_GET_BROKEN_VEHICLES)?; - conn.prepare( - "delete_transports_by_vehicle", - QUERY_DELETE_TRANSPORTS_BY_VEHICLE, - )?; - conn.prepare("delete_vehicle", QUERY_DELETE_VEHICLE)?; - - let rows = conn.execute("get_broken_vehicles", &[])?; - if rows.is_empty() { - return Ok(()); - } - - let mut affected_users: HashSet = HashSet::new(); - let mut deleted_count = 0usize; - - 
for row in rows { - let vehicle_id = parse_i32(&row, "vehicle_id", -1); - let user_id = parse_i32(&row, "user_id", -1); - if vehicle_id < 0 || user_id < 0 { - continue; - } - - // 1) Laufende/alte Transporte des Fahrzeugs entfernen (falls vorhanden), - // damit das Löschen des Fahrzeugs nicht an FK-Constraints scheitert. - if let Err(err) = conn.execute("delete_transports_by_vehicle", &[&vehicle_id]) { - eprintln!( - "[TransportWorker] Fehler beim Löschen von Transporten für vehicle_id={}: {}", - vehicle_id, err - ); - // weiter versuchen: evtl. existieren keine oder keine Constraints - } - - // 2) Fahrzeug löschen - if let Err(err) = conn.execute("delete_vehicle", &[&vehicle_id]) { - eprintln!( - "[TransportWorker] Fehler beim Löschen von vehicle_id={}: {}", - vehicle_id, err - ); - continue; - } - - // 3) Notification an Besitzer (DB) - let tr = format!( - r#"{{"tr":"vehicle_destroyed","vehicle_id":{}}}"#, - vehicle_id - ); - if let Err(err) = insert_notification_conn(&mut conn, user_id, &tr, None) { - eprintln!( - "[TransportWorker] Fehler beim Schreiben der Vehicle-Notification (user_id={}, vehicle_id={}): {}", - user_id, vehicle_id, err - ); - } - - affected_users.insert(user_id); - deleted_count += 1; - } - - if deleted_count > 0 { - eprintln!( - "[TransportWorker] {} Fahrzeug(e) mit condition=0 gelöscht", - deleted_count - ); - } - - // 4) Frontend informieren: Branches/Status neu laden - for user_id in affected_users { - publish_update_status(broker, user_id); - } - - Ok(()) - } - fn process_arrived_transports( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let transports = Self::load_arrived_transports(pool)?; - if !transports.is_empty() { - eprintln!( - "[TransportWorker] {} angekommene Transport(e) gefunden", - transports.len() - ); - } - for t in transports { if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) { eprintln!( - "[TransportWorker] Fehler beim Verarbeiten von Transport {} (vehicle_id={}, product_id={:?}, size={}): {}", - t.id, t.vehicle_id, t.product_id, t.size, err + "[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}", + t.id ); } } @@ -192,19 +87,6 @@ impl TransportWorker { conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?; let rows = conn.execute("get_arrived_transports", &[])?; - if rows.is_empty() { - // Nur alle 60 Sekunden loggen, um Log-Flut zu vermeiden - static LAST_LOG: Mutex> = Mutex::new(None); - let mut last_log = LAST_LOG.lock().unwrap(); - let should_log = last_log - .map(|t| t.elapsed().as_secs() >= 60) - .unwrap_or(true); - if should_log { - eprintln!("[TransportWorker] Keine angekommenen Transporte gefunden"); - *last_log = Some(Instant::now()); - } - } - let mut result = Vec::with_capacity(rows.len()); for row in rows { let id = parse_i32(&row, "id", -1); @@ -341,13 +223,6 @@ impl TransportWorker { ); broker.publish(target_inventory_message); } - } else { - // Nichts konnte eingelagert werden - Transport bleibt unverändert - // Logge dies, damit wir sehen, dass der Transport wartet - eprintln!( - "[TransportWorker] Transport {} wartet: Kein Lagerplatz verfügbar (branch_id={}, product_id={}, size={})", - t.id, t.target_branch_id, product_id, t.size - ); } // Keine Notification für wartende Transporte, um Notification-System zu entlasten. 
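Note on the sql.rs hunk above: the new church-office queries (QUERY_GET_PENDING_CHURCH_APPLICATIONS, QUERY_APPROVE_CHURCH_APPLICATION, QUERY_REJECT_CHURCH_APPLICATION, ...) are only defined here; the worker that drives them is outside this excerpt. For orientation, here is a minimal sketch of how they could be wired up with the same ConnectionPool / prepare / execute pattern the other workers in this patch use. The worker name, the statement keys, the eligibility handling and the log text are illustrative assumptions, not part of the patch.

    use crate::db::{ConnectionPool, DbError};
    use crate::worker::sql::{
        QUERY_APPROVE_CHURCH_APPLICATION,
        QUERY_GET_PENDING_CHURCH_APPLICATIONS,
    };

    // Sketch only: not part of this patch. Assumes the prepare/execute API and the
    // string-keyed row access used by TransportWorker/UserCharacterWorker in this diff.
    fn approve_pending_applications(
        pool: &ConnectionPool,
        supervisor_id: i32,
    ) -> Result<(), DbError> {
        let mut conn = pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        conn.prepare("get_pending_church_applications", QUERY_GET_PENDING_CHURCH_APPLICATIONS)?;
        conn.prepare("approve_church_application", QUERY_APPROVE_CHURCH_APPLICATION)?;

        let applications = conn.execute("get_pending_church_applications", &[&supervisor_id])?;
        for row in applications {
            // Row values come back as strings and are parsed, as elsewhere in this patch.
            let application_id = row
                .get("application_id")
                .and_then(|v| v.parse::<i32>().ok())
                .unwrap_or(-1);
            if application_id < 0 {
                continue;
            }
            // A real worker would first run QUERY_CHECK_CHARACTER_ELIGIBILITY and reject
            // via QUERY_REJECT_CHURCH_APPLICATION when the requirements are not met.
            if let Err(err) = conn.execute("approve_church_application", &[&application_id]) {
                eprintln!(
                    "[ChurchOfficeWorker] Fehler beim Genehmigen von application_id={}: {}",
                    application_id, err
                );
            }
        }
        Ok(())
    }
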
diff --git a/src/worker/user_character.rs b/src/worker/user_character.rs index 89880ff..c5c76f4 100644 --- a/src/worker/user_character.rs +++ b/src/worker/user_character.rs @@ -28,9 +28,7 @@ use crate::worker::sql::{ QUERY_COUNT_CHILDREN, QUERY_GET_HEIR, QUERY_RANDOM_HEIR, - QUERY_GET_RANDOM_HEIR_FROM_REGION, QUERY_SET_CHARACTER_USER, - QUERY_CLEAR_CHARACTER_USER, QUERY_UPDATE_USER_MONEY, QUERY_GET_FALUKANT_USER_ID, QUERY_AUTOBATISM, @@ -578,55 +576,7 @@ impl UserCharacterWorker { conn.prepare("delete_election_candidate", QUERY_DELETE_ELECTION_CANDIDATE)?; conn.execute("delete_director", &[&character_id])?; - - // Relationships löschen mit Logging und spezieller Notification für Verlobungen - let rel_result = conn.execute("delete_relationship", &[&character_id])?; - - // Logging: Anzahl gelöschter Relationships - let deleted_count = rel_result.len(); - if deleted_count > 0 { - eprintln!( - "[UserCharacterWorker] {} Relationship(s) gelöscht für character_id={}", - deleted_count, character_id - ); - } - - for row in rel_result { - let related_user_id = row - .get("related_user_id") - .and_then(|v| v.parse::().ok()); - let related_character_id = row - .get("related_character_id") - .and_then(|v| v.parse::().ok()); - let relationship_type_tr = row - .get("relationship_type_tr") - .map(|s| s.to_string()); - - // Logging: Relationship wurde gelöscht - eprintln!( - "[UserCharacterWorker] Relationship gelöscht: character_id={}, related_character_id={:?}, related_user_id={:?}, relationship_type={:?}", - character_id, - related_character_id, - related_user_id, - relationship_type_tr - ); - - if let Some(uid) = related_user_id { - use crate::worker::insert_notification; - // Spezielle Notification für Verlobungen - if relationship_type_tr.as_deref() == Some("engaged") { - let notification_json = serde_json::json!({ - "tr": "relationship.engaged_character_death", - "character_id": related_character_id - }); - insert_notification(&self.base.pool, uid, ¬ification_json.to_string(), related_character_id)?; - } else { - use crate::worker::insert_notification; - insert_notification(&self.base.pool, uid, "relationship_death", None)?; - } - } - } - + conn.execute("delete_relationship", &[&character_id])?; conn.execute("delete_child_relation", &[&character_id])?; conn.execute("delete_knowledge", &[&character_id])?; conn.execute("delete_debtors_prism", &[&character_id])?; @@ -657,15 +607,7 @@ impl UserCharacterWorker { new_money = self.calculate_new_money(falukant_user_id, heir_id > 0)?; } - // Wenn es gar keine Kinder gibt, nimm einen zufälligen NPC in der Region (Alter 10–14 Tage). - if heir_id < 1 { - heir_id = self.get_random_heir_from_region(character_id)?; - new_money = self.calculate_new_money(falukant_user_id, heir_id > 0)?; - } - if heir_id > 0 { - // Erst die alte Zuordnung lösen (Unique-Constraint safety), dann den Erben zuweisen. 
- self.clear_character_user(character_id)?; self.set_new_character(falukant_user_id, heir_id)?; } self.set_new_money(falukant_user_id, new_money)?; @@ -724,23 +666,6 @@ impl UserCharacterWorker { .unwrap_or(-1)) } - fn get_random_heir_from_region(&mut self, deceased_character_id: i32) -> Result { - let mut conn = self - .base - .pool - .get() - .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - - conn.prepare("random_heir_region", QUERY_GET_RANDOM_HEIR_FROM_REGION)?; - let rows = conn.execute("random_heir_region", &[&deceased_character_id])?; - - Ok(rows - .first() - .and_then(|r| r.get("child_character_id")) - .and_then(|v| v.parse::().ok()) - .unwrap_or(-1)) - } - fn set_new_character( &mut self, falukant_user_id: i32, @@ -760,18 +685,6 @@ impl UserCharacterWorker { Ok(()) } - fn clear_character_user(&mut self, deceased_character_id: i32) -> Result<(), DbError> { - let mut conn = self - .base - .pool - .get() - .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - - conn.prepare("clear_character_user", QUERY_CLEAR_CHARACTER_USER)?; - conn.execute("clear_character_user", &[&deceased_character_id])?; - Ok(()) - } - fn set_new_money(&mut self, falukant_user_id: i32, new_amount: f64) -> Result<(), DbError> { let mut conn = self .base diff --git a/src/worker/weather.rs b/src/worker/weather.rs index ea24a58..5dbe9db 100644 --- a/src/worker/weather.rs +++ b/src/worker/weather.rs @@ -11,6 +11,8 @@ pub struct WeatherWorker { base: BaseWorker, } +// Reuse QUERY_UPDATE_WEATHER from centralized SQL module + impl WeatherWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { @@ -61,17 +63,17 @@ impl WeatherWorker { .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + // Run the prepared SQL that uses per-row RANDOM() trick conn.prepare("update_weather", QUERY_UPDATE_WEATHER)?; let updated_rows = conn.execute("update_weather", &[])?; eprintln!( - "[WeatherWorker] Wetter aktualisiert. {} Regionen betroffen.", + "[WeatherWorker] Wetter aktualisiert (per-row random). {} Regionen betroffen.", updated_rows.len() ); // Benachrichtige alle Clients über Wetteränderungen - let message = r#"{"event":"weather_updated"}"#; - broker.publish(message.to_string()); + broker.publish("{\"event\":\"weather_updated\"}".to_string()); Ok(()) }
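Note on the WeatherWorker hunk: the "per-row RANDOM() trick" comment refers to QUERY_UPDATE_WEATHER, which is defined in the centralized SQL module and not shown in this excerpt. For orientation only, the following is a hedged sketch of what such a per-row random roll typically looks like in PostgreSQL; falukant_data.weather with region_id/weather_type_id appears in this patch, while the falukant_type.weather_type table name is a guess. The RETURNING clause matches the worker counting updated_rows.len() above.

    // Sketch only: not the QUERY_UPDATE_WEATHER constant from this patch.
    pub const QUERY_UPDATE_WEATHER_SKETCH: &str = r#"
        UPDATE falukant_data.weather w
        SET weather_type_id = (
            SELECT wt.id
            FROM falukant_type.weather_type wt
            -- Referencing the outer row keeps the subquery correlated, so PostgreSQL
            -- re-evaluates it (and its RANDOM()) for every region instead of once.
            WHERE w.region_id = w.region_id
            ORDER BY RANDOM()
            LIMIT 1
        )
        RETURNING w.region_id;
    "#;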