From 58436bc0160a4d7b48af8871b2aef55348f6a706 Mon Sep 17 00:00:00 2001 From: "Torsten Schulz (local)" Date: Fri, 5 Dec 2025 14:12:17 +0100 Subject: [PATCH] Enhance transport processing in TransportWorker: Updated run_loop and process_arrived_transports methods to include broker for publishing notifications. Improved handling of empty transports and added notifications for transport arrival and inventory updates, ensuring timely user alerts and better transport management. --- src/worker/transport.rs | 76 ++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 17 deletions(-) diff --git a/src/worker/transport.rs b/src/worker/transport.rs index 8da4ca4..1691566 100644 --- a/src/worker/transport.rs +++ b/src/worker/transport.rs @@ -120,9 +120,9 @@ impl TransportWorker { } } - fn run_loop(pool: ConnectionPool, _broker: MessageBroker, state: Arc) { + fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { while state.running_worker.load(Ordering::Relaxed) { - if let Err(err) = Self::process_arrived_transports(&pool) { + if let Err(err) = Self::process_arrived_transports(&pool, &broker) { eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}"); } @@ -136,11 +136,14 @@ impl TransportWorker { } } - fn process_arrived_transports(pool: &ConnectionPool) -> Result<(), DbError> { + fn process_arrived_transports( + pool: &ConnectionPool, + broker: &MessageBroker, + ) -> Result<(), DbError> { let transports = Self::load_arrived_transports(pool)?; for t in transports { - if let Err(err) = Self::handle_arrived_transport(pool, &t) { + if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) { eprintln!( "[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}", t.id @@ -196,7 +199,14 @@ impl TransportWorker { Ok(result) } - fn handle_arrived_transport(pool: &ConnectionPool, t: &ArrivedTransport) -> Result<(), DbError> { + fn handle_arrived_transport( + pool: &ConnectionPool, + broker: &MessageBroker, + t: &ArrivedTransport, + ) -> Result<(), DbError> { + // Ermittle user_id für Notifications + let user_id = Self::get_branch_user_id(pool, t.branch_id).unwrap_or(-1); + // Leere Transporte (ohne Produkt) werden anders behandelt if t.product_id.is_none() { // Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen @@ -206,6 +216,16 @@ impl TransportWorker { ); Self::update_vehicle_after_transport(pool, t.vehicle_id, t.branch_id, t.distance)?; Self::delete_transport(pool, t.id)?; + + // Notification für leeren Transport + if user_id > 0 { + let message = format!( + r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"empty":true}}"#, + t.branch_id, user_id + ); + broker.publish(message); + } + return Ok(()); } @@ -225,6 +245,21 @@ impl TransportWorker { // 3) Transport-Eintrag löschen Self::delete_transport(pool, t.id)?; + + // Notifications: Transport angekommen und Inventar aktualisiert + if user_id > 0 { + let transport_message = format!( + r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"product_id":{},"quantity":{}}}"#, + t.branch_id, user_id, product_id, delivered + ); + broker.publish(transport_message); + + let inventory_message = format!( + r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#, + t.branch_id, user_id + ); + broker.publish(inventory_message); + } } else { // Es konnte nur ein Teil (oder nichts) eingelagert werden: // - Transport bleibt in der Tabelle bestehen @@ -232,22 +267,29 @@ impl TransportWorker { // beim nächsten Durchlauf erneut versucht wird. if delivered > 0 { Self::update_transport_size(pool, t.id, remaining_quantity)?; + + // Notification: Teilweise eingelagert, Inventar aktualisiert + if user_id > 0 { + let inventory_message = format!( + r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#, + t.branch_id, user_id + ); + broker.publish(inventory_message); + } } // Nutzer informieren, dass Ware noch im Transportmittel liegt. // Wir ermitteln hierzu den Besitzer des Ziel-Branches. - if let Ok(user_id) = Self::get_branch_user_id(pool, t.branch_id) { - if user_id > 0 { - if let Err(err) = Self::insert_transport_waiting_notification( - pool, - user_id, - product_id, - remaining_quantity, - ) { - eprintln!( - "[TransportWorker] Fehler beim Schreiben der Transport-Waiting-Notification: {err}" - ); - } + if user_id > 0 { + if let Err(err) = Self::insert_transport_waiting_notification( + pool, + user_id, + product_id, + remaining_quantity, + ) { + eprintln!( + "[TransportWorker] Fehler beim Schreiben der Transport-Waiting-Notification: {err}" + ); } } }