use crate::db::{DbConnection, DbError, Row}; use std::collections::HashMap; use crate::message_broker::MessageBroker; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use crate::db::ConnectionPool; use super::base::{BaseWorker, Worker, WorkerState}; #[derive(Debug, Clone)] struct Director { id: i32, branch_id: i32, may_produce: bool, may_sell: bool, may_start_transport: bool, } #[derive(Debug, Clone)] struct ProductionPlan { falukant_user_id: i32, money: f64, certificate: i32, branch_id: i32, product_id: i32, region_id: i32, stock_size: i32, used_in_stock: i32, running_productions: i32, running_productions_quantity: i32, } #[derive(Debug, Clone)] struct InventoryItem { id: i32, product_id: i32, quantity: i32, quality: i32, sell_cost: f64, user_id: i32, region_id: i32, branch_id: i32, worth_percent: f64, // Regionaler worth_percent-Wert für die Preisberechnung } #[derive(Debug, Clone)] struct SalaryItem { id: i32, employer_user_id: i32, income: i32, } #[derive(Debug, Clone)] struct TransportVehicle { id: i32, capacity: i32, } pub struct DirectorWorker { base: BaseWorker, last_run: Option, } // Maximale Anzahl paralleler Produktionen pro Branch const MAX_PARALLEL_PRODUCTIONS: i32 = 2; // SQL-Queries (1:1 aus director_worker.h) const QUERY_GET_DIRECTORS: &str = r#" SELECT d.may_produce, d.may_sell, d.may_start_transport, b.id AS branch_id, fu.id AS falukantUserId, d.id FROM falukant_data.director d JOIN falukant_data.falukant_user fu ON fu.id = d.employer_user_id JOIN falukant_data.character c ON c.id = d.director_character_id JOIN falukant_data.branch b ON b.region_id = c.region_id AND b.falukant_user_id = fu.id WHERE current_time BETWEEN '08:00:00' AND '17:00:00'; "#; const QUERY_GET_BEST_PRODUCTION: &str = r#" SELECT fdu.id falukant_user_id, -- Geld explizit als Text casten, damit das Mapping im Rust-Code -- zuverlässig funktioniert (unabhängig vom nativen DB-Typ wie `money`). 
CAST(fdu.money AS text) AS money, fdu.certificate, ftp.id product_id, ftp.label_tr, fdb.region_id, ( SELECT SUM(quantity) FROM falukant_data.stock fds WHERE fds.branch_id = fdb.id ) AS stock_size, COALESCE(( SELECT SUM(COALESCE(fdi.quantity, 0)) FROM falukant_data.stock fds JOIN falukant_data.inventory fdi ON fdi.stock_id = fds.id WHERE fds.branch_id = fdb.id ), 0) AS used_in_stock, (ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category) / (300.0 * ftp.production_time) AS worth, fdb.id AS branch_id, ( SELECT COUNT(id) FROM falukant_data.production WHERE branch_id = fdb.id ) AS running_productions, COALESCE(( SELECT SUM(COALESCE(fdp.quantity, 0)) quantity FROM falukant_data.production fdp WHERE fdp.branch_id = fdb.id ), 0) AS running_productions_quantity FROM falukant_data.director fdd JOIN falukant_data.character fdc ON fdc.id = fdd.director_character_id JOIN falukant_data.falukant_user fdu ON fdd.employer_user_id = fdu.id JOIN falukant_data.character user_character ON user_character.user_id = fdu.id JOIN falukant_data.branch fdb ON fdb.falukant_user_id = fdu.id AND fdb.region_id = fdc.region_id JOIN falukant_data.town_product_worth fdtpw ON fdtpw.region_id = fdb.region_id JOIN falukant_data.knowledge fdk_character ON fdk_character.product_id = fdtpw.product_id AND fdk_character.character_id = user_character.id JOIN falukant_data.knowledge fdk_director ON fdk_director.product_id = fdtpw.product_id AND fdk_director.character_id = fdd.director_character_id JOIN falukant_type.product ftp ON ftp.id = fdtpw.product_id AND ftp.category <= fdu.certificate WHERE fdd.id = $1 AND fdb.id = $2 ORDER BY worth DESC LIMIT 1; "#; const QUERY_INSERT_PRODUCTION: &str = r#" INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id) VALUES ($1, $2, $3, ( SELECT weather_type_id FROM falukant_data.weather WHERE region_id = $4 )); "#; // Query zum Abfragen der aktuellen Lager- und 
Produktionswerte für einen Branch // (ohne den kompletten Produktionsplan neu zu berechnen) const QUERY_GET_BRANCH_CAPACITY: &str = r#" SELECT ( SELECT SUM(quantity) FROM falukant_data.stock fds WHERE fds.branch_id = $1 ) AS stock_size, COALESCE(( SELECT SUM(COALESCE(fdi.quantity, 0)) FROM falukant_data.stock fds JOIN falukant_data.inventory fdi ON fdi.stock_id = fds.id WHERE fds.branch_id = $1 ), 0) AS used_in_stock, ( SELECT COUNT(id) FROM falukant_data.production WHERE branch_id = $1 ) AS running_productions, COALESCE(( SELECT SUM(COALESCE(fdp.quantity, 0)) quantity FROM falukant_data.production fdp WHERE fdp.branch_id = $1 ), 0) AS running_productions_quantity; "#; const QUERY_GET_INVENTORY: &str = r#" SELECT i.id, i.product_id, i.quantity, i.quality, p.sell_cost, fu.id AS user_id, b.region_id, b.id AS branch_id, COALESCE(tpw.worth_percent, 100.0) AS worth_percent FROM falukant_data.inventory i JOIN falukant_data.stock s ON s.id = i.stock_id JOIN falukant_data.branch b ON b.id = s.branch_id JOIN falukant_data.falukant_user fu ON fu.id = b.falukant_user_id JOIN falukant_data.director d ON d.employer_user_id = fu.id JOIN falukant_type.product p ON p.id = i.product_id LEFT JOIN falukant_data.town_product_worth tpw ON tpw.region_id = b.region_id AND tpw.product_id = i.product_id WHERE d.id = $1 AND b.id = $2; "#; const QUERY_REMOVE_INVENTORY: &str = r#" DELETE FROM falukant_data.inventory WHERE id = $1; "#; const QUERY_ADD_SELL_LOG: &str = r#" INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id) VALUES ($1, $2, $3, $4) ON CONFLICT (region_id, product_id, seller_id) DO UPDATE SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity; "#; // Regionale Verkaufswürdigkeit pro Produkt/Region für alle Branches eines Users const QUERY_GET_REGION_WORTH_FOR_PRODUCT: &str = r#" SELECT tpw.region_id, tpw.product_id, tpw.worth_percent FROM falukant_data.town_product_worth tpw JOIN falukant_data.branch b ON b.region_id = tpw.region_id WHERE 
b.falukant_user_id = $1 AND tpw.product_id = $2; "#; // Verfügbare Transportmittel für eine Route (source_region -> target_region) const QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE: &str = r#" SELECT v.id AS vehicle_id, vt.capacity AS capacity FROM falukant_data.vehicle v JOIN falukant_type.vehicle vt ON vt.id = v.vehicle_type_id JOIN falukant_data.region_distance rd ON ( (rd.source_region_id = v.region_id AND rd.target_region_id = $3) OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id) ) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL) WHERE v.falukant_user_id = $1 AND v.region_id = $2; "#; // Transport-Eintrag anlegen const QUERY_INSERT_TRANSPORT: &str = r#" INSERT INTO falukant_data.transport (source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, NOW(), NOW()); "#; // Leere Transporte (product_id = NULL, size = 0) zum Zurückholen von Fahrzeugen const QUERY_INSERT_EMPTY_TRANSPORT: &str = r#" INSERT INTO falukant_data.transport (source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at) VALUES ($1, $2, NULL, 0, $3, NOW(), NOW()); "#; // Alle Branches des Users mit ihren Regionen const QUERY_GET_USER_BRANCHES: &str = r#" SELECT DISTINCT b.region_id, b.id AS branch_id FROM falukant_data.branch b WHERE b.falukant_user_id = $1 AND b.region_id != $2; "#; // Freie Transportmittel in einer Region (nicht in aktiven Transporten) // Ein Transport ist aktiv, wenn er noch in der Tabelle existiert const QUERY_GET_FREE_VEHICLES_IN_REGION: &str = r#" SELECT v.id AS vehicle_id, vt.capacity AS capacity FROM falukant_data.vehicle v JOIN falukant_type.vehicle vt ON vt.id = v.vehicle_type_id WHERE v.falukant_user_id = $1 AND v.region_id = $2 AND v.id NOT IN ( SELECT DISTINCT t.vehicle_id FROM falukant_data.transport t WHERE t.vehicle_id IS NOT NULL ); "#; const QUERY_GET_SALARY_TO_PAY: &str = r#" SELECT d.id, d.employer_user_id, d.income FROM 
falukant_data.director d WHERE DATE(d.last_salary_payout) < DATE(NOW()); "#; const QUERY_SET_SALARY_PAYED: &str = r#" UPDATE falukant_data.director SET last_salary_payout = NOW() WHERE id = $1; "#; const QUERY_UPDATE_SATISFACTION: &str = r#" WITH new_sats AS ( SELECT d.id, ROUND( d.income::numeric / ( c.title_of_nobility * POWER(1.231, AVG(k.knowledge) / 1.5) ) * 100 ) AS new_satisfaction FROM falukant_data.director d JOIN falukant_data.knowledge k ON d.director_character_id = k.character_id JOIN falukant_data.character c ON c.id = d.director_character_id GROUP BY d.id, c.title_of_nobility, d.income ) UPDATE falukant_data.director dir SET satisfaction = ns.new_satisfaction FROM new_sats ns WHERE dir.id = ns.id AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction RETURNING dir.employer_user_id; "#; impl DirectorWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("DirectorWorker", pool, broker), last_run: None, } } fn run_iteration(&mut self, state: &WorkerState) { self.base.set_current_step("DirectorWorker iteration"); let now = Instant::now(); let should_run = match self.last_run { None => true, Some(last) => now.saturating_duration_since(last) >= Duration::from_secs(60), }; if should_run { if let Err(err) = self.perform_all_tasks() { eprintln!("[DirectorWorker] Fehler beim Ausführen der Aufgabe: {err}"); } self.last_run = Some(now); } std::thread::sleep(Duration::from_secs(1)); if !state.running_worker.load(Ordering::Relaxed) { return; } } fn perform_all_tasks(&mut self) -> Result<(), DbError> { // Produktions-/Verkaufs-/Transportlogik für alle Direktoren self.perform_task()?; self.pay_salary()?; self.calculate_satisfaction()?; Ok(()) } fn perform_task(&mut self) -> Result<(), DbError> { self.base .set_current_step("Get director actions from DB"); let mut conn = self .base .pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_directors", 
QUERY_GET_DIRECTORS)?;
        let directors_rows = conn.execute("get_directors", &[])?;

        // Rows that cannot be mapped (missing or unparsable id / branch_id)
        // are dropped silently by `filter_map`.
        let directors: Vec<Director> = directors_rows
            .into_iter()
            .filter_map(Self::map_row_to_director)
            .collect();

        if directors.is_empty() {
            eprintln!("[DirectorWorker] Keine Direktoren für Aktionen gefunden (Zeitfenster oder DB-Daten).");
        }

        for director in directors {
            if director.may_produce {
                eprintln!(
                    "[DirectorWorker] Starte Produktionsprüfung für Director {} (branch_id={})",
                    director.id, director.branch_id
                );
                // A production error aborts the whole task run.
                self.start_productions(&director)?;
            }

            if director.may_start_transport {
                eprintln!(
                    "[DirectorWorker] Starte Transportprüfung für Director {} (branch_id={})",
                    director.id, director.branch_id
                );
                // Transport errors are only logged; the remaining actions still run.
                if let Err(err) = self.start_transports_stub(&director) {
                    eprintln!(
                        "[DirectorWorker] Fehler bei start_transports für Director {}: {err}",
                        director.id
                    );
                }
            }

            if director.may_sell {
                eprintln!(
                    "[DirectorWorker] Starte Verkaufsprüfung für Director {} (branch_id={})",
                    director.id, director.branch_id
                );
                // A selling error aborts the whole task run.
                self.start_sellings(&director)?;
            }
        }

        Ok(())
    }

    /// Maps one row of `QUERY_GET_DIRECTORS` to a [`Director`].
    ///
    /// Returns `None` when `id` or `branch_id` is missing or not an integer.
    /// A permission flag is `true` only for the textual values `"t"` or
    /// `"true"`; anything else (including a missing column) is `false`.
    fn map_row_to_director(row: Row) -> Option<Director> {
        Some(Director {
            id: row.get("id")?.parse().ok()?,
            branch_id: row.get("branch_id")?.parse().ok()?,
            may_produce: row
                .get("may_produce")
                .map(|v| v == "t" || v == "true")
                .unwrap_or(false),
            may_sell: row
                .get("may_sell")
                .map(|v| v == "t" || v == "true")
                .unwrap_or(false),
            may_start_transport: row
                .get("may_start_transport")
                .map(|v| v == "t" || v == "true")
                .unwrap_or(false),
        })
    }

    /// Starts productions for the director's branch.
    ///
    /// The best product is determined once via `QUERY_GET_BEST_PRODUCTION`.
    /// Productions are then started one at a time until one of these holds:
    /// - `MAX_PARALLEL_PRODUCTIONS` productions are running,
    /// - no free stock capacity is left,
    /// - the previous attempt did not add a production (see below),
    /// - creating a production failed (logged, not propagated).
    ///
    /// # Errors
    /// Returns a `DbError` when no connection can be obtained or one of the
    /// queries issued directly by this function fails.
    fn start_productions(&mut self, director: &Director) -> Result<(), DbError> {
        self.base
            .set_current_step("DirectorWorker: start_productions");

        let mut conn = self
            .base
            .pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        // Initially: determine the best product for this branch.
        conn.prepare("get_to_produce", QUERY_GET_BEST_PRODUCTION)?;
        let rows = conn.execute("get_to_produce", &[&director.id, &director.branch_id])?;
        if rows.is_empty() {
            eprintln!(
                "[DirectorWorker] Keine Produktionskandidaten für Director {} gefunden.",
                director.id
            );
            return Ok(());
        }

        let mut base_plan = match Self::map_row_to_production_plan(&rows[0]) {
            Some(p) => p,
            None => {
                eprintln!(
                    "[DirectorWorker] Produktionsplan für Director {} konnte nicht gemappt werden.",
                    director.id
                );
                return Ok(());
            }
        };

        eprintln!(
            "[DirectorWorker] Produktionsplan: director_user_id={}, branch_id={}, product_id={}, money={}, certificate={}",
            base_plan.falukant_user_id,
            base_plan.branch_id,
            base_plan.product_id,
            base_plan.money,
            base_plan.certificate
        );

        // Prepare the query that returns the current capacity figures.
        conn.prepare("get_branch_capacity", QUERY_GET_BRANCH_CAPACITY)?;

        // Number of running productions seen in the previous loop pass.
        // `create_single_production` returns `Ok(())` also when it decides to
        // produce nothing (e.g. the money does not cover a single unit). In
        // that case the database is unchanged and the loop would repeat
        // forever, so the loop stops as soon as a pass made no progress.
        let mut previous_running: Option<i32> = None;

        // Start productions until the maximum count is reached or no free
        // stock capacity is left.
        loop {
            // Query the current capacity figures.
            let capacity_rows = conn.execute("get_branch_capacity", &[&director.branch_id])?;
            if capacity_rows.is_empty() {
                break;
            }

            let row = &capacity_rows[0];
            let stock_size: i32 = row
                .get("stock_size")
                .and_then(|v| v.parse::<i32>().ok())
                .unwrap_or(0);
            let used_in_stock: i32 = row
                .get("used_in_stock")
                .and_then(|v| v.parse::<i32>().ok())
                .unwrap_or(0);
            let running_productions: i32 = row
                .get("running_productions")
                .and_then(|v| v.parse::<i32>().ok())
                .unwrap_or(0);
            let running_productions_quantity: i32 = row
                .get("running_productions_quantity")
                .and_then(|v| v.parse::<i32>().ok())
                .unwrap_or(0);

            // Check whether another production may be started at all.
            if running_productions >= MAX_PARALLEL_PRODUCTIONS {
                eprintln!(
                    "[DirectorWorker] Maximale Anzahl an Produktionen ({}) erreicht für Branch {}.",
                    MAX_PARALLEL_PRODUCTIONS, director.branch_id
                );
                break;
            }

            // Progress guard: the previous pass must have added a production.
            if let Some(previous) = previous_running {
                if running_productions <= previous {
                    eprintln!(
                        "[DirectorWorker] Keine neue Produktion zustande gekommen für Branch {} (running_productions={}), breche Produktionsschleife ab.",
                        director.branch_id, running_productions
                    );
                    break;
                }
            }
            previous_running = Some(running_productions);

            // Compute the free capacity.
            let free_capacity = stock_size - used_in_stock - running_productions_quantity;
            if free_capacity <= 0 {
                eprintln!(
                    "[DirectorWorker] Kein freier Lagerplatz mehr für Branch {} (stock_size={}, used={}, running_qty={}).",
                    director.branch_id, stock_size, used_in_stock, running_productions_quantity
                );
                break;
            }

            // Refresh the plan with the current figures.
            // NOTE(review): `base_plan.money` is not refreshed here, so a second
            // production is budgeted with the amount read before the first one.
            base_plan.stock_size = stock_size;
            base_plan.used_in_stock = used_in_stock;
            base_plan.running_productions = running_productions;
            base_plan.running_productions_quantity = running_productions_quantity;

            // Start one new production (at most 100 units).
            if let Err(err) = self.create_single_production(&mut conn, &base_plan) {
                eprintln!(
                    "[DirectorWorker] Fehler beim Starten einer Produktion: {err}"
                );
                break;
            }
        }

        Ok(())
    }

    /// Maps one row of `QUERY_GET_BEST_PRODUCTION` to a [`ProductionPlan`].
    ///
    /// Returns `None` when one of the mandatory integer columns
    /// (`falukant_user_id`, `certificate`, `branch_id`, `product_id`) is
    /// missing or unparsable. All other columns fall back to `0` / `0.0`.
    fn map_row_to_production_plan(row: &Row) -> Option<ProductionPlan> {
        // Mandatory fields: without these no meaningful plan can be built.
        let falukant_user_id: i32 = row.get("falukant_user_id")?.parse().ok()?;
        let certificate: i32 = row.get("certificate")?.parse().ok()?;
        let branch_id: i32 = row.get("branch_id")?.parse().ok()?;
        let product_id: i32 = row.get("product_id")?.parse().ok()?;

        // Optional / derived fields: tolerate NULL or unparsable values and
        // use defaults instead.
        let money: f64 = row
            .get("money")
            .and_then(|v| v.parse::<f64>().ok())
            .unwrap_or(0.0);
        let stock_size: i32 = row
            .get("stock_size")
            .and_then(|v| v.parse::<i32>().ok())
            .unwrap_or(0);
        let used_in_stock: i32 = row
            .get("used_in_stock")
            .and_then(|v| v.parse::<i32>().ok())
            .unwrap_or(0);
        let running_productions: i32 = row
            .get("running_productions")
            .and_then(|v| v.parse::<i32>().ok())
            .unwrap_or(0);
        let running_productions_quantity: i32 = row
            .get("running_productions_quantity")
            .and_then(|v| v.parse::<i32>().ok())
            .unwrap_or(0);
        let region_id: i32 = row
            .get("region_id")
            .and_then(|v| v.parse::<i32>().ok())
            .unwrap_or(0);

        Some(ProductionPlan {
            falukant_user_id,
            money,
            certificate,
            branch_id,
            product_id,
            region_id,
            stock_size,
            used_in_stock,
            running_productions,
            running_productions_quantity,
        })
    }

    /// Starts a single production (at most 100 units) based on the current plan.
/// Die äußere Schleife in `start_productions` sorgt dafür, dass mehrere Produktionen /// gestartet werden können, bis entweder die maximale Anzahl erreicht ist oder /// kein freier Lagerplatz mehr vorhanden ist. fn create_single_production( &mut self, conn: &mut DbConnection, plan: &ProductionPlan, ) -> Result<(), DbError> { // Freie Lagerkapazität: Gesamtbestand minus bereits belegter Bestand // (Inventar) minus bereits eingeplante Produktionsmengen. let free_capacity = plan.stock_size - plan.used_in_stock - plan.running_productions_quantity; // Stückkosten monetär berechnen. Da money ein f64 ist, arbeiten wir hier ebenfalls // mit Gleitkomma und runden erst am Ende auf eine ganze Stückzahl ab. let one_piece_cost = (plan.certificate * 6) as f64; let mut max_money_production: i32 = 0; if one_piece_cost > 0.0 { if plan.money > 0.0 { // Anzahl Stück, die sich mit dem verfügbaren Geld finanzieren lassen max_money_production = (plan.money / one_piece_cost).floor() as i32; } else { // Falls das Geld aus der DB unerwartet als 0 eingelesen wurde, aber // eigentlich ausreichend Guthaben vorhanden ist (bekannter Migrationsfall), // lassen wir die Geldbegrenzung vorläufig fallen und begrenzen nur über // Lagerkapazität und Hard-Limit 100. 
eprintln!( "[DirectorWorker] Warnung: money=0 für falukant_user_id={}, \ verwende nur Lagerkapazität als Limit.", plan.falukant_user_id ); max_money_production = i32::MAX; } } // Maximale Produktionsmenge begrenzen: // - nie mehr als der freie Lagerplatz (`free_capacity`) // - nie mehr als durch das verfügbare Geld finanzierbar // - absolut maximal 100 Einheiten pro Produktion let to_produce = free_capacity .min(max_money_production) .min(100) .max(0); eprintln!( "[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}", free_capacity, one_piece_cost, max_money_production, to_produce, plan.running_productions ); if to_produce < 1 { eprintln!( "[DirectorWorker] Keine Produktion gestartet: free_capacity={}, max_money_production={}, running_productions={}, running_qty={}", free_capacity, max_money_production, plan.running_productions, plan.running_productions_quantity ); return Ok(()); } let production_cost = to_produce as f64 * one_piece_cost; if let Err(err) = self.base.change_falukant_user_money( plan.falukant_user_id, -production_cost, "director starts production", ) { eprintln!( "[DirectorWorker] Fehler bei change_falukant_user_money: {err}" ); } conn.prepare("insert_production", QUERY_INSERT_PRODUCTION)?; // Eine einzelne Produktion mit max. 
// 100 units.
        // The region's current weather type is selected from the weather table
        // by the insert statement itself.
        conn.execute(
            "insert_production",
            &[&plan.branch_id, &plan.product_id, &to_produce, &plan.region_id],
        )?;

        eprintln!(
            "[DirectorWorker] Produktion angelegt: branch_id={}, product_id={}, quantity={}",
            plan.branch_id, plan.product_id, to_produce
        );

        let message = format!(
            r#"{{"event":"production_started","branch_id":{}}}"#,
            plan.branch_id
        );
        self.base.broker.publish(message);

        Ok(())
    }

    /// Plans transports for the inventory of the director's branch.
    ///
    /// Steps:
    /// 1. Load the branch inventory and determine the employer's user id.
    /// 2. Count the user's vehicles in the branch region that are not
    ///    referenced by any transport row.
    /// 3. Without inventory: plan empty transports when no such vehicle
    ///    exists, then return.
    /// 4. With inventory but no free vehicle: plan empty transports, count
    ///    again and return when there is still none.
    /// 5. Call `plan_transports_for_item` for every item.
    /// 6. When no free vehicle is left afterwards, plan empty transports again.
    ///
    /// Errors from planning empty transports are only logged.
    ///
    /// # Errors
    /// Returns a `DbError` when the connection cannot be obtained, a query
    /// issued here fails, the employer user id cannot be determined, or
    /// `plan_transports_for_item` fails.
    fn start_transports_stub(&mut self, director: &Director) -> Result<(), DbError> {
        // Looks up the employer when the inventory gives no user id.
        const QUERY_GET_DIRECTOR_USER: &str = r#"
            SELECT employer_user_id
            FROM falukant_data.director
            WHERE id = $1;
        "#;

        // Counts the user's vehicles in the branch region that are not bound
        // to a transport. A transport is active as long as its row exists.
        const QUERY_COUNT_VEHICLES_IN_BRANCH_REGION: &str = r#"
            SELECT COUNT(*) AS count
            FROM falukant_data.vehicle v
            JOIN falukant_data.branch b
                ON b.region_id = v.region_id
            WHERE v.falukant_user_id = $1
                AND b.id = $2
                AND v.id NOT IN (
                    SELECT DISTINCT t.vehicle_id
                    FROM falukant_data.transport t
                    WHERE t.vehicle_id IS NOT NULL
                );
        "#;

        /// Runs the prepared "count_vehicles_in_branch" statement and returns
        /// the count; a missing or unparsable result counts as 0.
        fn count_free_vehicles(
            conn: &mut DbConnection,
            user_id: i32,
            branch_id: i32,
        ) -> Result<i64, DbError> {
            let rows = conn.execute("count_vehicles_in_branch", &[&user_id, &branch_id])?;
            Ok(rows
                .into_iter()
                .next()
                .and_then(|row| row.get("count").and_then(|v| v.parse::<i64>().ok()))
                .unwrap_or(0))
        }

        self.base
            .set_current_step("DirectorWorker: start_transports");

        let mut conn = self
            .base
            .pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        conn.prepare("get_to_transport", QUERY_GET_INVENTORY)?;
        let rows = conn.execute("get_to_transport", &[&director.id, &director.branch_id])?;

        let mut items: Vec<InventoryItem> = rows
            .into_iter()
            .filter_map(Self::map_row_to_inventory_item)
            .collect();

        eprintln!(
            "[DirectorWorker] Transportprüfung für Director {} (branch_id={}): {} Inventar-Items gefunden",
            director.id,
            director.branch_id,
            items.len()
        );

        // All items of this director are expected to carry the same user_id
        // (the director's employer), so the first item is used.
        let falukant_user_id = if items.is_empty() {
            // Without items the user id has to be read from the director row.
            conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?;
            let user_rows = conn.execute("get_director_user", &[&director.id])?;
            user_rows
                .into_iter()
                .next()
                .and_then(|row| {
                    row.get("employer_user_id")
                        .and_then(|v| v.parse::<i32>().ok())
                })
                .ok_or_else(|| DbError::new("Konnte employer_user_id nicht ermitteln"))?
        } else {
            items[0].user_id
        };

        // Check whether free vehicles are available in the current branch.
        conn.prepare(
            "count_vehicles_in_branch",
            QUERY_COUNT_VEHICLES_IN_BRANCH_REGION,
        )?;
        let vehicles_in_branch =
            count_free_vehicles(&mut conn, falukant_user_id, director.branch_id)?;

        // Nothing to transport: only consider empty transports.
        if items.is_empty() {
            eprintln!(
                "[DirectorWorker] Keine Inventar-Items für Transporte gefunden für Director {} (branch_id={})",
                director.id, director.branch_id
            );

            // Without any vehicle in the branch, try to plan empty transports.
            if vehicles_in_branch == 0 {
                eprintln!(
                    "[DirectorWorker] Keine Transportmittel im Branch {} vorhanden, prüfe auf leere Transporte zum Zurückholen",
                    director.branch_id
                );
                if let Err(err) = self.plan_empty_transports_for_vehicle_retrieval(
                    &mut conn,
                    falukant_user_id,
                    director.branch_id,
                ) {
                    eprintln!(
                        "[DirectorWorker] Fehler beim Planen leerer Transporte: {err}"
                    );
                }
            }
            return Ok(());
        }

        // Items exist (the empty case returned above) but no vehicle is in the
        // branch: try to plan empty transports to bring vehicles back.
        if vehicles_in_branch == 0 {
            eprintln!(
                "[DirectorWorker] Keine Transportmittel im Branch {} vorhanden, aber {} Items vorhanden. Prüfe auf leere Transporte zum Zurückholen",
                director.branch_id,
                items.len()
            );
            if let Err(err) = self.plan_empty_transports_for_vehicle_retrieval(
                &mut conn,
                falukant_user_id,
                director.branch_id,
            ) {
                eprintln!(
                    "[DirectorWorker] Fehler beim Planen leerer Transporte: {err}"
                );
            }

            // Count again after planning the empty transports.
            let vehicles_in_branch_after =
                count_free_vehicles(&mut conn, falukant_user_id, director.branch_id)?;

            if vehicles_in_branch_after == 0 {
                eprintln!(
                    "[DirectorWorker] Nach Planen leerer Transporte immer noch keine Transportmittel im Branch {} vorhanden. Überspringe normale Transportplanung.",
                    director.branch_id
                );
                return Ok(());
            }
        }

        // Plan worthwhile transports. The inventory quantities are reduced by
        // `plan_transports_for_item` itself.
        for item in items.iter_mut() {
            eprintln!(
                "[DirectorWorker] Prüfe Transport für Item: product_id={}, quantity={}, quality={}, region_id={}, branch_id={}",
                item.product_id, item.quantity, item.quality, item.region_id, item.branch_id
            );

            let shipped = self.plan_transports_for_item(&mut conn, falukant_user_id, item)?;

            if shipped > 0 {
                eprintln!(
                    "[DirectorWorker] Transport geplant: {} Einheiten von Produkt {} transportiert (Inventar bereits reduziert)",
                    shipped, item.product_id
                );
            } else {
                eprintln!(
                    "[DirectorWorker] Kein lohnender Transport gefunden für Produkt {} (region_id={})",
                    item.product_id, item.region_id
                );
            }
        }

        // After the regular transports: when no free vehicle is left in the
        // branch, plan empty transports to bring vehicles back.
        let vehicles_in_branch_final =
            count_free_vehicles(&mut conn, falukant_user_id, director.branch_id)?;

        if vehicles_in_branch_final == 0 {
            eprintln!(
                "[DirectorWorker] Nach Transporten keine Transportmittel mehr im Branch {} vorhanden, prüfe auf leere Transporte zum Zurückholen",
                director.branch_id
            );
            if let Err(err) = self.plan_empty_transports_for_vehicle_retrieval(
                &mut conn,
                falukant_user_id,
                director.branch_id,
            ) {
                eprintln!(
                    "[DirectorWorker] Fehler beim Planen leerer Transporte: {err}"
                );
            }
        }

        Ok(())
    }

    /// Sells the inventory of the director's branch.
    ///
    /// For every item `plan_transports_for_item` is called first; whatever
    /// quantity remains on the item afterwards is sold locally. Items whose
    /// quantity is not positive are removed from the inventory.
    ///
    /// # Errors
    /// Returns a `DbError` when the connection cannot be obtained, a query
    /// fails, or planning a transport / selling an item fails.
    fn start_sellings(&mut self, director: &Director) -> Result<(), DbError> {
        self.base
            .set_current_step("DirectorWorker: start_sellings");

        let mut conn = self
            .base
            .pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        conn.prepare("get_to_sell", QUERY_GET_INVENTORY)?;
        let rows = conn.execute("get_to_sell", &[&director.id, &director.branch_id])?;

        let mut items: Vec<InventoryItem> = rows
            .into_iter()
            .filter_map(Self::map_row_to_inventory_item)
            .collect();

        // These statements are executed by `sell_single_inventory_item`.
        conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?;
        conn.prepare("add_sell_log", QUERY_ADD_SELL_LOG)?;

        // Nothing to sell: return right away.
        if items.is_empty() {
            return Ok(());
        }

        // All items of this director are expected to carry the same user_id
        // (the director's employer).
        let falukant_user_id = items[0].user_id;

        // Before selling, the director tries to plan worthwhile transports.
        // `plan_transports_for_item` creates transport entries where suitable
        // and reduces `item.quantity`; the remaining quantity is sold locally.
        for item in items.iter_mut() {
            let _shipped = self.plan_transports_for_item(&mut conn, falukant_user_id, item)?;
        }

        // Then sell the remaining quantities locally.
for item in items.drain(..) { if item.quantity > 0 { self.sell_single_inventory_item(&mut conn, &item)?; } else { // Falls die Menge auf 0 gesetzt wurde, das Inventar ggf. aufräumen. conn.execute("remove_inventory", &[&item.id])?; } } Ok(()) } fn map_row_to_inventory_item(row: Row) -> Option { Some(InventoryItem { id: row.get("id")?.parse().ok()?, product_id: row.get("product_id")?.parse().ok()?, quantity: row.get("quantity")?.parse().ok()?, quality: row.get("quality")?.parse().ok()?, sell_cost: row.get("sell_cost")?.parse().ok()?, user_id: row.get("user_id")?.parse().ok()?, region_id: row.get("region_id")?.parse().ok()?, branch_id: row.get("branch_id")?.parse().ok()?, worth_percent: row .get("worth_percent") .and_then(|v| v.parse::().ok()) .unwrap_or(100.0), }) } fn sell_single_inventory_item( &mut self, conn: &mut DbConnection, item: &InventoryItem, ) -> Result<(), DbError> { if item.quantity <= 0 { conn.execute("remove_inventory", &[&item.id])?; return Ok(()); } // Neue Preisberechnung gemäß Spezifikation: // 1. Basispreis = product.sellCost * (worthPercent / 100) let base_price = item.sell_cost * (item.worth_percent / 100.0); // 2. min = basePrice * 0.6, max = basePrice let min_price = base_price * 0.6; let max_price = base_price; // 3. 
price = min + (max - min) * (knowledgeFactor / 100) // knowledgeFactor ist hier item.quality let knowledge_factor = item.quality as f64; let piece_sell_price = min_price + (max_price - min_price) * (knowledge_factor / 100.0); let sell_price = piece_sell_price * item.quantity as f64; if let Err(err) = self.base.change_falukant_user_money( item.user_id, sell_price, "sell products", ) { eprintln!( "[DirectorWorker] Fehler bei change_falukant_user_money (sell products): {err}" ); } conn.execute( "add_sell_log", &[ &item.region_id, &item.product_id, &item.quantity, &item.user_id, ], )?; conn.execute("remove_inventory", &[&item.id])?; let message = format!( r#"{{"event":"selled_items","branch_id":{}}}"#, item.branch_id ); self.base.broker.publish(message); Ok(()) } /// Plant ggf. Transporte für ein einzelnes Inventar-Item und gibt die /// Menge zurück, die tatsächlich in Transporte umgewandelt wurde. /// /// Logik: /// - Ermittle regionale "worth_percent"-Werte für das Produkt in allen /// Branch-Regionen des Users. /// - Berechne lokalen Stückpreis (inkl. Qualität) und für jede andere /// Region einen potentiellen Stückpreis. /// - Prüfe für jede Zielregion: /// * Gibt es verfügbare Transportmittel für diese Route? /// * Ist der Mehrerlös (deltaPrice * Menge) größer als die /// Transportkosten (max(1, totalValue * 0.01))? /// - Wähle die Zielregion mit dem größten positiven Nettogewinn und /// erzeuge entsprechende Transporte (begrenzt durch Fahrzeugkapazität). 
fn plan_transports_for_item(
    &mut self,
    conn: &mut DbConnection,
    falukant_user_id: i32,
    item: &mut InventoryItem,
) -> Result<i32, DbError> {
    // Returns the number of units moved into transports (0 when no route
    // pays off). On success the inventory row and `item.quantity` are reduced
    // by that amount.

    /// Most profitable route found so far.
    struct BestRoute {
        region_id: i32,
        quantity: i32,
        remote_piece_price: f64,
        net_gain: f64,
        vehicles: Vec<TransportVehicle>,
    }

    // Safety check: nothing to ship.
    if item.quantity <= 0 {
        return Ok(0);
    }

    // Load the regional worth_percent values for this product.
    conn.prepare(
        "get_region_worth_for_product",
        QUERY_GET_REGION_WORTH_FOR_PRODUCT,
    )?;
    let rows = conn.execute(
        "get_region_worth_for_product",
        &[&falukant_user_id, &item.product_id],
    )?;

    if rows.is_empty() {
        return Ok(0);
    }

    // Rows without a usable (non-negative) region id are skipped; a missing
    // or unparsable worth_percent counts as 100 %.
    let worth_by_region: HashMap<i32, f64> = rows
        .into_iter()
        .filter_map(|row| {
            let region_id = row
                .get("region_id")
                .and_then(|v| v.parse::<i32>().ok())
                .filter(|id| *id >= 0)?;
            let percent = row
                .get("worth_percent")
                .and_then(|v| v.parse::<f64>().ok())
                .unwrap_or(100.0);
            Some((region_id, percent))
        })
        .collect();

    if worth_by_region.is_empty() {
        eprintln!(
            "[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden",
            item.product_id
        );
        return Ok(0);
    }

    eprintln!(
        "[DirectorWorker] Gefundene Regionen für Produkt {}: {} Regionen",
        item.product_id,
        worth_by_region.len()
    );

    // Piece price for a given regional worth_percent:
    // 1. base price = product.sellCost * (worthPercent / 100)
    // 2. min = basePrice * 0.6, max = basePrice
    // 3. price = min + (max - min) * (knowledgeFactor / 100), where the
    //    knowledge factor is the item's quality.
    let sell_cost = item.sell_cost;
    let quality_factor = item.quality as f64 / 100.0;
    let piece_price = |worth_percent: f64| -> f64 {
        let base_price = sell_cost * (worth_percent / 100.0);
        let min_price = base_price * 0.6;
        let max_price = base_price;
        min_price + (max_price - min_price) * quality_factor
    };

    let local_percent = worth_by_region
        .get(&item.region_id)
        .copied()
        .unwrap_or(100.0);
    let local_piece_price = piece_price(local_percent);

    eprintln!(
        "[DirectorWorker] Lokaler Preis für Produkt {}: {:.2} (worth_percent={:.2}, quality={})",
        item.product_id, local_piece_price, local_percent, item.quality
    );

    let mut best: Option<BestRoute> = None;

    // Check for every other region whether a transport pays off.
    for (&region_id, &remote_percent) in &worth_by_region {
        if region_id == item.region_id {
            continue;
        }

        let remote_piece_price = piece_price(remote_percent);
        let delta_per_unit = remote_piece_price - local_piece_price;
        eprintln!(
            "[DirectorWorker] Region {}: Preis {:.2}, Delta {:.2}",
            region_id, remote_piece_price, delta_per_unit
        );
        if delta_per_unit <= 0.0 {
            eprintln!(
                "[DirectorWorker] Region {}: Kein Preisvorteil (Delta <= 0)",
                region_id
            );
            continue;
        }

        // Query the vehicles available for this route.
        let vehicles = Self::get_transport_vehicles_for_route(
            conn,
            falukant_user_id,
            item.region_id,
            region_id,
        )?;

        eprintln!(
            "[DirectorWorker] Region {}: {} verfügbare Transportmittel",
            region_id,
            vehicles.len()
        );

        if vehicles.is_empty() {
            eprintln!(
                "[DirectorWorker] Region {}: Keine verfügbaren Transportmittel",
                region_id
            );
            continue;
        }

        // Maximum transportable quantity, limited by the summed capacity.
        let max_capacity = vehicles
            .iter()
            .fold(0_i32, |sum, v| sum.saturating_add(v.capacity));
        if max_capacity <= 0 {
            continue;
        }

        let qty = item.quantity.min(max_capacity);
        if qty <= 0 {
            continue;
        }

        let extra_revenue = delta_per_unit * qty as f64;
        let total_value = remote_piece_price * qty as f64;
        // Cost formula: max(0.01, totalValue * 0.01)
        let transport_cost = (total_value * 0.01_f64).max(0.01);
        let net_gain = extra_revenue - transport_cost;

        eprintln!(
            "[DirectorWorker] Region {}: extra_revenue={:.2}, transport_cost={:.2}, net_gain={:.2}, qty={}",
            region_id, extra_revenue, transport_cost, net_gain, qty
        );

        if net_gain <= 0.0 {
            eprintln!(
                "[DirectorWorker] Region {}: Netto-Gewinn <= 0, überspringe",
                region_id
            );
            continue;
        }

        let best_gain = best.as_ref().map_or(0.0, |b| b.net_gain);
        if net_gain > best_gain {
            eprintln!(
                "[DirectorWorker] Region {}: Neuer bester Transport (Gewinn {:.2})",
                region_id, net_gain
            );
            // Keep the vehicle list the quantity was derived from, so the
            // transports created below match it and no second lookup is needed.
            best = Some(BestRoute {
                region_id,
                quantity: qty,
                remote_piece_price,
                net_gain,
                vehicles,
            });
        }
    }

    // No worthwhile transport found.
    let best = match best {
        Some(route) => route,
        None => return Ok(0),
    };

    // Create the transports, limited by the chosen quantity and capacities.
    conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;

    let mut remaining = best.quantity;
    for vehicle in &best.vehicles {
        if remaining <= 0 {
            break;
        }
        let size = remaining.min(vehicle.capacity);
        if size <= 0 {
            continue;
        }

        conn.execute(
            "insert_transport",
            &[
                &item.region_id,
                &best.region_id,
                &item.product_id,
                &size,
                &vehicle.id,
            ],
        )?;

        remaining -= size;
    }

    let shipped = best.quantity - remaining.max(0);

    // Reduce the inventory right after the transports were created so that
    // inventory and transports stay consistent.
    if shipped > 0 {
        if shipped >= item.quantity {
            // Everything went into transports: remove the inventory row.
            conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?;
            conn.execute("remove_inventory", &[&item.id])?;
            item.quantity = 0;
        } else {
            // Reduce the quantity in the database and in the item.
            let remaining_quantity = item.quantity - shipped;
            Self::update_inventory_quantity(conn, item.id, remaining_quantity)?;
            item.quantity = remaining_quantity;
        }

        eprintln!(
            "[DirectorWorker] Transport geplant: {} Einheiten von Produkt {} von Region {} nach Region {} (Stückpreis lokal {:.2}, remote {:.2}). Inventar reduziert.",
            shipped,
            item.product_id,
            item.region_id,
            best.region_id,
            local_piece_price,
            best.remote_piece_price
        );
    }

    Ok(shipped)
}

/// Returns the vehicles that `QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE` yields
/// for the given user and route.
///
/// Rows without a non-negative `vehicle_id` or without a positive `capacity`
/// are dropped. Before the actual lookup two diagnostic queries (vehicle
/// count in the source region, existence of a route) are executed; their
/// results are only logged.
///
/// # Errors
///
/// Returns the `DbError` of the first failing prepare/execute call.
fn get_transport_vehicles_for_route(
    conn: &mut DbConnection,
    falukant_user_id: i32,
    source_region: i32,
    target_region: i32,
) -> Result<Vec<TransportVehicle>, DbError> {
    // Debug: check whether the user owns vehicles in the source region.
    const QUERY_COUNT_VEHICLES_IN_REGION: &str = r#"
        SELECT COUNT(*) AS count
        FROM falukant_data.vehicle v
        WHERE v.falukant_user_id = $1
          AND v.region_id = $2;
    "#;
    // Debug: check whether a route exists (in either direction).
    const QUERY_CHECK_ROUTE: &str = r#"
        SELECT COUNT(*) AS count
        FROM falukant_data.region_distance rd
        WHERE (rd.source_region_id = $1 AND rd.target_region_id = $2)
           OR (rd.source_region_id = $2 AND rd.target_region_id = $1);
    "#;

    conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?;
    let vehicle_count = conn
        .execute(
            "count_vehicles_in_region",
            &[&falukant_user_id, &source_region],
        )?
        .into_iter()
        .next()
        .and_then(|row| row.get("count").and_then(|v| v.parse::<i64>().ok()))
        .unwrap_or(0);

    eprintln!(
        "[DirectorWorker] Fahrzeuge in Region {} für User {}: {}",
        source_region, falukant_user_id, vehicle_count
    );

    conn.prepare("check_route", QUERY_CHECK_ROUTE)?;
    let route_count = conn
        .execute("check_route", &[&source_region, &target_region])?
        .into_iter()
        .next()
        .and_then(|row| row.get("count").and_then(|v| v.parse::<i64>().ok()))
        .unwrap_or(0);
    let route_exists = route_count > 0;

    eprintln!(
        "[DirectorWorker] Route von Region {} nach Region {} existiert: {}",
        source_region, target_region, route_exists
    );

    conn.prepare(
        "get_transport_vehicles_for_route",
        QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE,
    )?;
    let rows = conn.execute(
        "get_transport_vehicles_for_route",
        &[&falukant_user_id, &source_region, &target_region],
    )?;

    let result: Vec<TransportVehicle> = rows
        .into_iter()
        .filter_map(|row| {
            let id = row.get("vehicle_id").and_then(|v| v.parse::<i32>().ok())?;
            let capacity = row.get("capacity").and_then(|v| v.parse::<i32>().ok())?;
            if id >= 0 && capacity > 0 {
                Some(TransportVehicle { id, capacity })
            } else {
                None
            }
        })
        .collect();

    eprintln!(
        "[DirectorWorker] Gefundene Transportmittel für Route {} -> {}: {}",
        source_region,
        target_region,
        result.len()
    );

    Ok(result)
}

/// Plans empty transports that bring vehicles from another branch region of
/// the user to the region of `current_branch_id`.
///
/// For every other branch returned by `QUERY_GET_USER_BRANCHES` that has free
/// vehicles and vehicles usable for the route back, the difference of the
/// average `worth_percent` (target minus current region) is computed. The
/// branch with the highest difference wins (the first one on ties) and one
/// empty transport is created for each of its route vehicles.
///
/// The price difference is used for prioritisation only; this function does
/// not itself check whether the current branch lacks vehicles.
/// NOTE(review): presumably the caller checks that - confirm.
///
/// # Errors
///
/// Returns a `DbError` when a statement fails or the branch row carries no
/// parsable `region_id`. An unknown branch is not an error.
fn plan_empty_transports_for_vehicle_retrieval(
    &mut self,
    conn: &mut DbConnection,
    falukant_user_id: i32,
    current_branch_id: i32,
) -> Result<(), DbError> {
    const QUERY_GET_BRANCH_REGION: &str = r#"
        SELECT region_id
        FROM falukant_data.branch
        WHERE id = $1;
    "#;
    const QUERY_GET_AVERAGE_WORTH: &str = r#"
        SELECT
            AVG(CASE WHEN region_id = $1 THEN worth_percent ELSE NULL END) AS current_worth,
            AVG(CASE WHEN region_id = $2 THEN worth_percent ELSE NULL END) AS target_worth
        FROM falukant_data.town_product_worth
        WHERE region_id IN ($1, $2);
    "#;

    /// Best source branch found so far.
    struct Candidate {
        branch_id: i32,
        region_id: i32,
        price_delta: f64,
        vehicles: Vec<TransportVehicle>,
    }

    // Determine the region of the current branch.
    conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
    let branch_rows = conn.execute("get_branch_region", &[&current_branch_id])?;
    let current_region_id = match branch_rows.into_iter().next() {
        Some(row) => row
            .get("region_id")
            .and_then(|v| v.parse::<i32>().ok())
            .ok_or_else(|| DbError::new("Konnte region_id nicht ermitteln"))?,
        None => return Ok(()), // Branch not found, nothing to do.
    };

    // Find all other branches of the user.
    conn.prepare("get_user_branches", QUERY_GET_USER_BRANCHES)?;
    let branch_rows = conn.execute(
        "get_user_branches",
        &[&falukant_user_id, &current_region_id],
    )?;

    if branch_rows.is_empty() {
        eprintln!(
            "[DirectorWorker] Keine anderen Branches für User {} gefunden",
            falukant_user_id
        );
        return Ok(());
    }

    // Prepare every statement once, before the loop.
    conn.prepare("get_free_vehicles_in_region", QUERY_GET_FREE_VEHICLES_IN_REGION)?;
    // NOTE(review): this statement is not executed in this function; the
    // prepare is kept in case other code relies on it - confirm and remove.
    conn.prepare("get_region_worth_for_product", QUERY_GET_REGION_WORTH_FOR_PRODUCT)?;
    conn.prepare("insert_empty_transport", QUERY_INSERT_EMPTY_TRANSPORT)?;
    conn.prepare("get_average_worth", QUERY_GET_AVERAGE_WORTH)?;

    let mut best: Option<Candidate> = None;

    for branch_row in &branch_rows {
        let target_region_id = match branch_row
            .get("region_id")
            .and_then(|v| v.parse::<i32>().ok())
        {
            Some(id) if id >= 0 => id,
            _ => continue,
        };
        let target_branch_id = match branch_row
            .get("branch_id")
            .and_then(|v| v.parse::<i32>().ok())
        {
            Some(id) if id >= 0 => id,
            _ => continue,
        };

        // Are there free vehicles in this region at all?
        let vehicle_rows = conn.execute(
            "get_free_vehicles_in_region",
            &[&falukant_user_id, &target_region_id],
        )?;
        if vehicle_rows.is_empty() {
            continue;
        }

        // Are there vehicles usable for the route back to the current region?
        let vehicles = Self::get_transport_vehicles_for_route(
            conn,
            falukant_user_id,
            target_region_id,
            current_region_id,
        )?;
        if vehicles.is_empty() {
            continue;
        }

        // Simplified price advantage: difference of the average
        // worth_percent of both regions; a missing average counts as 100.
        let worth_rows = conn.execute(
            "get_average_worth",
            &[&current_region_id, &target_region_id],
        )?;
        let price_delta = match worth_rows.into_iter().next() {
            Some(worth_row) => {
                let current_worth = worth_row
                    .get("current_worth")
                    .and_then(|v| v.parse::<f64>().ok())
                    .unwrap_or(100.0);
                let target_worth = worth_row
                    .get("target_worth")
                    .and_then(|v| v.parse::<f64>().ok())
                    .unwrap_or(100.0);
                target_worth - current_worth
            }
            None => 0.0,
        };

        // A strict comparison keeps the first branch when several are equal.
        let is_better = best
            .as_ref()
            .map_or(true, |b| price_delta > b.price_delta);
        if is_better {
            best = Some(Candidate {
                branch_id: target_branch_id,
                region_id: target_region_id,
                price_delta,
                vehicles,
            });
        }
    }

    let best = match best {
        Some(candidate) => candidate,
        None => {
            eprintln!("[DirectorWorker] Keine Branches mit freien Transportmitteln gefunden");
            return Ok(());
        }
    };

    eprintln!(
        "[DirectorWorker] Bester Branch für Fahrzeug-Rückholung: Branch {} (Region {}), {} Fahrzeuge, Preisvorteil: {:.2}%",
        best.branch_id,
        best.region_id,
        best.vehicles.len(),
        best.price_delta
    );

    // Create an empty transport for each vehicle found for the winning route.
    for vehicle in &best.vehicles {
        conn.execute(
            "insert_empty_transport",
            &[&best.region_id, &current_region_id, &vehicle.id],
        )?;
    }

    eprintln!(
        "[DirectorWorker] {} leere Transporte geplant: Region {} -> Region {}",
        best.vehicles.len(),
        best.region_id,
        current_region_id
    );

    Ok(())
}

/// Sets the quantity of the inventory row `inventory_id` to `new_quantity`.
///
/// # Errors
///
/// Returns the `DbError` of the failing prepare/execute call.
fn update_inventory_quantity(
    conn: &mut DbConnection,
    inventory_id: i32,
    new_quantity: i32,
) -> Result<(), DbError> {
    const QUERY_UPDATE_INVENTORY_QTY: &str = r#"
        UPDATE falukant_data.inventory
        SET quantity = $2
        WHERE id = $1;
    "#;

    conn.prepare("update_inventory_qty", QUERY_UPDATE_INVENTORY_QTY)?;
    conn.execute("update_inventory_qty", &[&inventory_id, &new_quantity])?;
    Ok(())
}

/// Pays all salaries returned by `QUERY_GET_SALARY_TO_PAY`.
///
/// For every entry the employer is charged `income`, the entry is marked as
/// paid and a `falukantUpdateStatus` event is published for the employer.
/// Rows that cannot be mapped to a [`SalaryItem`] are skipped.
///
/// # Errors
///
/// Returns a `DbError` when no connection can be obtained or a statement
/// fails. A failing money booking is only logged.
fn pay_salary(&mut self) -> Result<(), DbError> {
    self.base.set_current_step("DirectorWorker: pay_salary");

    let mut conn = self
        .base
        .pool
        .get()
        .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

    conn.prepare("get_salary_to_pay", QUERY_GET_SALARY_TO_PAY)?;
    conn.prepare("set_salary_payed", QUERY_SET_SALARY_PAYED)?;

    let rows = conn.execute("get_salary_to_pay", &[])?;
    let salaries: Vec<SalaryItem> = rows
        .into_iter()
        .filter_map(Self::map_row_to_salary_item)
        .collect();

    for salary in salaries {
        // NOTE(review): the salary is marked as paid even when the booking
        // failed - confirm that this best-effort behaviour is intended.
        if let Err(err) = self.base.change_falukant_user_money(
            salary.employer_user_id,
            -(salary.income as f64),
            "director payed out",
        ) {
            eprintln!(
                "[DirectorWorker] Fehler bei change_falukant_user_money (director payed out): {err}"
            );
        }

        conn.execute("set_salary_payed", &[&salary.id])?;

        let message = format!(
            r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#,
            salary.employer_user_id
        );
        self.base.broker.publish(message);
    }

    Ok(())
}

/// Builds a [`SalaryItem`] from a result row; `None` when a column is
/// missing or cannot be parsed.
fn map_row_to_salary_item(row: Row) -> Option<SalaryItem> {
    Some(SalaryItem {
        id: row.get("id")?.parse().ok()?,
        employer_user_id: row.get("employer_user_id")?.parse().ok()?,
        income: row.get("income")?.parse().ok()?,
    })
}

/// Executes `QUERY_UPDATE_SATISFACTION` and publishes a `directorchanged`
/// event for every returned row with a parsable `employer_user_id`.
///
/// # Errors
///
/// Returns a `DbError` when no connection can be obtained or the statement
/// fails.
fn calculate_satisfaction(&mut self) -> Result<(), DbError> {
    self.base
        .set_current_step("DirectorWorker: calculate_satisfaction");

    let mut conn = self
        .base
        .pool
        .get()
        .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

    conn.prepare("update_satisfaction", QUERY_UPDATE_SATISFACTION)?;
    let rows = conn.execute("update_satisfaction", &[])?;

    for row in rows {
        let employer_id = match row
            .get("employer_user_id")
            .and_then(|v| v.parse::<i32>().ok())
        {
            Some(id) => id,
            None => continue,
        };
        let message = format!(
            r#"{{"event":"directorchanged","user_id":{}}}"#,
            employer_id
        );
        self.base.broker.publish(message);
    }

    Ok(())
}
}

impl Worker for DirectorWorker {
    /// Starts the worker loop: a fresh `DirectorWorker` built from clones of
    /// this worker's pool and broker runs `run_iteration` until the shared
    /// `running_worker` flag is cleared.
    fn start_worker_thread(&mut self) {
        let pool = self.base.pool.clone();
        let broker = self.base.broker.clone();

        self.base
            .start_worker_with_loop(move |state: Arc<WorkerState>| {
                let mut worker = DirectorWorker::new(pool.clone(), broker.clone());
                while state.running_worker.load(Ordering::Relaxed) {
                    worker.run_iteration(&state);
                }
            });
    }

    /// Stops the worker via the base worker.
    fn stop_worker_thread(&mut self) {
        self.base.stop_worker();
    }

    /// Starts the base worker's watchdog.
    fn enable_watchdog(&mut self) {
        self.base.start_watchdog();
    }
}