Add TransportWorker and enhance logging: Introduced TransportWorker to the worker module and improved shutdown logging in the daemon for better visibility during system termination. Updated watchdog thread sleep mechanism for quicker shutdown response and refined error handling in DbError conversion to include SQLSTATE as a string.

This commit is contained in:
Torsten Schulz (local)
2025-11-26 16:03:27 +01:00
parent 25f2eb150d
commit 8ee0bbf3cd
5 changed files with 499 additions and 6 deletions

View File

@@ -33,7 +33,8 @@ impl std::error::Error for DbError {}
impl From<PgError> for DbError {
fn from(err: PgError) -> Self {
if let Some(db_err) = err.as_db_error() {
let code = db_err.code();
// SQLSTATE als String extrahieren (z.B. "23514")
let code = db_err.code().code().to_string();
let message = db_err.message();
let detail = db_err.detail().unwrap_or_default();
let hint = db_err.hint().unwrap_or_default();

View File

@@ -14,7 +14,7 @@ use message_broker::MessageBroker;
use websocket_server::WebSocketServer;
use worker::{
CharacterCreationWorker, ConnectionPool, DirectorWorker, HouseWorker, PoliticsWorker,
ProduceWorker, StockageManager, UndergroundWorker, UserCharacterWorker,
ProduceWorker, StockageManager, TransportWorker, UndergroundWorker, UserCharacterWorker,
ValueRecalculationWorker, Worker,
};
@@ -62,6 +62,7 @@ fn run_daemon() -> Result<(), Box<dyn std::error::Error>> {
fn install_signal_handler() -> Result<(), Box<dyn std::error::Error>> {
// Behandle SIGINT/SIGTERM (z.B. Strg+C) und leite auf das globale Flag um.
ctrlc::set_handler(|| {
eprintln!("[Daemon] SIGINT/SIGTERM empfangen, fahre kontrolliert herunter...");
KEEP_RUNNING.store(false, Ordering::SeqCst);
})?;
Ok(())
@@ -138,6 +139,7 @@ fn create_workers(pool: ConnectionPool, broker: MessageBroker) -> Vec<Box<dyn Wo
)),
Box::new(HouseWorker::new(pool.clone(), broker.clone())),
Box::new(PoliticsWorker::new(pool.clone(), broker.clone())),
Box::new(TransportWorker::new(pool.clone(), broker.clone())),
Box::new(UndergroundWorker::new(pool, broker)),
]
}
@@ -161,6 +163,7 @@ fn shutdown_system(
workers: &mut [Box<dyn Worker>],
broker: &MessageBroker,
) {
eprintln!("[Daemon] Shutdown-System gestartet: stoppe Worker...");
// systemd: wir fahren nun kontrolliert herunter
let _ = daemon::notify(false, &[NotifyState::Stopping]);
@@ -171,16 +174,21 @@ fn shutdown_system(
}
// 2) WebSocket-Server stoppen (Tokio-Runtime herunterfahren)
eprintln!("[Daemon] WebSocket-Server wird gestoppt...");
websocket_server.stop();
// 3) MessageBroker-Hook aktuell noch Stub, aber hier zentral ergänzt
// für eine spätere interne Queue/Thread-Implementierung.
eprintln!("[Daemon] Broker wird gestoppt...");
broker.stop();
eprintln!("[Daemon] Shutdown-System abgeschlossen.");
}
fn run_main_loop() {
eprintln!("[Daemon] Hauptloop gestartet.");
while KEEP_RUNNING.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(100));
}
eprintln!("[Daemon] Hauptloop beendet, starte Shutdown-Sequenz...");
}

View File

@@ -95,10 +95,14 @@ impl BaseWorker {
self.watchdog_thread = Some(thread::spawn(move || {
while state.running_watchdog.load(Ordering::Relaxed) {
thread::sleep(Duration::from_secs(10));
if !state.running_watchdog.load(Ordering::Relaxed) {
break;
// Nicht in einem großen 10s-Sleep blockieren, damit der
// Shutdown (stop_watchdog) zügig reagieren kann. Stattdessen
// in 1s-Scheiben schlafen und dazwischen das Flag prüfen.
for _ in 0..10 {
if !state.running_watchdog.load(Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_secs(1));
}
let step = state.current_step.lock().unwrap().clone();

View File

@@ -8,6 +8,7 @@ mod politics;
mod underground;
mod value_recalculation;
mod user_character;
mod transport;
pub use base::Worker;
pub use crate::db::ConnectionPool;
@@ -20,4 +21,5 @@ pub use politics::PoliticsWorker;
pub use underground::UndergroundWorker;
pub use value_recalculation::ValueRecalculationWorker;
pub use user_character::UserCharacterWorker;
pub use transport::TransportWorker;

478
src/worker/transport.rs Normal file
View File

@@ -0,0 +1,478 @@
use crate::db::{ConnectionPool, DbError};
use crate::message_broker::MessageBroker;
use std::cmp::min;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use super::base::{BaseWorker, Worker, WorkerState};
#[derive(Debug, Clone)]
struct ArrivedTransport {
id: i32,
branch_id: i32,
product_id: i32,
size: i32,
vehicle_id: i32,
distance: f64,
}
#[derive(Debug, Clone)]
struct StockInfo {
stock_id: i32,
total_capacity: i32,
filled: i32,
}
// Ermittelt alle Transporte, die gemäß Distanz und Fahrzeuggeschwindigkeit bereits
// angekommen sein sollten. Die Reisezeit wird hier vereinfacht als
// travel_minutes = distance / speed
// interpretiert, d.h. `speed` gibt die Einheiten "Distanz pro Minute" an.
const QUERY_GET_ARRIVED_TRANSPORTS: &str = r#"
SELECT
t.id,
t.product_id,
t.size,
t.vehicle_id,
b.id AS branch_id,
rd.distance AS distance
FROM falukant_data.transport AS t
JOIN falukant_data.vehicle AS v
ON v.id = t.vehicle_id
JOIN falukant_type.vehicle AS vt
ON vt.id = v.vehicle_type_id
JOIN falukant_data.region_distance AS rd
ON ((rd.source_region_id = t.source_region_id
AND rd.target_region_id = t.target_region_id)
OR (rd.source_region_id = t.target_region_id
AND rd.target_region_id = t.source_region_id))
AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
LEFT JOIN falukant_data.branch AS b
ON b.region_id = t.target_region_id
AND b.falukant_user_id = v.falukant_user_id
WHERE vt.speed > 0
AND t.created_at
+ (rd.distance / vt.speed::double precision) * INTERVAL '1 minute'
<= NOW();
"#;
// Verfügbare Lagerplätze in einem Branch, analog zur Logik im ProduceWorker.
const QUERY_GET_AVAILABLE_STOCKS: &str = r#"
SELECT
stock.id,
stock.quantity AS total_capacity,
(
SELECT COALESCE(SUM(inventory.quantity), 0)
FROM falukant_data.inventory
WHERE inventory.stock_id = stock.id
) AS filled
FROM falukant_data.stock stock
JOIN falukant_data.branch branch
ON stock.branch_id = branch.id
WHERE branch.id = $1
ORDER BY total_capacity DESC;
"#;
const QUERY_INSERT_INVENTORY: &str = r#"
INSERT INTO falukant_data.inventory (
stock_id,
product_id,
quantity,
quality,
produced_at
) VALUES ($1, $2, $3, $4, NOW());
"#;
const QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT: &str = r#"
UPDATE falukant_data.vehicle
SET region_id = $2,
condition = GREATEST(0, condition - $3::int),
available_from = NOW(),
updated_at = NOW()
WHERE id = $1;
"#;
const QUERY_DELETE_TRANSPORT: &str = r#"
DELETE FROM falukant_data.transport
WHERE id = $1;
"#;
/// Notification-Eintrag, analog zur Overproduction-Notification im ProduceWorker.
/// `tr` wird hier als JSON-String mit Übersetzungs-Key und Werten gespeichert.
const QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION: &str = r#"
INSERT INTO falukant_log.notification (
user_id,
tr,
shown,
created_at,
updated_at
) VALUES ($1, $2, FALSE, NOW(), NOW());
"#;
pub struct TransportWorker {
base: BaseWorker,
}
impl TransportWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
Self {
base: BaseWorker::new("TransportWorker", pool, broker),
}
}
fn run_loop(pool: ConnectionPool, _broker: MessageBroker, state: Arc<WorkerState>) {
while state.running_worker.load(Ordering::Relaxed) {
if let Err(err) = Self::process_arrived_transports(&pool) {
eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}");
}
// Einmal pro Sekunde prüfen
for _ in 0..1 {
if !state.running_worker.load(Ordering::Relaxed) {
break;
}
std::thread::sleep(Duration::from_secs(1));
}
}
}
fn process_arrived_transports(pool: &ConnectionPool) -> Result<(), DbError> {
let transports = Self::load_arrived_transports(pool)?;
for t in transports {
if let Err(err) = Self::handle_arrived_transport(pool, &t) {
eprintln!(
"[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}",
t.id
);
}
}
Ok(())
}
fn load_arrived_transports(pool: &ConnectionPool) -> Result<Vec<ArrivedTransport>, DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?;
let rows = conn.execute("get_arrived_transports", &[])?;
let mut result = Vec::with_capacity(rows.len());
for row in rows {
let id = parse_i32(&row, "id", -1);
let branch_id = parse_i32(&row, "branch_id", -1);
let product_id = parse_i32(&row, "product_id", -1);
let size = parse_i32(&row, "size", 0);
let vehicle_id = parse_i32(&row, "vehicle_id", -1);
let distance = parse_f64(&row, "distance", 0.0);
if id >= 0 && branch_id >= 0 && product_id >= 0 && vehicle_id >= 0 && size > 0 {
result.push(ArrivedTransport {
id,
branch_id,
product_id,
size,
vehicle_id,
distance,
});
}
}
Ok(result)
}
fn handle_arrived_transport(pool: &ConnectionPool, t: &ArrivedTransport) -> Result<(), DbError> {
// 1) Waren in das Ziel-Branch-Lager einbuchen wir erhalten die
// tatsächlich verbleibende Menge im Transportmittel zurück.
let remaining_quantity =
Self::add_to_inventory(pool, t.branch_id, t.product_id, t.size)?;
let delivered = t.size.saturating_sub(remaining_quantity);
if remaining_quantity <= 0 {
// Alles konnte eingelagert werden:
// 2) Fahrzeug-Region & Zustand aktualisieren
Self::update_vehicle_after_transport(pool, t.vehicle_id, t.branch_id, t.distance)?;
// 3) Transport-Eintrag löschen
Self::delete_transport(pool, t.id)?;
} else {
// Es konnte nur ein Teil (oder nichts) eingelagert werden:
// - Transport bleibt in der Tabelle bestehen
// - Größe im Transport anpassen, damit nur der verbliebene Rest
// beim nächsten Durchlauf erneut versucht wird.
if delivered > 0 {
Self::update_transport_size(pool, t.id, remaining_quantity)?;
}
// Nutzer informieren, dass Ware noch im Transportmittel liegt.
// Wir ermitteln hierzu den Besitzer des Ziel-Branches.
if let Ok(user_id) = Self::get_branch_user_id(pool, t.branch_id) {
if user_id > 0 {
if let Err(err) = Self::insert_transport_waiting_notification(
pool,
user_id,
t.product_id,
remaining_quantity,
) {
eprintln!(
"[TransportWorker] Fehler beim Schreiben der Transport-Waiting-Notification: {err}"
);
}
}
}
}
Ok(())
}
fn add_to_inventory(
pool: &ConnectionPool,
branch_id: i32,
product_id: i32,
quantity: i32,
) -> Result<i32, DbError> {
let stocks = Self::get_available_stocks(pool, branch_id)?;
let mut remaining_quantity = quantity;
for stock in stocks {
if remaining_quantity <= 0 {
break;
}
let free_capacity = stock.total_capacity - stock.filled;
if free_capacity <= 0 {
continue;
}
let to_store = min(remaining_quantity, free_capacity);
Self::store_in_stock(pool, stock.stock_id, product_id, to_store)?;
remaining_quantity -= to_store;
}
Ok(remaining_quantity)
}
fn get_available_stocks(pool: &ConnectionPool, branch_id: i32) -> Result<Vec<StockInfo>, DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("transport_get_stocks", QUERY_GET_AVAILABLE_STOCKS)?;
let rows = conn.execute("transport_get_stocks", &[&branch_id])?;
let mut result = Vec::with_capacity(rows.len());
for row in rows {
let stock_id = parse_i32(&row, "id", -1);
let total_capacity = parse_i32(&row, "total_capacity", 0);
let filled = parse_i32(&row, "filled", 0);
if stock_id >= 0 {
result.push(StockInfo {
stock_id,
total_capacity,
filled,
});
}
}
Ok(result)
}
fn store_in_stock(
pool: &ConnectionPool,
stock_id: i32,
product_id: i32,
quantity: i32,
) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("transport_insert_inventory", QUERY_INSERT_INVENTORY)?;
// Qualität für Transporte zunächst konservativ auf 100 setzen. Eine
// spätere Erweiterung könnte Qualitätseinbußen durch lange Strecken
// oder schlechten Fahrzeugzustand berücksichtigen.
let quality: i32 = 100;
conn.execute(
"transport_insert_inventory",
&[&stock_id, &product_id, &quantity, &quality],
)?;
Ok(())
}
fn update_vehicle_after_transport(
pool: &ConnectionPool,
vehicle_id: i32,
target_branch_id: i32,
distance: f64,
) -> Result<(), DbError> {
// Hole die Ziel-Region über den Branch
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Region des Branches abrufen
const QUERY_GET_BRANCH_REGION: &str = r#"
SELECT region_id
FROM falukant_data.branch
WHERE id = $1
LIMIT 1;
"#;
conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
let rows = conn.execute("get_branch_region", &[&target_branch_id])?;
let region_id = rows
.get(0)
.and_then(|r| r.get("region_id"))
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1);
if region_id < 0 {
return Err(DbError::new(format!(
"Konnte Region für Branch {} nicht bestimmen",
target_branch_id
)));
}
conn.prepare(
"update_vehicle_after_transport",
QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT,
)?;
let distance_int = distance.round() as i32;
conn.execute(
"update_vehicle_after_transport",
&[&vehicle_id, &region_id, &distance_int],
)?;
Ok(())
}
fn delete_transport(pool: &ConnectionPool, transport_id: i32) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("delete_transport", QUERY_DELETE_TRANSPORT)?;
conn.execute("delete_transport", &[&transport_id])?;
Ok(())
}
fn update_transport_size(
pool: &ConnectionPool,
transport_id: i32,
new_size: i32,
) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
const QUERY_UPDATE_TRANSPORT_SIZE: &str = r#"
UPDATE falukant_data.transport
SET size = $2,
updated_at = NOW()
WHERE id = $1;
"#;
conn.prepare("update_transport_size", QUERY_UPDATE_TRANSPORT_SIZE)?;
conn.execute("update_transport_size", &[&transport_id, &new_size])?;
Ok(())
}
fn get_branch_user_id(pool: &ConnectionPool, branch_id: i32) -> Result<i32, DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
const QUERY_GET_BRANCH_USER: &str = r#"
SELECT falukant_user_id
FROM falukant_data.branch
WHERE id = $1
LIMIT 1;
"#;
conn.prepare("get_branch_user", QUERY_GET_BRANCH_USER)?;
let rows = conn.execute("get_branch_user", &[&branch_id])?;
let user_id = rows
.get(0)
.and_then(|r| r.get("falukant_user_id"))
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1);
Ok(user_id)
}
fn insert_transport_waiting_notification(
pool: &ConnectionPool,
user_id: i32,
product_id: i32,
remaining_quantity: i32,
) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare(
"add_transport_waiting_notification",
QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION,
)?;
let notification = format!(
r#"{{"tr":"transport.waiting","productId":{},"value":{}}}"#,
product_id, remaining_quantity
);
conn.execute(
"add_transport_waiting_notification",
&[&user_id, &notification],
)?;
Ok(())
}
}
impl Worker for TransportWorker {
fn start_worker_thread(&mut self) {
let pool = self.base.pool.clone();
let broker = self.base.broker.clone();
self.base
.start_worker_with_loop(move |state: Arc<WorkerState>| {
TransportWorker::run_loop(pool.clone(), broker.clone(), state);
});
}
fn stop_worker_thread(&mut self) {
self.base.stop_worker();
}
fn enable_watchdog(&mut self) {
self.base.start_watchdog();
}
}
fn parse_i32(row: &crate::db::Row, key: &str, default: i32) -> i32 {
row.get(key)
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(default)
}
fn parse_f64(row: &crate::db::Row, key: &str, default: f64) -> f64 {
row.get(key)
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(default)
}