use crate::db::{ConnectionPool, DbError}; use crate::worker::sql::{QUERY_UPDATE_MONEY, QUERY_GET_MONEY}; use crate::message_broker::MessageBroker; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; pub trait Worker: Send { fn start_worker_thread(&mut self); fn stop_worker_thread(&mut self); fn enable_watchdog(&mut self); } pub(crate) struct WorkerState { pub(crate) running_worker: AtomicBool, pub(crate) running_watchdog: AtomicBool, pub(crate) current_step: Mutex, } // Default tax percent and treasury user id used if no external config is available. // Percent, e.g. 10.0 => 10%. pub const DEFAULT_TAX_PERCENT: f64 = 10.0; pub const DEFAULT_TREASURY_USER_ID: i32 = 1; impl WorkerState { pub(crate) fn new(name: &str) -> Self { Self { running_worker: AtomicBool::new(false), running_watchdog: AtomicBool::new(false), current_step: Mutex::new(format!("{name}: idle")), } } } pub struct BaseWorker { pub name: String, pub pool: ConnectionPool, pub broker: MessageBroker, pub(crate) state: Arc, worker_thread: Option>, watchdog_thread: Option>, } impl BaseWorker { pub fn new(name: &str, pool: ConnectionPool, broker: MessageBroker) -> Self { Self { name: name.to_string(), pool, broker, state: Arc::new(WorkerState::new(name)), worker_thread: None, watchdog_thread: None, } } pub fn set_current_step>(&self, step: S) { if let Ok(mut guard) = self.state.current_step.lock() { *guard = step.into(); } } pub(crate) fn start_worker_with_loop(&mut self, loop_fn: F) where F: Fn(Arc) + Send + 'static, { if self.state.running_worker.swap(true, Ordering::SeqCst) { eprintln!("[{}] Worker thread already running, skipping start.", self.name); return; } let state = Arc::clone(&self.state); self.worker_thread = Some(thread::spawn(move || { loop_fn(state); })); } pub(crate) fn stop_worker(&mut self) { // Erst den Worker stoppen, dann auch den Watchdog beenden, damit keine // Hintergrund-Threads weiterlaufen. self.state.running_worker.store(false, Ordering::Relaxed); self.stop_watchdog(); if let Some(handle) = self.worker_thread.take() { let _ = handle.join(); } } pub(crate) fn start_watchdog(&mut self) { if self .state .running_watchdog .swap(true, Ordering::SeqCst) { eprintln!("[{}] Watchdog already enabled, skipping.", self.name); return; } let state = Arc::clone(&self.state); self.watchdog_thread = Some(thread::spawn(move || { while state.running_watchdog.load(Ordering::Relaxed) { // Nicht in einem großen 10s-Sleep blockieren, damit der // Shutdown (stop_watchdog) zügig reagieren kann. Stattdessen // in 1s-Scheiben schlafen und dazwischen das Flag prüfen. for _ in 0..10 { if !state.running_watchdog.load(Ordering::Relaxed) { break; } thread::sleep(Duration::from_secs(1)); } let step = state.current_step.lock().unwrap().clone(); // "idle"-Meldungen sind im Dauerbetrieb eher Spam und helfen // beim Debuggen selten. Deshalb nur loggen, wenn der Worker // sich nicht im Idle-Zustand befindet. if !step.ends_with(" idle") { // keine Info-Logs im Watchdog } } })); } pub(crate) fn stop_watchdog(&mut self) { self.state.running_watchdog.store(false, Ordering::Relaxed); if let Some(handle) = self.watchdog_thread.take() { let _ = handle.join(); } } // Bei Bedarf kann hier später wieder ein expliziter Statuszugriff ergänzt werden. } impl BaseWorker { /// Aktualisiert das Geld eines Falukant-Users über die DB-Funktion `falukant_data.update_money`. /// `action` entspricht dem Log-/Aktions-Tag (z.B. "credit pay rate", "debitor_prism"). pub fn change_falukant_user_money( &self, falukant_user_id: i32, money_change: f64, action: &str, ) -> Result<(), DbError> { let mut conn = self .pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; // Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection conn.prepare("update_money", QUERY_UPDATE_MONEY)?; // Validate float to avoid passing NaN/inf which the postgres client // may fail to serialize with an unclear error message. if !money_change.is_finite() { return Err(DbError::new(format!( "Ungültiger money_change: {} (not finite)", money_change ))); } // We must ensure the resulting money fits in numeric(10,2). // numeric(10,2) max absolute value is < 10^8 (100_000_000) before rounding. // Fetch current money for the user and clamp the delta if needed. conn.prepare("get_money_for_clamp", QUERY_GET_MONEY)?; let rows = conn.execute("get_money_for_clamp", &[&falukant_user_id])?; let current_money: f64 = rows .first() .and_then(|r| r.get("money")) .and_then(|v| v.parse::().ok()) .unwrap_or(0.0); // compute tentative result let tentative = current_money + money_change; // numeric(10,2) allows values with absolute < 10^8 (100_000_000) const MAX_ABS: f64 = 100_000_000.0 - 0.01; // leave room for scale let adjusted_money_change = if tentative >= MAX_ABS { let clipped = MAX_ABS - current_money; eprintln!( "[BaseWorker] Clamping money_change: tentative {} exceeds numeric(10,2) max, clipping to {}", tentative, clipped ); clipped } else if tentative <= -MAX_ABS { let clipped = -MAX_ABS - current_money; eprintln!( "[BaseWorker] Clamping money_change: tentative {} below min, clipping to {}", tentative, clipped ); clipped } else { money_change }; // Send exact types matching the DB function signature: let uid_i32: i32 = falukant_user_id; let money_str = format!("{:.2}", adjusted_money_change); // Note: we intentionally avoid parameterized call due to serialization // issues in this environment and instead execute a literal SQL below. fn escape_sql_literal(s: &str) -> String { s.replace('\'', "''") } let escaped_action = escape_sql_literal(action); let sql = format!( "SELECT falukant_data.update_money({uid}, {money}::numeric, '{act}');", uid = uid_i32, money = money_str, act = escaped_action ); let _ = conn.query(&sql)?; // Best-effort: insert a money history entry so the UI/history views // can show the change even if the DB-function doesn't write it. // We don't want to fail the whole operation if this insert fails, // so log errors and continue. // Ensure money_history table exists (best-effort). If this fails, // we still don't want to abort the money update. let create_sql = r#" CREATE TABLE IF NOT EXISTS falukant_log.money_history ( id BIGSERIAL PRIMARY KEY, user_id INTEGER NOT NULL, change NUMERIC(10,2) NOT NULL, action TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); "#; let _ = conn.query(create_sql); let history_sql = format!( "INSERT INTO falukant_log.money_history (user_id, change, action, created_at) VALUES ({uid}, {money}::numeric, '{act}', NOW());", uid = uid_i32, money = money_str, act = escaped_action ); if let Err(err) = conn.query(&history_sql) { eprintln!( "[BaseWorker] Warning: inserting money_history failed for user {}: {}", uid_i32, err ); } Ok(()) } }