Refactor notification handling in Workers: Centralize notification insertion and frontend updates by replacing direct SQL queries with dedicated functions. This improves code maintainability and reduces redundancy across character creation, events, and transport processing.
This commit is contained in:
@@ -20,9 +20,9 @@ use crate::worker::sql::{
|
||||
QUERY_DELETE_DIRECTOR,
|
||||
QUERY_DELETE_RELATIONSHIP,
|
||||
QUERY_DELETE_CHILD_RELATION,
|
||||
QUERY_INSERT_NOTIFICATION,
|
||||
QUERY_MARK_CHARACTER_DECEASED,
|
||||
};
|
||||
use crate::worker::{insert_notification, publish_update_status};
|
||||
|
||||
pub struct CharacterCreationWorker {
|
||||
pub(crate) base: BaseWorker,
|
||||
@@ -456,17 +456,11 @@ impl CharacterCreationWorker {
|
||||
user_id: i32,
|
||||
event_type: &str,
|
||||
) -> Result<(), DbError> {
|
||||
let mut conn = pool
|
||||
.get()
|
||||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||||
// DB-Notification (zentralisiert). Historisch wird hier als `tr` der event_type-String gespeichert.
|
||||
insert_notification(pool, user_id, event_type, None)?;
|
||||
|
||||
conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?;
|
||||
conn.execute("insert_notification", &[&user_id])?;
|
||||
|
||||
// falukantUpdateStatus
|
||||
let update_message =
|
||||
format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id);
|
||||
broker.publish(update_message);
|
||||
// Frontend-Update (zentralisiert)
|
||||
publish_update_status(broker, user_id);
|
||||
|
||||
// ursprüngliche Benachrichtigung
|
||||
let message =
|
||||
|
||||
@@ -13,7 +13,6 @@ use crate::worker::sql::{
|
||||
QUERY_GET_RANDOM_INFANT,
|
||||
QUERY_GET_RANDOM_CITY,
|
||||
QUERY_GET_AFFECTED_USERS,
|
||||
QUERY_INSERT_NOTIFICATION,
|
||||
QUERY_GET_MONEY,
|
||||
QUERY_UPDATE_MONEY,
|
||||
QUERY_GET_REGION_STOCKS,
|
||||
@@ -53,6 +52,7 @@ use crate::worker::sql::{
|
||||
QUERY_GET_CREDIT_DEBT,
|
||||
QUERY_COUNT_CHILDREN,
|
||||
};
|
||||
use crate::worker::{insert_notification, publish_update_status};
|
||||
|
||||
/// Typisierung von Ereignissen
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
@@ -1617,17 +1617,11 @@ impl EventsWorker {
|
||||
user_id: i32,
|
||||
event_type: &str,
|
||||
) -> Result<(), DbError> {
|
||||
let mut conn = pool
|
||||
.get()
|
||||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||||
// DB-Notification (zentralisiert). Historisch wird hier als `tr` der event_type-String gespeichert.
|
||||
insert_notification(pool, user_id, event_type, None)?;
|
||||
|
||||
conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?;
|
||||
conn.execute("insert_notification", &[&user_id, &event_type])?;
|
||||
|
||||
// falukantUpdateStatus
|
||||
let update_message =
|
||||
format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id);
|
||||
broker.publish(update_message);
|
||||
// Frontend-Update (zentralisiert)
|
||||
publish_update_status(broker, user_id);
|
||||
|
||||
// ursprüngliche Benachrichtigung
|
||||
let message =
|
||||
|
||||
@@ -12,6 +12,7 @@ mod transport;
|
||||
mod weather;
|
||||
mod events;
|
||||
mod sql;
|
||||
mod notify;
|
||||
|
||||
pub use base::Worker;
|
||||
pub use crate::db::ConnectionPool;
|
||||
@@ -27,4 +28,5 @@ pub use user_character::UserCharacterWorker;
|
||||
pub use transport::TransportWorker;
|
||||
pub use weather::WeatherWorker;
|
||||
pub use events::EventsWorker;
|
||||
pub use notify::{insert_notification, insert_notification_conn, publish_update_status};
|
||||
|
||||
|
||||
45
src/worker/notify.rs
Normal file
45
src/worker/notify.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
use crate::db::{ConnectionPool, DbConnection, DbError};
|
||||
use crate::message_broker::MessageBroker;
|
||||
use crate::worker::sql::QUERY_INSERT_NOTIFICATION_FULL;
|
||||
|
||||
/// Schreibt eine Notification in `falukant_log.notification`.
|
||||
///
|
||||
/// - `tr` ist ein (i.d.R. JSON-)String, den das Frontend parst.
|
||||
/// - `character_id` ist optional (NULL).
|
||||
pub fn insert_notification(
|
||||
pool: &ConnectionPool,
|
||||
user_id: i32,
|
||||
tr: &str,
|
||||
character_id: Option<i32>,
|
||||
) -> Result<(), DbError> {
|
||||
let mut conn = pool
|
||||
.get()
|
||||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||||
|
||||
insert_notification_conn(&mut conn, user_id, tr, character_id)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Variante für bestehende DB-Verbindungen (spart Connect/Prepare in Loops).
|
||||
pub fn insert_notification_conn(
|
||||
conn: &mut DbConnection,
|
||||
user_id: i32,
|
||||
tr: &str,
|
||||
character_id: Option<i32>,
|
||||
) -> Result<(), DbError> {
|
||||
conn.prepare("insert_notification_full", QUERY_INSERT_NOTIFICATION_FULL)?;
|
||||
conn.execute(
|
||||
"insert_notification_full",
|
||||
&[&user_id, &tr, &character_id],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Informiert das Frontend, dass sich der Status geändert hat (z.B. Branches neu laden).
|
||||
pub fn publish_update_status(broker: &MessageBroker, user_id: i32) {
|
||||
broker.publish(format!(
|
||||
r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#,
|
||||
user_id
|
||||
));
|
||||
}
|
||||
|
||||
@@ -13,8 +13,8 @@ use crate::worker::sql::{
|
||||
QUERY_DELETE_PRODUCTION,
|
||||
QUERY_INSERT_INVENTORY,
|
||||
QUERY_INSERT_UPDATE_PRODUCTION_LOG,
|
||||
QUERY_ADD_OVERPRODUCTION_NOTIFICATION,
|
||||
};
|
||||
use crate::worker::insert_notification_conn;
|
||||
|
||||
/// Abbildet eine abgeschlossene Produktion aus der Datenbank.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -402,22 +402,13 @@ impl ProduceWorker {
|
||||
.get()
|
||||
.map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||||
|
||||
conn.prepare(
|
||||
"add_overproduction_notification",
|
||||
QUERY_ADD_OVERPRODUCTION_NOTIFICATION,
|
||||
)?;
|
||||
|
||||
// Zusätzlich zur Menge die Branch-ID in der Payload mitschicken, damit
|
||||
// das Frontend die Überproduktion einem konkreten Branch zuordnen kann.
|
||||
let notification = format!(
|
||||
r#"{{"tr":"production.overproduction","value":{},"branch_id":{}}}"#,
|
||||
remaining_quantity, branch_id
|
||||
);
|
||||
|
||||
conn.execute(
|
||||
"add_overproduction_notification",
|
||||
&[&user_id, ¬ification],
|
||||
)?;
|
||||
insert_notification_conn(&mut conn, user_id, ¬ification, None)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -53,9 +53,17 @@ ON CONFLICT (region_id)
|
||||
DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id;
|
||||
"#;
|
||||
|
||||
pub const QUERY_INSERT_NOTIFICATION: &str = r#"
|
||||
INSERT INTO falukant_log.notification (user_id, tr, shown, created_at, updated_at)
|
||||
VALUES ($1, $2, FALSE, NOW(), NOW());
|
||||
/// Vollständige Notification-Insert-Query (inkl. `character_id`), damit Trigger/Schemaänderungen
|
||||
/// nicht an fehlenden Spalten scheitern.
|
||||
pub const QUERY_INSERT_NOTIFICATION_FULL: &str = r#"
|
||||
INSERT INTO falukant_log.notification (
|
||||
user_id,
|
||||
tr,
|
||||
character_id,
|
||||
shown,
|
||||
created_at,
|
||||
updated_at
|
||||
) VALUES ($1, $2, $3, FALSE, NOW(), NOW());
|
||||
"#;
|
||||
|
||||
// Product pricing
|
||||
@@ -295,6 +303,21 @@ pub const QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT: &str = r#"
|
||||
UPDATE falukant_data.vehicle SET region_id = $2, condition = GREATEST(0, condition - $3::int), available_from = NOW(), updated_at = NOW() WHERE id = $1;
|
||||
"#;
|
||||
|
||||
// Vehicle maintenance (TransportWorker)
|
||||
pub const QUERY_GET_BROKEN_VEHICLES: &str = r#"
|
||||
SELECT id AS vehicle_id, falukant_user_id AS user_id
|
||||
FROM falukant_data.vehicle
|
||||
WHERE condition <= 0;
|
||||
"#;
|
||||
|
||||
pub const QUERY_DELETE_TRANSPORTS_BY_VEHICLE: &str = r#"
|
||||
DELETE FROM falukant_data.transport WHERE vehicle_id = $1;
|
||||
"#;
|
||||
|
||||
pub const QUERY_DELETE_VEHICLE: &str = r#"
|
||||
DELETE FROM falukant_data.vehicle WHERE id = $1;
|
||||
"#;
|
||||
|
||||
pub const QUERY_DELETE_TRANSPORT: &str = r#"
|
||||
DELETE FROM falukant_data.transport WHERE id = $1;
|
||||
"#;
|
||||
@@ -1487,17 +1510,6 @@ pub const QUERY_INSERT_UPDATE_PRODUCTION_LOG: &str = r#"
|
||||
SET quantity = falukant_log.production.quantity + EXCLUDED.quantity;
|
||||
"#;
|
||||
|
||||
pub const QUERY_ADD_OVERPRODUCTION_NOTIFICATION: &str = r#"
|
||||
INSERT INTO falukant_log.notification (
|
||||
user_id,
|
||||
tr,
|
||||
character_id,
|
||||
shown,
|
||||
created_at,
|
||||
updated_at
|
||||
) VALUES ($1, $2, NULL, FALSE, NOW(), NOW());
|
||||
"#;
|
||||
|
||||
// Aliases for personal variants (keeps original prepared statement names used in events.worker)
|
||||
pub const QUERY_REDUCE_INVENTORY_PERSONAL: &str = QUERY_REDUCE_INVENTORY;
|
||||
pub const QUERY_DELETE_INVENTORY_PERSONAL: &str = QUERY_DELETE_INVENTORY;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::db::{ConnectionPool, DbError};
|
||||
use crate::message_broker::MessageBroker;
|
||||
use std::cmp::min;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -14,7 +15,11 @@ use crate::worker::sql::{
|
||||
QUERY_DELETE_TRANSPORT,
|
||||
QUERY_GET_BRANCH_REGION,
|
||||
QUERY_UPDATE_TRANSPORT_SIZE,
|
||||
QUERY_GET_BROKEN_VEHICLES,
|
||||
QUERY_DELETE_TRANSPORTS_BY_VEHICLE,
|
||||
QUERY_DELETE_VEHICLE,
|
||||
};
|
||||
use crate::worker::{insert_notification_conn, publish_update_status};
|
||||
#[derive(Debug, Clone)]
|
||||
struct ArrivedTransport {
|
||||
id: i32,
|
||||
@@ -46,11 +51,24 @@ impl TransportWorker {
|
||||
}
|
||||
|
||||
fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc<WorkerState>) {
|
||||
let mut last_vehicle_check: Option<Instant> = None;
|
||||
while state.running_worker.load(Ordering::Relaxed) {
|
||||
if let Err(err) = Self::process_arrived_transports(&pool, &broker) {
|
||||
eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}");
|
||||
}
|
||||
|
||||
// Stündlich Fahrzeuge prüfen (condition==0 => löschen + Notification)
|
||||
let now = Instant::now();
|
||||
let should_vehicle_check = last_vehicle_check
|
||||
.map(|t| now.saturating_duration_since(t) >= Duration::from_secs(3600))
|
||||
.unwrap_or(true);
|
||||
if should_vehicle_check {
|
||||
if let Err(err) = Self::cleanup_broken_vehicles(&pool, &broker) {
|
||||
eprintln!("[TransportWorker] Fehler in cleanup_broken_vehicles: {err}");
|
||||
}
|
||||
last_vehicle_check = Some(now);
|
||||
}
|
||||
|
||||
// Minütlich prüfen (nicht sekündlich pollen)
|
||||
for _ in 0..60 {
|
||||
if !state.running_worker.load(Ordering::Relaxed) {
|
||||
@@ -61,6 +79,86 @@ impl TransportWorker {
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_broken_vehicles(
|
||||
pool: &ConnectionPool,
|
||||
broker: &MessageBroker,
|
||||
) -> Result<(), DbError> {
|
||||
let mut conn = pool
|
||||
.get()
|
||||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||||
|
||||
conn.prepare("get_broken_vehicles", QUERY_GET_BROKEN_VEHICLES)?;
|
||||
conn.prepare(
|
||||
"delete_transports_by_vehicle",
|
||||
QUERY_DELETE_TRANSPORTS_BY_VEHICLE,
|
||||
)?;
|
||||
conn.prepare("delete_vehicle", QUERY_DELETE_VEHICLE)?;
|
||||
|
||||
let rows = conn.execute("get_broken_vehicles", &[])?;
|
||||
if rows.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut affected_users: HashSet<i32> = HashSet::new();
|
||||
let mut deleted_count = 0usize;
|
||||
|
||||
for row in rows {
|
||||
let vehicle_id = parse_i32(&row, "vehicle_id", -1);
|
||||
let user_id = parse_i32(&row, "user_id", -1);
|
||||
if vehicle_id < 0 || user_id < 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 1) Laufende/alte Transporte des Fahrzeugs entfernen (falls vorhanden),
|
||||
// damit das Löschen des Fahrzeugs nicht an FK-Constraints scheitert.
|
||||
if let Err(err) = conn.execute("delete_transports_by_vehicle", &[&vehicle_id]) {
|
||||
eprintln!(
|
||||
"[TransportWorker] Fehler beim Löschen von Transporten für vehicle_id={}: {}",
|
||||
vehicle_id, err
|
||||
);
|
||||
// weiter versuchen: evtl. existieren keine oder keine Constraints
|
||||
}
|
||||
|
||||
// 2) Fahrzeug löschen
|
||||
if let Err(err) = conn.execute("delete_vehicle", &[&vehicle_id]) {
|
||||
eprintln!(
|
||||
"[TransportWorker] Fehler beim Löschen von vehicle_id={}: {}",
|
||||
vehicle_id, err
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 3) Notification an Besitzer (DB)
|
||||
let tr = format!(
|
||||
r#"{{"tr":"vehicle_destroyed","vehicle_id":{}}}"#,
|
||||
vehicle_id
|
||||
);
|
||||
if let Err(err) = insert_notification_conn(&mut conn, user_id, &tr, None) {
|
||||
eprintln!(
|
||||
"[TransportWorker] Fehler beim Schreiben der Vehicle-Notification (user_id={}, vehicle_id={}): {}",
|
||||
user_id, vehicle_id, err
|
||||
);
|
||||
}
|
||||
|
||||
affected_users.insert(user_id);
|
||||
deleted_count += 1;
|
||||
}
|
||||
|
||||
if deleted_count > 0 {
|
||||
eprintln!(
|
||||
"[TransportWorker] {} Fahrzeug(e) mit condition=0 gelöscht",
|
||||
deleted_count
|
||||
);
|
||||
}
|
||||
|
||||
// 4) Frontend informieren: Branches/Status neu laden
|
||||
for user_id in affected_users {
|
||||
publish_update_status(broker, user_id);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_arrived_transports(
|
||||
pool: &ConnectionPool,
|
||||
broker: &MessageBroker,
|
||||
|
||||
Reference in New Issue
Block a user