diff --git a/src/worker/character_creation.rs b/src/worker/character_creation.rs index 704338a..adc0834 100644 --- a/src/worker/character_creation.rs +++ b/src/worker/character_creation.rs @@ -20,9 +20,9 @@ use crate::worker::sql::{ QUERY_DELETE_DIRECTOR, QUERY_DELETE_RELATIONSHIP, QUERY_DELETE_CHILD_RELATION, - QUERY_INSERT_NOTIFICATION, QUERY_MARK_CHARACTER_DECEASED, }; +use crate::worker::{insert_notification, publish_update_status}; pub struct CharacterCreationWorker { pub(crate) base: BaseWorker, @@ -456,17 +456,11 @@ impl CharacterCreationWorker { user_id: i32, event_type: &str, ) -> Result<(), DbError> { - let mut conn = pool - .get() - .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + // DB-Notification (zentralisiert). Historisch wird hier als `tr` der event_type-String gespeichert. + insert_notification(pool, user_id, event_type, None)?; - conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?; - conn.execute("insert_notification", &[&user_id])?; - - // falukantUpdateStatus - let update_message = - format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); - broker.publish(update_message); + // Frontend-Update (zentralisiert) + publish_update_status(broker, user_id); // ursprüngliche Benachrichtigung let message = diff --git a/src/worker/events.rs b/src/worker/events.rs index 0727a00..21b4d6b 100644 --- a/src/worker/events.rs +++ b/src/worker/events.rs @@ -13,7 +13,6 @@ use crate::worker::sql::{ QUERY_GET_RANDOM_INFANT, QUERY_GET_RANDOM_CITY, QUERY_GET_AFFECTED_USERS, - QUERY_INSERT_NOTIFICATION, QUERY_GET_MONEY, QUERY_UPDATE_MONEY, QUERY_GET_REGION_STOCKS, @@ -53,6 +52,7 @@ use crate::worker::sql::{ QUERY_GET_CREDIT_DEBT, QUERY_COUNT_CHILDREN, }; +use crate::worker::{insert_notification, publish_update_status}; /// Typisierung von Ereignissen #[derive(Debug, Clone, Copy, PartialEq)] @@ -1617,17 +1617,11 @@ impl EventsWorker { user_id: i32, event_type: &str, ) -> Result<(), DbError> { - let mut conn = pool - .get() - .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + // DB-Notification (zentralisiert). Historisch wird hier als `tr` der event_type-String gespeichert. + insert_notification(pool, user_id, event_type, None)?; - conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?; - conn.execute("insert_notification", &[&user_id, &event_type])?; - - // falukantUpdateStatus - let update_message = - format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); - broker.publish(update_message); + // Frontend-Update (zentralisiert) + publish_update_status(broker, user_id); // ursprüngliche Benachrichtigung let message = diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 317d6ae..76a02ae 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -12,6 +12,7 @@ mod transport; mod weather; mod events; mod sql; +mod notify; pub use base::Worker; pub use crate::db::ConnectionPool; @@ -27,4 +28,5 @@ pub use user_character::UserCharacterWorker; pub use transport::TransportWorker; pub use weather::WeatherWorker; pub use events::EventsWorker; +pub use notify::{insert_notification, insert_notification_conn, publish_update_status}; diff --git a/src/worker/notify.rs b/src/worker/notify.rs new file mode 100644 index 0000000..d270146 --- /dev/null +++ b/src/worker/notify.rs @@ -0,0 +1,45 @@ +use crate::db::{ConnectionPool, DbConnection, DbError}; +use crate::message_broker::MessageBroker; +use crate::worker::sql::QUERY_INSERT_NOTIFICATION_FULL; + +/// Schreibt eine Notification in `falukant_log.notification`. +/// +/// - `tr` ist ein (i.d.R. JSON-)String, den das Frontend parst. +/// - `character_id` ist optional (NULL). +pub fn insert_notification( + pool: &ConnectionPool, + user_id: i32, + tr: &str, + character_id: Option, +) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + insert_notification_conn(&mut conn, user_id, tr, character_id)?; + Ok(()) +} + +/// Variante für bestehende DB-Verbindungen (spart Connect/Prepare in Loops). +pub fn insert_notification_conn( + conn: &mut DbConnection, + user_id: i32, + tr: &str, + character_id: Option, +) -> Result<(), DbError> { + conn.prepare("insert_notification_full", QUERY_INSERT_NOTIFICATION_FULL)?; + conn.execute( + "insert_notification_full", + &[&user_id, &tr, &character_id], + )?; + Ok(()) +} + +/// Informiert das Frontend, dass sich der Status geändert hat (z.B. Branches neu laden). +pub fn publish_update_status(broker: &MessageBroker, user_id: i32) { + broker.publish(format!( + r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, + user_id + )); +} + diff --git a/src/worker/produce.rs b/src/worker/produce.rs index 16c8d24..7c2b4dd 100644 --- a/src/worker/produce.rs +++ b/src/worker/produce.rs @@ -13,8 +13,8 @@ use crate::worker::sql::{ QUERY_DELETE_PRODUCTION, QUERY_INSERT_INVENTORY, QUERY_INSERT_UPDATE_PRODUCTION_LOG, - QUERY_ADD_OVERPRODUCTION_NOTIFICATION, }; +use crate::worker::insert_notification_conn; /// Abbildet eine abgeschlossene Produktion aus der Datenbank. #[derive(Debug, Clone)] @@ -402,22 +402,13 @@ impl ProduceWorker { .get() .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - conn.prepare( - "add_overproduction_notification", - QUERY_ADD_OVERPRODUCTION_NOTIFICATION, - )?; - // Zusätzlich zur Menge die Branch-ID in der Payload mitschicken, damit // das Frontend die Überproduktion einem konkreten Branch zuordnen kann. let notification = format!( r#"{{"tr":"production.overproduction","value":{},"branch_id":{}}}"#, remaining_quantity, branch_id ); - - conn.execute( - "add_overproduction_notification", - &[&user_id, ¬ification], - )?; + insert_notification_conn(&mut conn, user_id, ¬ification, None)?; Ok(()) } diff --git a/src/worker/sql.rs b/src/worker/sql.rs index 2052068..23b17f5 100644 --- a/src/worker/sql.rs +++ b/src/worker/sql.rs @@ -53,9 +53,17 @@ ON CONFLICT (region_id) DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id; "#; -pub const QUERY_INSERT_NOTIFICATION: &str = r#" -INSERT INTO falukant_log.notification (user_id, tr, shown, created_at, updated_at) -VALUES ($1, $2, FALSE, NOW(), NOW()); +/// Vollständige Notification-Insert-Query (inkl. `character_id`), damit Trigger/Schemaänderungen +/// nicht an fehlenden Spalten scheitern. +pub const QUERY_INSERT_NOTIFICATION_FULL: &str = r#" + INSERT INTO falukant_log.notification ( + user_id, + tr, + character_id, + shown, + created_at, + updated_at + ) VALUES ($1, $2, $3, FALSE, NOW(), NOW()); "#; // Product pricing @@ -295,6 +303,21 @@ pub const QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT: &str = r#" UPDATE falukant_data.vehicle SET region_id = $2, condition = GREATEST(0, condition - $3::int), available_from = NOW(), updated_at = NOW() WHERE id = $1; "#; +// Vehicle maintenance (TransportWorker) +pub const QUERY_GET_BROKEN_VEHICLES: &str = r#" +SELECT id AS vehicle_id, falukant_user_id AS user_id + FROM falukant_data.vehicle + WHERE condition <= 0; +"#; + +pub const QUERY_DELETE_TRANSPORTS_BY_VEHICLE: &str = r#" +DELETE FROM falukant_data.transport WHERE vehicle_id = $1; +"#; + +pub const QUERY_DELETE_VEHICLE: &str = r#" +DELETE FROM falukant_data.vehicle WHERE id = $1; +"#; + pub const QUERY_DELETE_TRANSPORT: &str = r#" DELETE FROM falukant_data.transport WHERE id = $1; "#; @@ -1487,17 +1510,6 @@ pub const QUERY_INSERT_UPDATE_PRODUCTION_LOG: &str = r#" SET quantity = falukant_log.production.quantity + EXCLUDED.quantity; "#; -pub const QUERY_ADD_OVERPRODUCTION_NOTIFICATION: &str = r#" - INSERT INTO falukant_log.notification ( - user_id, - tr, - character_id, - shown, - created_at, - updated_at - ) VALUES ($1, $2, NULL, FALSE, NOW(), NOW()); -"#; - // Aliases for personal variants (keeps original prepared statement names used in events.worker) pub const QUERY_REDUCE_INVENTORY_PERSONAL: &str = QUERY_REDUCE_INVENTORY; pub const QUERY_DELETE_INVENTORY_PERSONAL: &str = QUERY_DELETE_INVENTORY; diff --git a/src/worker/transport.rs b/src/worker/transport.rs index c3c7741..63e8116 100644 --- a/src/worker/transport.rs +++ b/src/worker/transport.rs @@ -1,6 +1,7 @@ use crate::db::{ConnectionPool, DbError}; use crate::message_broker::MessageBroker; use std::cmp::min; +use std::collections::HashSet; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -14,7 +15,11 @@ use crate::worker::sql::{ QUERY_DELETE_TRANSPORT, QUERY_GET_BRANCH_REGION, QUERY_UPDATE_TRANSPORT_SIZE, + QUERY_GET_BROKEN_VEHICLES, + QUERY_DELETE_TRANSPORTS_BY_VEHICLE, + QUERY_DELETE_VEHICLE, }; +use crate::worker::{insert_notification_conn, publish_update_status}; #[derive(Debug, Clone)] struct ArrivedTransport { id: i32, @@ -46,11 +51,24 @@ impl TransportWorker { } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { + let mut last_vehicle_check: Option = None; while state.running_worker.load(Ordering::Relaxed) { if let Err(err) = Self::process_arrived_transports(&pool, &broker) { eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}"); } + // Stündlich Fahrzeuge prüfen (condition==0 => löschen + Notification) + let now = Instant::now(); + let should_vehicle_check = last_vehicle_check + .map(|t| now.saturating_duration_since(t) >= Duration::from_secs(3600)) + .unwrap_or(true); + if should_vehicle_check { + if let Err(err) = Self::cleanup_broken_vehicles(&pool, &broker) { + eprintln!("[TransportWorker] Fehler in cleanup_broken_vehicles: {err}"); + } + last_vehicle_check = Some(now); + } + // Minütlich prüfen (nicht sekündlich pollen) for _ in 0..60 { if !state.running_worker.load(Ordering::Relaxed) { @@ -61,6 +79,86 @@ impl TransportWorker { } } + fn cleanup_broken_vehicles( + pool: &ConnectionPool, + broker: &MessageBroker, + ) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + conn.prepare("get_broken_vehicles", QUERY_GET_BROKEN_VEHICLES)?; + conn.prepare( + "delete_transports_by_vehicle", + QUERY_DELETE_TRANSPORTS_BY_VEHICLE, + )?; + conn.prepare("delete_vehicle", QUERY_DELETE_VEHICLE)?; + + let rows = conn.execute("get_broken_vehicles", &[])?; + if rows.is_empty() { + return Ok(()); + } + + let mut affected_users: HashSet = HashSet::new(); + let mut deleted_count = 0usize; + + for row in rows { + let vehicle_id = parse_i32(&row, "vehicle_id", -1); + let user_id = parse_i32(&row, "user_id", -1); + if vehicle_id < 0 || user_id < 0 { + continue; + } + + // 1) Laufende/alte Transporte des Fahrzeugs entfernen (falls vorhanden), + // damit das Löschen des Fahrzeugs nicht an FK-Constraints scheitert. + if let Err(err) = conn.execute("delete_transports_by_vehicle", &[&vehicle_id]) { + eprintln!( + "[TransportWorker] Fehler beim Löschen von Transporten für vehicle_id={}: {}", + vehicle_id, err + ); + // weiter versuchen: evtl. existieren keine oder keine Constraints + } + + // 2) Fahrzeug löschen + if let Err(err) = conn.execute("delete_vehicle", &[&vehicle_id]) { + eprintln!( + "[TransportWorker] Fehler beim Löschen von vehicle_id={}: {}", + vehicle_id, err + ); + continue; + } + + // 3) Notification an Besitzer (DB) + let tr = format!( + r#"{{"tr":"vehicle_destroyed","vehicle_id":{}}}"#, + vehicle_id + ); + if let Err(err) = insert_notification_conn(&mut conn, user_id, &tr, None) { + eprintln!( + "[TransportWorker] Fehler beim Schreiben der Vehicle-Notification (user_id={}, vehicle_id={}): {}", + user_id, vehicle_id, err + ); + } + + affected_users.insert(user_id); + deleted_count += 1; + } + + if deleted_count > 0 { + eprintln!( + "[TransportWorker] {} Fahrzeug(e) mit condition=0 gelöscht", + deleted_count + ); + } + + // 4) Frontend informieren: Branches/Status neu laden + for user_id in affected_users { + publish_update_status(broker, user_id); + } + + Ok(()) + } + fn process_arrived_transports( pool: &ConnectionPool, broker: &MessageBroker,