169 lines
5.4 KiB
Rust
169 lines
5.4 KiB
Rust
use crate::db::{ConnectionPool, DbError};
|
||
use crate::message_broker::MessageBroker;
|
||
use std::sync::atomic::{AtomicBool, Ordering};
|
||
use std::sync::{Arc, Mutex};
|
||
use std::thread;
|
||
use std::time::Duration;
|
||
|
||
pub trait Worker: Send {
|
||
fn start_worker_thread(&mut self);
|
||
fn stop_worker_thread(&mut self);
|
||
fn enable_watchdog(&mut self);
|
||
}
|
||
|
||
pub(crate) struct WorkerState {
|
||
pub(crate) running_worker: AtomicBool,
|
||
pub(crate) running_watchdog: AtomicBool,
|
||
pub(crate) current_step: Mutex<String>,
|
||
}
|
||
|
||
impl WorkerState {
|
||
pub(crate) fn new(name: &str) -> Self {
|
||
Self {
|
||
running_worker: AtomicBool::new(false),
|
||
running_watchdog: AtomicBool::new(false),
|
||
current_step: Mutex::new(format!("{name}: idle")),
|
||
}
|
||
}
|
||
}
|
||
|
||
pub struct BaseWorker {
|
||
pub name: String,
|
||
pub pool: ConnectionPool,
|
||
pub broker: MessageBroker,
|
||
pub(crate) state: Arc<WorkerState>,
|
||
worker_thread: Option<thread::JoinHandle<()>>,
|
||
watchdog_thread: Option<thread::JoinHandle<()>>,
|
||
}
|
||
|
||
impl BaseWorker {
|
||
pub fn new(name: &str, pool: ConnectionPool, broker: MessageBroker) -> Self {
|
||
Self {
|
||
name: name.to_string(),
|
||
pool,
|
||
broker,
|
||
state: Arc::new(WorkerState::new(name)),
|
||
worker_thread: None,
|
||
watchdog_thread: None,
|
||
}
|
||
}
|
||
|
||
pub fn set_current_step<S: Into<String>>(&self, step: S) {
|
||
if let Ok(mut guard) = self.state.current_step.lock() {
|
||
*guard = step.into();
|
||
}
|
||
}
|
||
|
||
pub(crate) fn start_worker_with_loop<F>(&mut self, loop_fn: F)
|
||
where
|
||
F: Fn(Arc<WorkerState>) + Send + 'static,
|
||
{
|
||
if self.state.running_worker.swap(true, Ordering::SeqCst) {
|
||
eprintln!("[{}] Worker thread already running, skipping start.", self.name);
|
||
return;
|
||
}
|
||
|
||
let state = Arc::clone(&self.state);
|
||
|
||
self.worker_thread = Some(thread::spawn(move || {
|
||
loop_fn(state);
|
||
}));
|
||
}
|
||
|
||
pub(crate) fn stop_worker(&mut self) {
|
||
// Erst den Worker stoppen, dann auch den Watchdog beenden, damit keine
|
||
// Hintergrund-Threads weiterlaufen.
|
||
self.state.running_worker.store(false, Ordering::Relaxed);
|
||
self.stop_watchdog();
|
||
if let Some(handle) = self.worker_thread.take() {
|
||
let _ = handle.join();
|
||
}
|
||
}
|
||
|
||
pub(crate) fn start_watchdog(&mut self) {
|
||
if self
|
||
.state
|
||
.running_watchdog
|
||
.swap(true, Ordering::SeqCst)
|
||
{
|
||
eprintln!("[{}] Watchdog already enabled, skipping.", self.name);
|
||
return;
|
||
}
|
||
|
||
let state = Arc::clone(&self.state);
|
||
let name = self.name.clone();
|
||
|
||
self.watchdog_thread = Some(thread::spawn(move || {
|
||
while state.running_watchdog.load(Ordering::Relaxed) {
|
||
// Nicht in einem großen 10s-Sleep blockieren, damit der
|
||
// Shutdown (stop_watchdog) zügig reagieren kann. Stattdessen
|
||
// in 1s-Scheiben schlafen und dazwischen das Flag prüfen.
|
||
for _ in 0..10 {
|
||
if !state.running_watchdog.load(Ordering::Relaxed) {
|
||
break;
|
||
}
|
||
thread::sleep(Duration::from_secs(1));
|
||
}
|
||
|
||
let step = state.current_step.lock().unwrap().clone();
|
||
|
||
// "idle"-Meldungen sind im Dauerbetrieb eher Spam und helfen
|
||
// beim Debuggen selten. Deshalb nur loggen, wenn der Worker
|
||
// sich nicht im Idle-Zustand befindet.
|
||
if !step.ends_with(" idle") {
|
||
eprintln!("[{name}] Watchdog: current step = {step}");
|
||
}
|
||
}
|
||
}));
|
||
}
|
||
|
||
pub(crate) fn stop_watchdog(&mut self) {
|
||
self.state.running_watchdog.store(false, Ordering::Relaxed);
|
||
if let Some(handle) = self.watchdog_thread.take() {
|
||
let _ = handle.join();
|
||
}
|
||
}
|
||
|
||
// Bei Bedarf kann hier später wieder ein expliziter Statuszugriff ergänzt werden.
|
||
}
|
||
|
||
// Hinweis: In der bestehenden Datenbank ist `falukant_data.update_money` typischerweise
|
||
// so definiert, dass der Geldbetrag als NUMERIC erwartet wird. Wir übergeben
|
||
// die Parameter ohne explizite Casts; PostgreSQL castet sie anhand der
|
||
// Funktionssignatur automatisch.
|
||
const QUERY_UPDATE_MONEY: &str = r#"
|
||
SELECT falukant_data.update_money(
|
||
$1,
|
||
$2,
|
||
$3
|
||
);
|
||
"#;
|
||
|
||
impl BaseWorker {
|
||
/// Aktualisiert das Geld eines Falukant-Users über die DB-Funktion `falukant_data.update_money`.
|
||
/// `action` entspricht dem Log-/Aktions-Tag (z.B. "credit pay rate", "debitor_prism").
|
||
pub fn change_falukant_user_money(
|
||
&self,
|
||
falukant_user_id: i32,
|
||
money_change: f64,
|
||
action: &str,
|
||
) -> Result<(), DbError> {
|
||
let mut conn = self
|
||
.pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
// Statement einmalig registrieren und anschließend über den Namen
|
||
// ausführen – analog zum restlichen Code.
|
||
conn.prepare("update_money", QUERY_UPDATE_MONEY)?;
|
||
// Parameter werden in ihren natürlichen Typen übergeben; PostgreSQL
|
||
// wählt anhand der Funktionssignatur die passenden Zieltypen.
|
||
let money_str = money_change.to_string(); // NUMERIC erwartet String-Repräsentation
|
||
conn.execute("update_money", &[&falukant_user_id, &money_str, &action])?;
|
||
|
||
Ok(())
|
||
}
|
||
}
|
||
|
||
|