Enhance transport processing in TransportWorker: Updated run_loop and process_arrived_transports methods to include broker for publishing notifications. Improved handling of empty transports and added notifications for transport arrival and inventory updates, ensuring timely user alerts and better transport management.

This commit is contained in:
Torsten Schulz (local)
2025-12-05 14:12:17 +01:00
parent fbcea09257
commit 58436bc016

View File

@@ -120,9 +120,9 @@ impl TransportWorker {
} }
} }
fn run_loop(pool: ConnectionPool, _broker: MessageBroker, state: Arc<WorkerState>) { fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc<WorkerState>) {
while state.running_worker.load(Ordering::Relaxed) { while state.running_worker.load(Ordering::Relaxed) {
if let Err(err) = Self::process_arrived_transports(&pool) { if let Err(err) = Self::process_arrived_transports(&pool, &broker) {
eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}"); eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}");
} }
@@ -136,11 +136,14 @@ impl TransportWorker {
} }
} }
fn process_arrived_transports(pool: &ConnectionPool) -> Result<(), DbError> { fn process_arrived_transports(
pool: &ConnectionPool,
broker: &MessageBroker,
) -> Result<(), DbError> {
let transports = Self::load_arrived_transports(pool)?; let transports = Self::load_arrived_transports(pool)?;
for t in transports { for t in transports {
if let Err(err) = Self::handle_arrived_transport(pool, &t) { if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) {
eprintln!( eprintln!(
"[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}", "[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}",
t.id t.id
@@ -196,7 +199,14 @@ impl TransportWorker {
Ok(result) Ok(result)
} }
fn handle_arrived_transport(pool: &ConnectionPool, t: &ArrivedTransport) -> Result<(), DbError> { fn handle_arrived_transport(
pool: &ConnectionPool,
broker: &MessageBroker,
t: &ArrivedTransport,
) -> Result<(), DbError> {
// Ermittle user_id für Notifications
let user_id = Self::get_branch_user_id(pool, t.branch_id).unwrap_or(-1);
// Leere Transporte (ohne Produkt) werden anders behandelt // Leere Transporte (ohne Produkt) werden anders behandelt
if t.product_id.is_none() { if t.product_id.is_none() {
// Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen // Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen
@@ -206,6 +216,16 @@ impl TransportWorker {
); );
Self::update_vehicle_after_transport(pool, t.vehicle_id, t.branch_id, t.distance)?; Self::update_vehicle_after_transport(pool, t.vehicle_id, t.branch_id, t.distance)?;
Self::delete_transport(pool, t.id)?; Self::delete_transport(pool, t.id)?;
// Notification für leeren Transport
if user_id > 0 {
let message = format!(
r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"empty":true}}"#,
t.branch_id, user_id
);
broker.publish(message);
}
return Ok(()); return Ok(());
} }
@@ -225,6 +245,21 @@ impl TransportWorker {
// 3) Transport-Eintrag löschen // 3) Transport-Eintrag löschen
Self::delete_transport(pool, t.id)?; Self::delete_transport(pool, t.id)?;
// Notifications: Transport angekommen und Inventar aktualisiert
if user_id > 0 {
let transport_message = format!(
r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"product_id":{},"quantity":{}}}"#,
t.branch_id, user_id, product_id, delivered
);
broker.publish(transport_message);
let inventory_message = format!(
r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#,
t.branch_id, user_id
);
broker.publish(inventory_message);
}
} else { } else {
// Es konnte nur ein Teil (oder nichts) eingelagert werden: // Es konnte nur ein Teil (oder nichts) eingelagert werden:
// - Transport bleibt in der Tabelle bestehen // - Transport bleibt in der Tabelle bestehen
@@ -232,11 +267,19 @@ impl TransportWorker {
// beim nächsten Durchlauf erneut versucht wird. // beim nächsten Durchlauf erneut versucht wird.
if delivered > 0 { if delivered > 0 {
Self::update_transport_size(pool, t.id, remaining_quantity)?; Self::update_transport_size(pool, t.id, remaining_quantity)?;
// Notification: Teilweise eingelagert, Inventar aktualisiert
if user_id > 0 {
let inventory_message = format!(
r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#,
t.branch_id, user_id
);
broker.publish(inventory_message);
}
} }
// Nutzer informieren, dass Ware noch im Transportmittel liegt. // Nutzer informieren, dass Ware noch im Transportmittel liegt.
// Wir ermitteln hierzu den Besitzer des Ziel-Branches. // Wir ermitteln hierzu den Besitzer des Ziel-Branches.
if let Ok(user_id) = Self::get_branch_user_id(pool, t.branch_id) {
if user_id > 0 { if user_id > 0 {
if let Err(err) = Self::insert_transport_waiting_notification( if let Err(err) = Self::insert_transport_waiting_notification(
pool, pool,
@@ -250,7 +293,6 @@ impl TransportWorker {
} }
} }
} }
}
Ok(()) Ok(())
} }