Files
yourpart-daemon/src/worker/transport.rs

417 lines
14 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use crate::db::{ConnectionPool, DbError};
use crate::message_broker::MessageBroker;
use std::cmp::min;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_ARRIVED_TRANSPORTS,
QUERY_GET_AVAILABLE_STOCKS,
QUERY_INSERT_INVENTORY,
QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT,
QUERY_DELETE_TRANSPORT,
QUERY_GET_BRANCH_REGION,
QUERY_UPDATE_TRANSPORT_SIZE,
};
/// A transport row returned by `QUERY_GET_ARRIVED_TRANSPORTS` that passed the
/// validity checks in `load_arrived_transports`.
#[derive(Debug, Clone)]
struct ArrivedTransport {
/// Identifier of the transport row; used to delete or resize it.
id: i32,
/// Source branch, or `None` when there is no branch in the source region.
source_branch_id: Option<i32>,
/// Branch whose stocks receive the goods and whose region the vehicle moves to.
target_branch_id: i32,
/// Transported product, or `None` for empty transports.
product_id: Option<i32>,
/// Quantity still on the transport (0 for empty transports).
size: i32,
/// Vehicle that carried the transport.
vehicle_id: i32,
/// Distance value read from the row; rounded to an integer before it is
/// passed to the vehicle update query.
distance: f64,
/// User id included in the published notifications.
user_id: i32,
}
/// Capacity snapshot of one stock, as read from `QUERY_GET_AVAILABLE_STOCKS`.
#[derive(Debug, Clone)]
struct StockInfo {
/// Identifier of the stock (the `id` column of the row).
stock_id: i32,
/// Value of the `total_capacity` column (0 when missing or unparsable).
total_capacity: i32,
/// Value of the `filled` column (0 when missing or unparsable); free
/// capacity is computed as `total_capacity - filled`.
filled: i32,
}
/// Worker that processes arrived transports: it books their goods into the
/// target branch, updates the vehicle and publishes notifications.
pub struct TransportWorker {
/// Shared worker plumbing; provides the connection pool, the message broker
/// and the start/stop/watchdog operations used by the `Worker` impl.
base: BaseWorker,
}
impl TransportWorker {
/// Creates a transport worker backed by a `BaseWorker` named
/// `"TransportWorker"` that owns the given pool and broker.
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
    let base = BaseWorker::new("TransportWorker", pool, broker);
    Self { base }
}
/// Main loop of the worker: processes arrived transports roughly once per
/// second until `state.running_worker` is cleared.
///
/// Errors from a processing pass are written to stderr and do not stop the
/// loop.
fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc<WorkerState>) {
    let keep_running = || state.running_worker.load(Ordering::Relaxed);

    while keep_running() {
        match Self::process_arrived_transports(&pool, &broker) {
            Ok(()) => {}
            Err(err) => {
                eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}");
            }
        }

        // Pause for one second between passes, but skip the pause when a
        // stop was requested while the pass was running.
        if keep_running() {
            std::thread::sleep(Duration::from_secs(1));
        }
    }
}
/// Loads all arrived transports and handles them one by one.
///
/// # Errors
/// Returns an error only when loading the transports fails. A failure while
/// handling a single transport is logged to stderr and the remaining
/// transports are still processed.
fn process_arrived_transports(
    pool: &ConnectionPool,
    broker: &MessageBroker,
) -> Result<(), DbError> {
    for transport in Self::load_arrived_transports(pool)? {
        match Self::handle_arrived_transport(pool, broker, &transport) {
            Ok(()) => {}
            Err(err) => {
                eprintln!(
                    "[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}",
                    transport.id
                );
            }
        }
    }
    Ok(())
}
/// Runs `QUERY_GET_ARRIVED_TRANSPORTS` and converts the rows into
/// [`ArrivedTransport`] values.
///
/// Rows that fail validation are silently skipped. A row is valid when `id`,
/// `target_branch_id`, `vehicle_id` and `user_id` are all non-negative and
/// - a product is present and `size > 0` (regular transport), or
/// - no product is present and `size == 0` (empty transport).
///
/// # Errors
/// Fails when no connection can be obtained or when preparing/executing the
/// query fails.
fn load_arrived_transports(pool: &ConnectionPool) -> Result<Vec<ArrivedTransport>, DbError> {
    let mut db = match pool.get() {
        Ok(db) => db,
        Err(e) => return Err(DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}"))),
    };
    db.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?;
    let rows = db.execute("get_arrived_transports", &[])?;

    let mut transports = Vec::with_capacity(rows.len());
    for row in rows {
        // Mandatory columns fall back to sentinels that fail validation below.
        let id = parse_i32(&row, "id", -1);
        let target_branch_id = parse_i32(&row, "target_branch_id", -1);
        let vehicle_id = parse_i32(&row, "vehicle_id", -1);
        let user_id = parse_i32(&row, "user_id", -1);
        let size = parse_i32(&row, "size", 0);
        let distance = parse_f64(&row, "distance", 0.0);

        // source_branch_id may be NULL (no branch in the source region);
        // negative values are treated like a missing branch.
        let source_branch_id = row
            .get("source_branch_id")
            .and_then(|raw| raw.parse::<i32>().ok())
            .filter(|branch| *branch >= 0);

        // product_id may be NULL (empty transport).
        let product_id = row
            .get("product_id")
            .and_then(|raw| raw.parse::<i32>().ok());

        let ids_ok = id >= 0 && target_branch_id >= 0 && vehicle_id >= 0 && user_id >= 0;
        let size_ok = match product_id {
            // Regular transport: must carry something.
            Some(_) => size > 0,
            // Empty transport: must carry nothing.
            None => size == 0,
        };
        if !(ids_ok && size_ok) {
            continue;
        }

        transports.push(ArrivedTransport {
            id,
            source_branch_id,
            target_branch_id,
            product_id,
            size,
            vehicle_id,
            distance,
            user_id,
        });
    }
    Ok(transports)
}
/// Processes a single arrived transport.
///
/// * Empty transport (no product): updates the vehicle, deletes the transport
///   and publishes `transport_arrived` (with `"empty":true`) plus
///   `transport_removed` for the source branch, if any.
/// * Regular transport, fully stored: updates the vehicle, deletes the
///   transport and publishes `transport_arrived`, `inventory_updated` and
///   `transport_removed` (source branch, if any).
/// * Regular transport, partially stored: shrinks the transport to the
///   remaining quantity and publishes `inventory_updated`; the transport stays
///   in the table and is retried on the next pass.
/// * Regular transport, nothing stored: nothing is changed or published.
///
/// Notifications are only published when `t.user_id > 0`.
///
/// NOTE(review): the individual steps are separate calls and no transaction
/// is visible in this file; if a later step fails after goods were stored, the
/// transport row is left unchanged — confirm whether a retry can double-book.
///
/// # Errors
/// Propagates the first database error of any step.
fn handle_arrived_transport(
    pool: &ConnectionPool,
    broker: &MessageBroker,
    t: &ArrivedTransport,
) -> Result<(), DbError> {
    let notify = t.user_id > 0;

    // Source branch: the transport has left (only if a source branch exists).
    let publish_source_removed = || {
        if let Some(source_branch_id) = t.source_branch_id {
            broker.publish(format!(
                r#"{{"event":"transport_removed","branch_id":{},"user_id":{}}}"#,
                source_branch_id, t.user_id
            ));
        }
    };
    // Target branch: inventory changed.
    let publish_inventory_updated = || {
        broker.publish(format!(
            r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#,
            t.target_branch_id, t.user_id
        ));
    };

    let product_id = match t.product_id {
        Some(product_id) => product_id,
        None => {
            // Empty transport: only move the vehicle and drop the transport.
            Self::update_vehicle_after_transport(
                pool,
                t.vehicle_id,
                t.target_branch_id,
                t.distance,
            )?;
            Self::delete_transport(pool, t.id)?;
            if notify {
                broker.publish(format!(
                    r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"empty":true}}"#,
                    t.target_branch_id, t.user_id
                ));
                publish_source_removed();
            }
            return Ok(());
        }
    };

    // Regular transport: book the goods into the target branch's stocks; the
    // call returns what could not be stored and is still on the vehicle.
    let remaining_quantity =
        Self::add_to_inventory(pool, t.target_branch_id, product_id, t.size)?;
    let delivered = t.size.saturating_sub(remaining_quantity);

    if remaining_quantity > 0 {
        // Only part (or nothing) could be stored: keep the transport and
        // shrink it so the next pass retries just the remainder. Waiting
        // transports deliberately produce no notification of their own.
        if delivered > 0 {
            Self::update_transport_size(pool, t.id, remaining_quantity)?;
            if notify {
                publish_inventory_updated();
            }
        }
        return Ok(());
    }

    // Everything was stored: move the vehicle, then remove the transport.
    Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?;
    Self::delete_transport(pool, t.id)?;

    if notify {
        broker.publish(format!(
            r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"product_id":{},"quantity":{}}}"#,
            t.target_branch_id, t.user_id, product_id, delivered
        ));
        publish_inventory_updated();
        publish_source_removed();
    }
    Ok(())
}
/// Distributes `quantity` units of `product_id` over the stocks of
/// `branch_id`, filling each stock up to its free capacity in the order the
/// stocks are returned by `get_available_stocks`.
///
/// Returns the quantity that could NOT be stored (0 when everything fit).
///
/// # Errors
/// Fails when the stocks cannot be loaded, or when the very first insert
/// fails (nothing has been booked yet, so the caller can safely retry the full
/// quantity). If an insert fails after earlier inserts already succeeded, the
/// error is logged to stderr and the real remainder is returned instead, so
/// the caller can shrink the transport; propagating the error at that point
/// would drop the information about what was stored and a retry would book
/// those units a second time.
fn add_to_inventory(
    pool: &ConnectionPool,
    branch_id: i32,
    product_id: i32,
    quantity: i32,
) -> Result<i32, DbError> {
    let stocks = Self::get_available_stocks(pool, branch_id)?;
    let mut remaining_quantity = quantity;

    for stock in stocks {
        if remaining_quantity <= 0 {
            break;
        }

        // Saturating so that inconsistent capacity data cannot overflow.
        let free_capacity = stock.total_capacity.saturating_sub(stock.filled);
        if free_capacity <= 0 {
            continue;
        }

        let to_store = min(remaining_quantity, free_capacity);
        if let Err(err) = Self::store_in_stock(pool, stock.stock_id, product_id, to_store) {
            if remaining_quantity == quantity {
                // Nothing stored so far: a plain error is safe to report.
                return Err(err);
            }
            eprintln!(
                "[TransportWorker] Einlagern in Lager {} fehlgeschlagen: {err}; {} von {} Einheiten bereits eingebucht, Rest bleibt im Transport",
                stock.stock_id,
                quantity - remaining_quantity,
                quantity
            );
            break;
        }
        remaining_quantity -= to_store;
    }

    Ok(remaining_quantity)
}
/// Runs `QUERY_GET_AVAILABLE_STOCKS` for `branch_id` and returns the stocks
/// as [`StockInfo`] values.
///
/// Rows whose `id` is missing, unparsable or negative are skipped; missing
/// `total_capacity` / `filled` values default to 0.
///
/// # Errors
/// Fails when no connection can be obtained or when preparing/executing the
/// query fails.
fn get_available_stocks(pool: &ConnectionPool, branch_id: i32) -> Result<Vec<StockInfo>, DbError> {
    let mut db = match pool.get() {
        Ok(db) => db,
        Err(e) => return Err(DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}"))),
    };
    db.prepare("transport_get_stocks", QUERY_GET_AVAILABLE_STOCKS)?;
    let rows = db.execute("transport_get_stocks", &[&branch_id])?;

    let mut stocks = Vec::with_capacity(rows.len());
    for row in rows {
        let stock_id = parse_i32(&row, "id", -1);
        if stock_id < 0 {
            continue;
        }
        stocks.push(StockInfo {
            stock_id,
            total_capacity: parse_i32(&row, "total_capacity", 0),
            filled: parse_i32(&row, "filled", 0),
        });
    }
    Ok(stocks)
}
/// Executes `QUERY_INSERT_INVENTORY` with
/// `(stock_id, product_id, quantity, quality)`.
///
/// # Errors
/// Fails when no connection can be obtained or when preparing/executing the
/// statement fails.
fn store_in_stock(
    pool: &ConnectionPool,
    stock_id: i32,
    product_id: i32,
    quantity: i32,
) -> Result<(), DbError> {
    // Transported goods are conservatively booked with quality 100 for now.
    // A later extension could factor in quality loss from long distances or
    // poor vehicle condition.
    let transport_quality: i32 = 100;

    let mut db = match pool.get() {
        Ok(db) => db,
        Err(e) => return Err(DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}"))),
    };
    db.prepare("transport_insert_inventory", QUERY_INSERT_INVENTORY)?;
    db.execute(
        "transport_insert_inventory",
        &[&stock_id, &product_id, &quantity, &transport_quality],
    )?;
    Ok(())
}
/// Resolves the region of `target_branch_id` via `QUERY_GET_BRANCH_REGION`
/// and then executes `QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT` with
/// `(vehicle_id, region_id, distance rounded to the nearest integer)`.
///
/// # Errors
/// Fails when no connection can be obtained, when a statement cannot be
/// prepared or executed, or when the branch's region is missing, unparsable
/// or negative.
fn update_vehicle_after_transport(
    pool: &ConnectionPool,
    vehicle_id: i32,
    target_branch_id: i32,
    distance: f64,
) -> Result<(), DbError> {
    let mut db = match pool.get() {
        Ok(db) => db,
        Err(e) => return Err(DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}"))),
    };

    // Look up the target region through the branch.
    db.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
    let branch_rows = db.execute("get_branch_region", &[&target_branch_id])?;
    let parsed_region = branch_rows
        .first()
        .and_then(|r| r.get("region_id"))
        .and_then(|v| v.parse::<i32>().ok());
    let region_id = match parsed_region {
        Some(region) if region >= 0 => region,
        _ => {
            return Err(DbError::new(format!(
                "Konnte Region für Branch {} nicht bestimmen",
                target_branch_id
            )));
        }
    };

    db.prepare(
        "update_vehicle_after_transport",
        QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT,
    )?;
    let distance_int = distance.round() as i32;
    db.execute(
        "update_vehicle_after_transport",
        &[&vehicle_id, &region_id, &distance_int],
    )?;
    Ok(())
}
/// Executes `QUERY_DELETE_TRANSPORT` for `transport_id`.
///
/// # Errors
/// Fails when no connection can be obtained or when preparing/executing the
/// statement fails.
fn delete_transport(pool: &ConnectionPool, transport_id: i32) -> Result<(), DbError> {
    let mut db = match pool.get() {
        Ok(db) => db,
        Err(e) => return Err(DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}"))),
    };
    db.prepare("delete_transport", QUERY_DELETE_TRANSPORT)?;
    db.execute("delete_transport", &[&transport_id])?;
    Ok(())
}
/// Executes `QUERY_UPDATE_TRANSPORT_SIZE` with `(transport_id, new_size)`.
///
/// # Errors
/// Fails when no connection can be obtained or when preparing/executing the
/// statement fails.
fn update_transport_size(
    pool: &ConnectionPool,
    transport_id: i32,
    new_size: i32,
) -> Result<(), DbError> {
    let mut db = match pool.get() {
        Ok(db) => db,
        Err(e) => return Err(DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}"))),
    };
    db.prepare("update_transport_size", QUERY_UPDATE_TRANSPORT_SIZE)?;
    db.execute("update_transport_size", &[&transport_id, &new_size])?;
    Ok(())
}
}
/// `Worker` implementation that delegates thread, stop and watchdog handling
/// to the embedded `BaseWorker`.
impl Worker for TransportWorker {
    /// Hands `run_loop` to the base worker, together with clones of the
    /// connection pool and message broker.
    fn start_worker_thread(&mut self) {
        let (pool, broker) = (self.base.pool.clone(), self.base.broker.clone());
        let worker_loop = move |state: Arc<WorkerState>| {
            TransportWorker::run_loop(pool.clone(), broker.clone(), state);
        };
        self.base.start_worker_with_loop(worker_loop);
    }

    /// Delegates to `BaseWorker::stop_worker`.
    fn stop_worker_thread(&mut self) {
        self.base.stop_worker();
    }

    /// Delegates to `BaseWorker::start_watchdog`.
    fn enable_watchdog(&mut self) {
        self.base.start_watchdog();
    }
}
/// Reads column `key` from `row` and parses it as `i32`.
///
/// Returns `default` when the column is absent or its value does not parse.
fn parse_i32(row: &crate::db::Row, key: &str, default: i32) -> i32 {
    match row.get(key) {
        Some(raw) => raw.parse::<i32>().unwrap_or(default),
        None => default,
    }
}
/// Reads column `key` from `row` and parses it as `f64`.
///
/// Returns `default` when the column is absent or its value does not parse.
fn parse_f64(row: &crate::db::Row, key: &str, default: f64) -> f64 {
    match row.get(key) {
        Some(raw) => raw.parse::<f64>().unwrap_or(default),
        None => default,
    }
}