diff --git a/src/worker/produce.rs b/src/worker/produce.rs index 7c2b4dd..69b8507 100644 --- a/src/worker/produce.rs +++ b/src/worker/produce.rs @@ -1,6 +1,7 @@ use crate::db::{Row, Rows}; use crate::message_broker::MessageBroker; use std::cmp::min; +use std::sync::Mutex; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -16,6 +17,8 @@ use crate::worker::sql::{ }; use crate::worker::insert_notification_conn; +const QUERY_COUNT_OPEN_PRODUCTIONS: &str = r#"SELECT COUNT(*) AS cnt FROM falukant_data.production;"#; + /// Abbildet eine abgeschlossene Produktion aus der Datenbank. #[derive(Debug, Clone)] struct FinishedProduction { @@ -40,6 +43,7 @@ struct StockInfo { pub struct ProduceWorker { base: BaseWorker, last_iteration: Option, + last_start_fix: Option, } impl ProduceWorker { @@ -47,6 +51,7 @@ impl ProduceWorker { Self { base: BaseWorker::new("ProduceWorker", pool, broker), last_iteration: None, + last_start_fix: None, } } @@ -112,6 +117,20 @@ impl ProduceWorker { } fn process_productions(&mut self) { + // Heartbeat (damit sichtbar ist, dass der Worker läuft), max. 1x/Stunde. + static LAST_HEARTBEAT: Mutex> = Mutex::new(None); + { + let mut last = LAST_HEARTBEAT.lock().unwrap(); + let should_log = last + .map(|t| t.elapsed().as_secs() >= 3600) + .unwrap_or(true); + if should_log { + eprintln!("[ProduceWorker] Heartbeat: alive"); + *last = Some(Instant::now()); + } + } + + self.maybe_fix_null_production_starts(); self.base .set_current_step("Fetch Finished Productions"); @@ -123,6 +142,50 @@ impl ProduceWorker { } }; + // Debug: Gedrosselt loggen, wenn nie etwas "fertig" wird. + if finished_productions.is_empty() { + static LAST_EMPTY_LOG: Mutex> = Mutex::new(None); + let mut last = LAST_EMPTY_LOG.lock().unwrap(); + let should_log = last + .map(|t| t.elapsed().as_secs() >= 60) + .unwrap_or(true); + if should_log { + let mut conn = match self.base.pool.get() { + Ok(c) => c, + Err(e) => { + eprintln!("[ProduceWorker] DB-Verbindung fehlgeschlagen (debug): {e}"); + return; + } + }; + + let open_cnt = (|| -> Result { + conn.prepare("count_open_productions", QUERY_COUNT_OPEN_PRODUCTIONS)?; + let rows = conn.execute("count_open_productions", &[])?; + Ok(rows + .first() + .and_then(|r| r.get("cnt")) + .and_then(|v| v.parse::().ok()) + .unwrap_or(0)) + })() + .unwrap_or(0); + + let mut sql_preview = QUERY_GET_FINISHED_PRODUCTIONS.to_string(); + const MAX_SQL_PREVIEW: usize = 1200; + if sql_preview.len() > MAX_SQL_PREVIEW { + sql_preview.truncate(MAX_SQL_PREVIEW); + sql_preview.push_str(" …"); + } + + eprintln!( + "[ProduceWorker] Keine fertigen Produktionen gefunden. Offene Produktionen in DB: {}. Query(get_finished_productions): {}", + open_cnt, + sql_preview + ); + + *last = Some(Instant::now()); + } + } + self.base .set_current_step("Process Finished Productions"); @@ -301,6 +364,37 @@ impl ProduceWorker { conn.execute("get_finished_productions", &[]) } + fn maybe_fix_null_production_starts(&mut self) { + let now = Instant::now(); + let should_run = self + .last_start_fix + .map(|t| now.saturating_duration_since(t) >= Duration::from_secs(3600)) + .unwrap_or(true); + if !should_run { + return; + } + + self.last_start_fix = Some(now); + + let mut conn = match self.base.pool.get() { + Ok(c) => c, + Err(e) => { + eprintln!("[ProduceWorker] DB-Verbindung fehlgeschlagen (start_fix): {e}"); + return; + } + }; + + // best-effort: nur loggen, wenn es scheitert + use crate::worker::sql::QUERY_FIX_NULL_PRODUCTION_START; + if let Err(err) = conn.prepare("fix_null_production_start", QUERY_FIX_NULL_PRODUCTION_START) { + eprintln!("[ProduceWorker] Fehler prepare fix_null_production_start: {err}"); + return; + } + if let Err(err) = conn.execute("fix_null_production_start", &[]) { + eprintln!("[ProduceWorker] Fehler exec fix_null_production_start: {err}"); + } + } + fn load_available_stocks(&self, branch_id: i32) -> Result { let mut conn = self .base diff --git a/src/worker/sql.rs b/src/worker/sql.rs index 8f9b369..33b1b33 100644 --- a/src/worker/sql.rs +++ b/src/worker/sql.rs @@ -185,7 +185,18 @@ LIMIT 1; "#; pub const QUERY_INSERT_PRODUCTION: &str = r#" -INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id) VALUES ($1, $2, $3, (SELECT weather_type_id FROM falukant_data.weather WHERE region_id = $4)); +INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id, start_timestamp) +VALUES ($1, $2, $3, (SELECT weather_type_id FROM falukant_data.weather WHERE region_id = $4), NOW()); +"#; + +// Reparatur: falls Produktionen ohne start_timestamp angelegt wurden, werden sie so gesetzt, +// dass sie sofort als „fertig“ gelten (start = NOW() - production_time Minuten). +pub const QUERY_FIX_NULL_PRODUCTION_START: &str = r#" + UPDATE falukant_data.production p + SET start_timestamp = NOW() - (INTERVAL '1 minute' * pr.production_time) + FROM falukant_type.product pr + WHERE p.start_timestamp IS NULL + AND pr.id = p.product_id; "#; // Character creation related queries (missing from earlier extraction)