diff --git a/src/worker/director.rs b/src/worker/director.rs index 741f68a..f45c75d 100644 --- a/src/worker/director.rs +++ b/src/worker/director.rs @@ -851,22 +851,12 @@ impl DirectorWorker { item, )?; + // Inventar wird bereits in plan_transports_for_item reduziert if shipped > 0 { eprintln!( - "[DirectorWorker] Transport geplant: {} Einheiten von Produkt {} transportiert", + "[DirectorWorker] Transport geplant: {} Einheiten von Produkt {} transportiert (Inventar bereits reduziert)", shipped, item.product_id ); - if shipped >= item.quantity { - // Alles wurde in Transporte umgewandelt, Inventar komplett entfernen. - conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?; - conn.execute("remove_inventory", &[&item.id])?; - item.quantity = 0; - } else { - // Inventar-Menge in der DB reduzieren und im Item anpassen. - let remaining = item.quantity - shipped; - Self::update_inventory_quantity(&mut conn, item.id, remaining)?; - item.quantity = remaining; - } } else { eprintln!( "[DirectorWorker] Kein lohnender Transport gefunden für Produkt {} (region_id={})", @@ -937,7 +927,7 @@ impl DirectorWorker { // Vor dem eigentlichen Verkauf versucht der Director, lohnende // Transporte zu planen. Dabei werden: // - ggf. Transport-Einträge erzeugt - // - Inventar-Mengen reduziert + // - Inventar-Mengen reduziert (geschieht bereits in plan_transports_for_item) // Die zurückgegebenen Mengen werden dann lokal verkauft. for item in items.iter_mut() { let shipped = self.plan_transports_for_item( @@ -946,17 +936,8 @@ impl DirectorWorker { item, )?; - if shipped > 0 { - if shipped >= item.quantity { - // Alles wurde in Transporte umgewandelt, lokal nichts mehr zu verkaufen. - item.quantity = 0; - } else { - // Inventar-Menge in der DB reduzieren und im Item anpassen. - let remaining = item.quantity - shipped; - Self::update_inventory_quantity(&mut conn, item.id, remaining)?; - item.quantity = remaining; - } - } + // Inventar wird bereits in plan_transports_for_item reduziert + // item.quantity wurde dort bereits aktualisiert } // Anschließend lokale Verkäufe für die verbleibenden Mengen durchführen. @@ -1295,10 +1276,23 @@ impl DirectorWorker { let shipped = best_quantity - remaining.max(0); - // Optional: Logging zur Nachvollziehbarkeit + // Inventar sofort reduzieren, nachdem Transporte erfolgreich angelegt wurden + // Dies stellt sicher, dass Inventar und Transporte immer konsistent sind if shipped > 0 { + if shipped >= item.quantity { + // Alles wurde in Transporte umgewandelt, Inventar komplett entfernen + conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?; + conn.execute("remove_inventory", &[&item.id])?; + item.quantity = 0; + } else { + // Inventar-Menge in der DB reduzieren und im Item anpassen + let remaining_quantity = item.quantity - shipped; + Self::update_inventory_quantity(conn, item.id, remaining_quantity)?; + item.quantity = remaining_quantity; + } + eprintln!( - "[DirectorWorker] Transport geplant: {} Einheiten von Produkt {} von Region {} nach Region {} (Stückpreis lokal {:.2}, remote {:.2})", + "[DirectorWorker] Transport geplant: {} Einheiten von Produkt {} von Region {} nach Region {} (Stückpreis lokal {:.2}, remote {:.2}). Inventar reduziert.", shipped, item.product_id, item.region_id, target_region, local_piece_price, best_remote_piece_price ); } diff --git a/src/worker/transport.rs b/src/worker/transport.rs index 47f5f57..a32afd6 100644 --- a/src/worker/transport.rs +++ b/src/worker/transport.rs @@ -174,7 +174,12 @@ impl TransportWorker { let mut result = Vec::with_capacity(rows.len()); for row in rows { let id = parse_i32(&row, "id", -1); - let branch_id = parse_i32(&row, "branch_id", -1); + let target_branch_id = parse_i32(&row, "target_branch_id", -1); + // source_branch_id kann NULL sein (wenn kein Branch in Source-Region) + let source_branch_id_str = row.get("source_branch_id"); + let source_branch_id = source_branch_id_str + .and_then(|v| v.parse::().ok()) + .filter(|&x| x >= 0); // product_id kann NULL sein (für leere Transporte) let product_id_str = row.get("product_id"); let product_id = product_id_str @@ -182,25 +187,28 @@ impl TransportWorker { let size = parse_i32(&row, "size", 0); let vehicle_id = parse_i32(&row, "vehicle_id", -1); let distance = parse_f64(&row, "distance", 0.0); + let user_id = parse_i32(&row, "user_id", -1); // Für normale Transporte: product_id muss vorhanden sein und size > 0 // Für leere Transporte: product_id ist NULL und size = 0 let is_valid = if product_id.is_some() { // Normaler Transport mit Produkt - id >= 0 && branch_id >= 0 && vehicle_id >= 0 && size > 0 + id >= 0 && target_branch_id >= 0 && vehicle_id >= 0 && size > 0 && user_id >= 0 } else { // Leerer Transport (ohne Produkt) - id >= 0 && branch_id >= 0 && vehicle_id >= 0 && size == 0 + id >= 0 && target_branch_id >= 0 && vehicle_id >= 0 && size == 0 && user_id >= 0 }; if is_valid { result.push(ArrivedTransport { id, - branch_id, + source_branch_id, + target_branch_id, product_id, size, vehicle_id, distance, + user_id, }); } } @@ -213,9 +221,6 @@ impl TransportWorker { broker: &MessageBroker, t: &ArrivedTransport, ) -> Result<(), DbError> { - // Ermittle user_id für Notifications - let user_id = Self::get_branch_user_id(pool, t.branch_id).unwrap_or(-1); - // Leere Transporte (ohne Produkt) werden anders behandelt if t.product_id.is_none() { // Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen @@ -223,16 +228,26 @@ impl TransportWorker { "[TransportWorker] Leerer Transport {} angekommen: Fahrzeug {} zurückgeholt", t.id, t.vehicle_id ); - Self::update_vehicle_after_transport(pool, t.vehicle_id, t.branch_id, t.distance)?; + Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?; Self::delete_transport(pool, t.id)?; - // Notification für leeren Transport - if user_id > 0 { - let message = format!( + // Notifications für beide Branches + if t.user_id > 0 { + // Target-Branch: Transport angekommen + let target_message = format!( r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"empty":true}}"#, - t.branch_id, user_id + t.target_branch_id, t.user_id ); - broker.publish(message); + broker.publish(target_message); + + // Source-Branch: Transport entfernt (falls vorhanden) + if let Some(source_branch_id) = t.source_branch_id { + let source_message = format!( + r#"{{"event":"transport_removed","branch_id":{},"user_id":{}}}"#, + source_branch_id, t.user_id + ); + broker.publish(source_message); + } } return Ok(()); @@ -243,31 +258,41 @@ impl TransportWorker { // tatsächlich verbleibende Menge im Transportmittel zurück. let product_id = t.product_id.unwrap(); let remaining_quantity = - Self::add_to_inventory(pool, t.branch_id, product_id, t.size)?; + Self::add_to_inventory(pool, t.target_branch_id, product_id, t.size)?; let delivered = t.size.saturating_sub(remaining_quantity); if remaining_quantity <= 0 { // Alles konnte eingelagert werden: // 2) Fahrzeug-Region & Zustand aktualisieren - Self::update_vehicle_after_transport(pool, t.vehicle_id, t.branch_id, t.distance)?; + Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?; // 3) Transport-Eintrag löschen Self::delete_transport(pool, t.id)?; - // Notifications: Transport angekommen und Inventar aktualisiert - if user_id > 0 { - let transport_message = format!( + // Notifications für beide Branches + if t.user_id > 0 { + // Target-Branch: Transport angekommen und Inventar aktualisiert + let target_transport_message = format!( r#"{{"event":"transport_arrived","branch_id":{},"user_id":{},"product_id":{},"quantity":{}}}"#, - t.branch_id, user_id, product_id, delivered + t.target_branch_id, t.user_id, product_id, delivered ); - broker.publish(transport_message); + broker.publish(target_transport_message); - let inventory_message = format!( + let target_inventory_message = format!( r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#, - t.branch_id, user_id + t.target_branch_id, t.user_id ); - broker.publish(inventory_message); + broker.publish(target_inventory_message); + + // Source-Branch: Transport entfernt (falls vorhanden) + if let Some(source_branch_id) = t.source_branch_id { + let source_message = format!( + r#"{{"event":"transport_removed","branch_id":{},"user_id":{}}}"#, + source_branch_id, t.user_id + ); + broker.publish(source_message); + } } } else { // Es konnte nur ein Teil (oder nichts) eingelagert werden: @@ -278,21 +303,20 @@ impl TransportWorker { Self::update_transport_size(pool, t.id, remaining_quantity)?; // Notification: Teilweise eingelagert, Inventar aktualisiert - if user_id > 0 { - let inventory_message = format!( + if t.user_id > 0 { + let target_inventory_message = format!( r#"{{"event":"inventory_updated","branch_id":{},"user_id":{}}}"#, - t.branch_id, user_id + t.target_branch_id, t.user_id ); - broker.publish(inventory_message); + broker.publish(target_inventory_message); } } // Nutzer informieren, dass Ware noch im Transportmittel liegt. - // Wir ermitteln hierzu den Besitzer des Ziel-Branches. - if user_id > 0 { + if t.user_id > 0 { if let Err(err) = Self::insert_transport_waiting_notification( pool, - user_id, + t.user_id, product_id, remaining_quantity, ) {