Add heartbeat and null production start fix in ProduceWorker: Implement periodic logging to monitor worker activity and add functionality to update productions with null start timestamps. Enhance SQL queries for better data integrity and debugging capabilities.

This commit is contained in:
Torsten Schulz (local)
2026-01-05 15:30:39 +01:00
parent 108ac6c82b
commit e2630eb32a
2 changed files with 106 additions and 1 deletions

View File

@@ -1,6 +1,7 @@
use crate::db::{Row, Rows}; use crate::db::{Row, Rows};
use crate::message_broker::MessageBroker; use crate::message_broker::MessageBroker;
use std::cmp::min; use std::cmp::min;
use std::sync::Mutex;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@@ -16,6 +17,8 @@ use crate::worker::sql::{
}; };
use crate::worker::insert_notification_conn; use crate::worker::insert_notification_conn;
const QUERY_COUNT_OPEN_PRODUCTIONS: &str = r#"SELECT COUNT(*) AS cnt FROM falukant_data.production;"#;
/// Abbildet eine abgeschlossene Produktion aus der Datenbank. /// Abbildet eine abgeschlossene Produktion aus der Datenbank.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct FinishedProduction { struct FinishedProduction {
@@ -40,6 +43,7 @@ struct StockInfo {
pub struct ProduceWorker { pub struct ProduceWorker {
base: BaseWorker, base: BaseWorker,
last_iteration: Option<Instant>, last_iteration: Option<Instant>,
last_start_fix: Option<Instant>,
} }
impl ProduceWorker { impl ProduceWorker {
@@ -47,6 +51,7 @@ impl ProduceWorker {
Self { Self {
base: BaseWorker::new("ProduceWorker", pool, broker), base: BaseWorker::new("ProduceWorker", pool, broker),
last_iteration: None, last_iteration: None,
last_start_fix: None,
} }
} }
@@ -112,6 +117,20 @@ impl ProduceWorker {
} }
fn process_productions(&mut self) { fn process_productions(&mut self) {
// Heartbeat (damit sichtbar ist, dass der Worker läuft), max. 1x/Stunde.
static LAST_HEARTBEAT: Mutex<Option<Instant>> = Mutex::new(None);
{
let mut last = LAST_HEARTBEAT.lock().unwrap();
let should_log = last
.map(|t| t.elapsed().as_secs() >= 3600)
.unwrap_or(true);
if should_log {
eprintln!("[ProduceWorker] Heartbeat: alive");
*last = Some(Instant::now());
}
}
self.maybe_fix_null_production_starts();
self.base self.base
.set_current_step("Fetch Finished Productions"); .set_current_step("Fetch Finished Productions");
@@ -123,6 +142,50 @@ impl ProduceWorker {
} }
}; };
// Debug: Gedrosselt loggen, wenn nie etwas "fertig" wird.
if finished_productions.is_empty() {
static LAST_EMPTY_LOG: Mutex<Option<Instant>> = Mutex::new(None);
let mut last = LAST_EMPTY_LOG.lock().unwrap();
let should_log = last
.map(|t| t.elapsed().as_secs() >= 60)
.unwrap_or(true);
if should_log {
let mut conn = match self.base.pool.get() {
Ok(c) => c,
Err(e) => {
eprintln!("[ProduceWorker] DB-Verbindung fehlgeschlagen (debug): {e}");
return;
}
};
let open_cnt = (|| -> Result<i32, crate::db::DbError> {
conn.prepare("count_open_productions", QUERY_COUNT_OPEN_PRODUCTIONS)?;
let rows = conn.execute("count_open_productions", &[])?;
Ok(rows
.first()
.and_then(|r| r.get("cnt"))
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0))
})()
.unwrap_or(0);
let mut sql_preview = QUERY_GET_FINISHED_PRODUCTIONS.to_string();
const MAX_SQL_PREVIEW: usize = 1200;
if sql_preview.len() > MAX_SQL_PREVIEW {
sql_preview.truncate(MAX_SQL_PREVIEW);
sql_preview.push_str("");
}
eprintln!(
"[ProduceWorker] Keine fertigen Produktionen gefunden. Offene Produktionen in DB: {}. Query(get_finished_productions): {}",
open_cnt,
sql_preview
);
*last = Some(Instant::now());
}
}
self.base self.base
.set_current_step("Process Finished Productions"); .set_current_step("Process Finished Productions");
@@ -301,6 +364,37 @@ impl ProduceWorker {
conn.execute("get_finished_productions", &[]) conn.execute("get_finished_productions", &[])
} }
fn maybe_fix_null_production_starts(&mut self) {
let now = Instant::now();
let should_run = self
.last_start_fix
.map(|t| now.saturating_duration_since(t) >= Duration::from_secs(3600))
.unwrap_or(true);
if !should_run {
return;
}
self.last_start_fix = Some(now);
let mut conn = match self.base.pool.get() {
Ok(c) => c,
Err(e) => {
eprintln!("[ProduceWorker] DB-Verbindung fehlgeschlagen (start_fix): {e}");
return;
}
};
// best-effort: nur loggen, wenn es scheitert
use crate::worker::sql::QUERY_FIX_NULL_PRODUCTION_START;
if let Err(err) = conn.prepare("fix_null_production_start", QUERY_FIX_NULL_PRODUCTION_START) {
eprintln!("[ProduceWorker] Fehler prepare fix_null_production_start: {err}");
return;
}
if let Err(err) = conn.execute("fix_null_production_start", &[]) {
eprintln!("[ProduceWorker] Fehler exec fix_null_production_start: {err}");
}
}
fn load_available_stocks(&self, branch_id: i32) -> Result<Rows, crate::db::DbError> { fn load_available_stocks(&self, branch_id: i32) -> Result<Rows, crate::db::DbError> {
let mut conn = self let mut conn = self
.base .base

View File

@@ -185,7 +185,18 @@ LIMIT 1;
"#; "#;
pub const QUERY_INSERT_PRODUCTION: &str = r#" pub const QUERY_INSERT_PRODUCTION: &str = r#"
INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id) VALUES ($1, $2, $3, (SELECT weather_type_id FROM falukant_data.weather WHERE region_id = $4)); INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id, start_timestamp)
VALUES ($1, $2, $3, (SELECT weather_type_id FROM falukant_data.weather WHERE region_id = $4), NOW());
"#;
// Reparatur: falls Produktionen ohne start_timestamp angelegt wurden, werden sie so gesetzt,
// dass sie sofort als „fertig“ gelten (start = NOW() - production_time Minuten).
pub const QUERY_FIX_NULL_PRODUCTION_START: &str = r#"
UPDATE falukant_data.production p
SET start_timestamp = NOW() - (INTERVAL '1 minute' * pr.production_time)
FROM falukant_type.product pr
WHERE p.start_timestamp IS NULL
AND pr.id = p.product_id;
"#; "#;
// Character creation related queries (missing from earlier extraction) // Character creation related queries (missing from earlier extraction)