Files
yourpart-daemon/src/worker/base.rs

221 lines
7.3 KiB
Rust

use crate::db::{ConnectionPool, DbError};
use crate::message_broker::MessageBroker;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
/// Thread-lifecycle interface for workers.
///
/// The `Send` supertrait bound allows implementors to be moved to another
/// thread. The exact semantics of each method are defined by the implementor;
/// the descriptions below only restate what the method names suggest.
pub trait Worker: Send {
/// Starts the worker's processing thread (implementor-defined).
fn start_worker_thread(&mut self);
/// Stops the worker's processing thread (implementor-defined).
fn stop_worker_thread(&mut self);
/// Enables the watchdog for this worker (implementor-defined).
fn enable_watchdog(&mut self);
}
/// Flags and status text describing one worker; created by [`WorkerState::new`].
pub(crate) struct WorkerState {
    /// `true` while the worker thread is supposed to keep running.
    pub(crate) running_worker: AtomicBool,
    /// `true` while the watchdog thread is supposed to keep running.
    pub(crate) running_watchdog: AtomicBool,
    /// Human-readable description of the step the worker is currently in.
    pub(crate) current_step: Mutex<String>,
}

impl WorkerState {
    /// Builds the initial state for the worker called `name`: both run flags
    /// are cleared and the current step reads `"<name>: idle"`.
    pub(crate) fn new(name: &str) -> Self {
        let initial_step = format!("{}: idle", name);
        WorkerState {
            current_step: Mutex::new(initial_step),
            running_watchdog: AtomicBool::new(false),
            running_worker: AtomicBool::new(false),
        }
    }
}
/// Common building block for workers: bundles the worker's name, its database
/// pool and message broker with the shared run state and the handles of the
/// worker and watchdog threads.
pub struct BaseWorker {
/// Worker name; used as the `[name]` prefix of log lines and to build the
/// initial `"<name>: idle"` step text.
pub name: String,
/// Database connection pool; connections are obtained from it via `get()`.
pub pool: ConnectionPool,
/// Message broker handle (not used by the methods visible in this part of the file).
pub broker: MessageBroker,
/// Run flags and current-step text, shared with the spawned threads via `Arc`.
pub(crate) state: Arc<WorkerState>,
/// Join handle of the worker thread; `None` until a worker loop has been started.
worker_thread: Option<thread::JoinHandle<()>>,
/// Join handle of the watchdog thread; `None` until the watchdog has been started.
watchdog_thread: Option<thread::JoinHandle<()>>,
}
impl BaseWorker {
    /// Creates a worker base with the given name, connection pool and broker.
    ///
    /// No thread is started here: both run flags are cleared and both thread
    /// handles start out as `None`.
    pub fn new(name: &str, pool: ConnectionPool, broker: MessageBroker) -> Self {
        Self {
            name: name.to_string(),
            pool,
            broker,
            state: Arc::new(WorkerState::new(name)),
            worker_thread: None,
            watchdog_thread: None,
        }
    }

    /// Records the step the worker is currently executing; the watchdog
    /// thread reads this text for its periodic log line.
    ///
    /// A poisoned mutex is recovered instead of being ignored: the guarded
    /// value is a plain `String`, so it stays valid even if another thread
    /// panicked while holding the lock, and dropping updates would leave the
    /// watchdog reporting a stale step forever.
    pub fn set_current_step<S: Into<String>>(&self, step: S) {
        // Convert before locking so the lock is held only for the assignment.
        let step = step.into();
        let mut guard = self
            .state
            .current_step
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *guard = step;
    }

    /// Spawns the worker thread and runs `loop_fn` on it with the shared state.
    ///
    /// If the `running_worker` flag is already set, nothing is spawned and a
    /// message is logged instead. The flag is only cleared by
    /// [`stop_worker`](Self::stop_worker).
    pub(crate) fn start_worker_with_loop<F>(&mut self, loop_fn: F)
    where
        F: Fn(Arc<WorkerState>) + Send + 'static,
    {
        // `swap` sets the flag and tells us atomically whether it was set before.
        if self.state.running_worker.swap(true, Ordering::SeqCst) {
            eprintln!("[{}] Worker thread already running, skipping start.", self.name);
            return;
        }
        let state = Arc::clone(&self.state);
        self.worker_thread = Some(thread::spawn(move || {
            loop_fn(state);
        }));
    }

    /// Clears the worker run flag, stops the watchdog and joins the worker thread.
    pub(crate) fn stop_worker(&mut self) {
        // Signal the worker first, then also shut down the watchdog so that no
        // background threads keep running.
        self.state.running_worker.store(false, Ordering::SeqCst);
        self.stop_watchdog();
        if let Some(handle) = self.worker_thread.take() {
            // `join` only fails when the thread panicked; report it instead of
            // discarding the information.
            if handle.join().is_err() {
                eprintln!("[{}] Worker thread terminated with a panic.", self.name);
            }
        }
    }

    /// Spawns the watchdog thread, which logs the current step roughly every
    /// ten seconds unless that step text ends with `" idle"`.
    ///
    /// If the `running_watchdog` flag is already set, nothing is spawned and a
    /// message is logged instead.
    pub(crate) fn start_watchdog(&mut self) {
        if self.state.running_watchdog.swap(true, Ordering::SeqCst) {
            eprintln!("[{}] Watchdog already enabled, skipping.", self.name);
            return;
        }
        let state = Arc::clone(&self.state);
        let name = self.name.clone();
        self.watchdog_thread = Some(thread::spawn(move || loop {
            // Do not block in one large 10s sleep, so that shutdown
            // (stop_watchdog) can react quickly. Sleep in 1s slices instead
            // and check the flag in between.
            for _ in 0..10 {
                if !state.running_watchdog.load(Ordering::SeqCst) {
                    return;
                }
                thread::sleep(Duration::from_secs(1));
            }
            // Re-check after the last slice so that no report is emitted once
            // a stop has been requested.
            if !state.running_watchdog.load(Ordering::SeqCst) {
                return;
            }
            // Recover a poisoned lock: a worker that panicked while holding it
            // must not take the watchdog down as well.
            let step = state
                .current_step
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .clone();
            // "idle" messages are mostly spam in continuous operation and
            // rarely help with debugging, so only log when the worker is not
            // in the idle state.
            if !step.ends_with(" idle") {
                eprintln!("[{name}] Watchdog: current step = {step}");
            }
        }));
    }

    /// Clears the watchdog run flag and joins the watchdog thread, if any.
    ///
    /// Because the watchdog sleeps in one-second slices, the join normally
    /// returns within about a second.
    pub(crate) fn stop_watchdog(&mut self) {
        self.state.running_watchdog.store(false, Ordering::SeqCst);
        if let Some(handle) = self.watchdog_thread.take() {
            if handle.join().is_err() {
                eprintln!("[{}] Watchdog thread terminated with a panic.", self.name);
            }
        }
    }
    // An explicit status accessor can be re-added here later if needed.
}
impl BaseWorker {
    /// Updates the money of a Falukant user through the DB function
    /// `falukant_data.update_money`.
    ///
    /// `action` is the log/action tag passed to that function
    /// (e.g. "credit pay rate", "debitor_prism").
    ///
    /// Before the update the user's current balance is read, and the delta is
    /// clamped so that the resulting balance stays within what `numeric(10,2)`
    /// can hold. If the balance cannot be read or parsed it is treated as
    /// `0.0` for the clamp and a warning is logged.
    ///
    /// NOTE(review): the balance read and the update are two separate
    /// statements; no transaction is visible here, so a concurrent update
    /// between them could bypass the clamp. Confirm whether callers serialize
    /// access.
    ///
    /// # Errors
    ///
    /// Returns a `DbError` when `money_change` is NaN or infinite, when no
    /// connection can be obtained from the pool, or when preparing or
    /// executing either statement fails.
    pub fn change_falukant_user_money(
        &self,
        falukant_user_id: i32,
        money_change: f64,
        action: &str,
    ) -> Result<(), DbError> {
        use postgres::types::ToSql;

        // Use parameterized queries as protection against SQL injection.
        const QUERY_UPDATE_MONEY: &str = r#"
SELECT falukant_data.update_money($1, $2, $3);
"#;
        const QUERY_GET_MONEY: &str = r#"
SELECT money FROM falukant_data.falukant_user WHERE id = $1;
"#;

        // Validate the float before touching the database: NaN/inf may fail to
        // serialize in the postgres client with an unclear error message, and
        // there is no point in taking a pooled connection for invalid input.
        if !money_change.is_finite() {
            return Err(DbError::new(format!(
                "Ungültiger money_change: {} (not finite)",
                money_change
            )));
        }

        let mut conn = self
            .pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        // Fetch the current balance so the delta can be clamped if needed.
        conn.prepare("get_money_for_clamp", QUERY_GET_MONEY)?;
        let rows = conn.execute("get_money_for_clamp", &[&falukant_user_id])?;
        let current_money: f64 = rows
            .get(0)
            .and_then(|r| r.get("money"))
            .and_then(|v| v.parse::<f64>().ok())
            .unwrap_or_else(|| {
                // Best effort: keep going, but make the fallback visible.
                eprintln!(
                    "[BaseWorker] change_falukant_user_money: could not read current money for user {}, assuming 0 for clamping",
                    falukant_user_id
                );
                0.0
            });

        let adjusted_money_change = Self::clamp_money_change(current_money, money_change);

        // Send exact types matching the DB function signature; the amount is
        // passed as text with two decimals.
        conn.prepare("update_money", QUERY_UPDATE_MONEY)?;
        let money_str = format!("{:.2}", adjusted_money_change);
        let p1: &(dyn ToSql + Sync) = &falukant_user_id;
        let p2: &(dyn ToSql + Sync) = &money_str;
        let p3: &(dyn ToSql + Sync) = &action;
        eprintln!(
            "[BaseWorker] change_falukant_user_money: update_money(user_id={}, money_change='{}', action={})",
            falukant_user_id, money_str, action
        );
        let _ = conn.execute("update_money", &[p1, p2, p3])?;
        Ok(())
    }

    /// Returns the delta to apply so that `current_money + delta` does not
    /// leave the range `[-99_999_999.99, 99_999_999.99]` of `numeric(10,2)`.
    ///
    /// The delta is returned unchanged when the tentative result is inside the
    /// range; otherwise it is shortened to hit the nearest bound and the
    /// clamping is logged.
    fn clamp_money_change(current_money: f64, money_change: f64) -> f64 {
        // numeric(10,2) allows absolute values below 10^8; leave room for the scale.
        const MAX_ABS: f64 = 100_000_000.0 - 0.01;

        let tentative = current_money + money_change;
        if tentative >= MAX_ABS {
            let clipped = MAX_ABS - current_money;
            eprintln!(
                "[BaseWorker] Clamping money_change: tentative {} exceeds numeric(10,2) max, clipping to {}",
                tentative, clipped
            );
            clipped
        } else if tentative <= -MAX_ABS {
            let clipped = -MAX_ABS - current_money;
            eprintln!(
                "[BaseWorker] Clamping money_change: tentative {} below min, clipping to {}",
                tentative, clipped
            );
            clipped
        } else {
            money_change
        }
    }
}