Files
yourpart-daemon/src/worker/director.rs

1682 lines
59 KiB
Rust

use crate::db::{DbConnection, DbError, Row};
use std::collections::HashMap;
use crate::message_broker::MessageBroker;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::db::ConnectionPool;
use super::base::{BaseWorker, Worker, WorkerState};
/// One director together with the branch it acts on and its permission flags.
///
/// Built by `map_row_to_director` from the columns `id`, `branch_id`,
/// `may_produce`, `may_sell` and `may_start_transport`.
#[derive(Debug, Clone)]
struct Director {
/// Director id (column `id`).
id: i32,
/// Branch the director works for (column `branch_id`).
branch_id: i32,
/// When set, `perform_task` runs the production step for this director.
may_produce: bool,
/// When set, `perform_task` runs the selling step for this director.
may_sell: bool,
/// When set, `perform_task` runs the transport step for this director.
may_start_transport: bool,
}
/// Inputs for starting a production in one branch.
///
/// Built by `map_row_to_production_plan`; the four capacity fields are
/// overwritten with fresh values by `start_productions` before each attempt.
#[derive(Debug, Clone)]
struct ProductionPlan {
/// User whose money is charged for the production.
falukant_user_id: i32,
/// Money available to the user (the query delivers it as text, parsed to `f64`).
money: f64,
/// The user's certificate value; the unit cost is computed as `certificate * 6`.
certificate: i32,
/// Branch in which the production is inserted.
branch_id: i32,
/// Product to produce.
product_id: i32,
/// Region of the branch; used to look up the weather type on insert.
region_id: i32,
/// Summed stock quantity of the branch.
stock_size: i32,
/// Summed inventory quantity held in the branch's stocks.
used_in_stock: i32,
/// Number of production rows of the branch.
running_productions: i32,
/// Summed quantity of the branch's production rows.
running_productions_quantity: i32,
}
/// One inventory row joined with product, branch and regional worth data.
///
/// Built by `map_row_to_inventory_item` from `QUERY_GET_INVENTORY` rows.
#[derive(Debug, Clone)]
struct InventoryItem {
/// Inventory row id; used to delete or update the row.
id: i32,
/// Product stored in this inventory row.
product_id: i32,
/// Number of units in the inventory row.
quantity: i32,
/// Quality value; the price formulas use it as the knowledge factor (divided by 100).
quality: i32,
/// Base sell cost of the product (`p.sell_cost`).
sell_cost: f64,
/// Owner of the branch (`fu.id`); receives the sale proceeds.
user_id: i32,
/// Region of the branch holding the inventory.
region_id: i32,
/// Branch holding the inventory.
branch_id: i32,
worth_percent: f64, // Regional worth_percent value used for the price calculation
}
/// Salary payout record for one director.
///
/// NOTE(review): the field names match the columns selected by
/// `QUERY_GET_SALARY_TO_PAY` (`id`, `employer_user_id`, `income`); the code
/// that fills this struct is outside the visible part of the file — confirm.
#[derive(Debug, Clone)]
struct SalaryItem {
/// Director id.
id: i32,
/// User employing the director.
employer_user_id: i32,
/// The director's income value.
income: i32,
}
/// A vehicle that can carry goods, with its capacity.
///
/// NOTE(review): presumably filled from the `vehicle_id` / `capacity` columns
/// of the vehicle queries in this file; the mapping code is not visible here.
#[derive(Debug, Clone)]
struct TransportVehicle {
/// Vehicle id.
id: i32,
/// Capacity of the vehicle (units per transport — TODO confirm unit).
capacity: i32,
}
/// Worker that periodically lets directors produce, transport and sell goods,
/// then pays salaries and recalculates satisfaction (see `perform_all_tasks`).
pub struct DirectorWorker {
/// Shared worker infrastructure: connection pool, message broker, step tracking.
base: BaseWorker,
/// Start time of the last task run; `None` until the first run has happened.
last_run: Option<Instant>,
}
// Maximum number of parallel productions per branch.
// `start_productions` stops once a branch has this many production rows.
const MAX_PARALLEL_PRODUCTIONS: i32 = 2;
// SQL queries (taken 1:1 from director_worker.h)
//
// Lists every director with its permission flags, the employer's user id and
// the employer's branch located in the region of the director's character.
// Returns rows only while the database's `current_time` lies between
// 08:00:00 and 17:00:00. Takes no parameters.
const QUERY_GET_DIRECTORS: &str = r#"
SELECT
d.may_produce,
d.may_sell,
d.may_start_transport,
b.id AS branch_id,
fu.id AS falukantUserId,
d.id
FROM falukant_data.director d
JOIN falukant_data.falukant_user fu
ON fu.id = d.employer_user_id
JOIN falukant_data.character c
ON c.id = d.director_character_id
JOIN falukant_data.branch b
ON b.region_id = c.region_id
AND b.falukant_user_id = fu.id
WHERE current_time BETWEEN '08:00:00' AND '17:00:00';
"#;
// Picks the single most worthwhile product for a director ($1) in a branch ($2).
//
// Candidates are products whose category does not exceed the user's
// certificate and for which both the user's character and the director's
// character have a knowledge row. They are ordered by the computed `worth`
// expression (descending) and only the first row is returned. The row also
// carries the user's money (cast to text) and the branch's stock/production
// aggregates. Note that `stock_size` is not wrapped in COALESCE and can be NULL.
const QUERY_GET_BEST_PRODUCTION: &str = r#"
SELECT
fdu.id falukant_user_id,
-- Geld explizit als Text casten, damit das Mapping im Rust-Code
-- zuverlässig funktioniert (unabhängig vom nativen DB-Typ wie `money`).
CAST(fdu.money AS text) AS money,
fdu.certificate,
ftp.id product_id,
ftp.label_tr,
fdb.region_id,
(
SELECT SUM(quantity)
FROM falukant_data.stock fds
WHERE fds.branch_id = fdb.id
) AS stock_size,
COALESCE((
SELECT SUM(COALESCE(fdi.quantity, 0))
FROM falukant_data.stock fds
JOIN falukant_data.inventory fdi
ON fdi.stock_id = fds.id
WHERE fds.branch_id = fdb.id
), 0) AS used_in_stock,
(ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category)
/ (300.0 * ftp.production_time) AS worth,
fdb.id AS branch_id,
(
SELECT COUNT(id)
FROM falukant_data.production
WHERE branch_id = fdb.id
) AS running_productions,
COALESCE((
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
FROM falukant_data.production fdp
WHERE fdp.branch_id = fdb.id
), 0) AS running_productions_quantity
FROM falukant_data.director fdd
JOIN falukant_data.character fdc
ON fdc.id = fdd.director_character_id
JOIN falukant_data.falukant_user fdu
ON fdd.employer_user_id = fdu.id
JOIN falukant_data.character user_character
ON user_character.user_id = fdu.id
JOIN falukant_data.branch fdb
ON fdb.falukant_user_id = fdu.id
AND fdb.region_id = fdc.region_id
JOIN falukant_data.town_product_worth fdtpw
ON fdtpw.region_id = fdb.region_id
JOIN falukant_data.knowledge fdk_character
ON fdk_character.product_id = fdtpw.product_id
AND fdk_character.character_id = user_character.id
JOIN falukant_data.knowledge fdk_director
ON fdk_director.product_id = fdtpw.product_id
AND fdk_director.character_id = fdd.director_character_id
JOIN falukant_type.product ftp
ON ftp.id = fdtpw.product_id
AND ftp.category <= fdu.certificate
WHERE fdd.id = $1
AND fdb.id = $2
ORDER BY worth DESC
LIMIT 1;
"#;
// Inserts a production row for branch $1, product $2 and quantity $3.
// The weather type is taken from the weather row of region $4 via a scalar
// subquery.
const QUERY_INSERT_PRODUCTION: &str = r#"
INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id)
VALUES ($1, $2, $3, (
SELECT weather_type_id
FROM falukant_data.weather
WHERE region_id = $4
));
"#;
// Query that fetches the current stock and production figures for a branch ($1)
// (without recomputing the complete production plan).
// Returns one row with `stock_size`, `used_in_stock`, `running_productions`
// and `running_productions_quantity`; `stock_size` can be NULL when the branch
// has no stock rows.
const QUERY_GET_BRANCH_CAPACITY: &str = r#"
SELECT
(
SELECT SUM(quantity)
FROM falukant_data.stock fds
WHERE fds.branch_id = $1
) AS stock_size,
COALESCE((
SELECT SUM(COALESCE(fdi.quantity, 0))
FROM falukant_data.stock fds
JOIN falukant_data.inventory fdi
ON fdi.stock_id = fds.id
WHERE fds.branch_id = $1
), 0) AS used_in_stock,
(
SELECT COUNT(id)
FROM falukant_data.production
WHERE branch_id = $1
) AS running_productions,
COALESCE((
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
FROM falukant_data.production fdp
WHERE fdp.branch_id = $1
), 0) AS running_productions_quantity;
"#;
// Lists the inventory rows of branch $2 for director $1, joined with the
// product's sell cost, the branch owner and the regional `worth_percent`.
// The worth join is a LEFT JOIN, so a missing worth row yields 100.0.
const QUERY_GET_INVENTORY: &str = r#"
SELECT
i.id,
i.product_id,
i.quantity,
i.quality,
p.sell_cost,
fu.id AS user_id,
b.region_id,
b.id AS branch_id,
COALESCE(tpw.worth_percent, 100.0) AS worth_percent
FROM falukant_data.inventory i
JOIN falukant_data.stock s
ON s.id = i.stock_id
JOIN falukant_data.branch b
ON b.id = s.branch_id
JOIN falukant_data.falukant_user fu
ON fu.id = b.falukant_user_id
JOIN falukant_data.director d
ON d.employer_user_id = fu.id
JOIN falukant_type.product p
ON p.id = i.product_id
LEFT JOIN falukant_data.town_product_worth tpw
ON tpw.region_id = b.region_id
AND tpw.product_id = i.product_id
WHERE d.id = $1
AND b.id = $2;
"#;
// Deletes the inventory row with id $1.
const QUERY_REMOVE_INVENTORY: &str = r#"
DELETE FROM falukant_data.inventory
WHERE id = $1;
"#;
// Records a sale of quantity $3 for (region $1, product $2, seller $4).
// On a conflict on (region_id, product_id, seller_id) the quantity is added
// to the existing log row instead of inserting a second one.
const QUERY_ADD_SELL_LOG: &str = r#"
INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (region_id, product_id, seller_id)
DO UPDATE
SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity;
"#;
// Regional sales worth per product/region for all branches of a user:
// returns `worth_percent` of product $2 for every region in which user $1
// owns a branch.
const QUERY_GET_REGION_WORTH_FOR_PRODUCT: &str = r#"
SELECT
tpw.region_id,
tpw.product_id,
tpw.worth_percent
FROM falukant_data.town_product_worth tpw
JOIN falukant_data.branch b
ON b.region_id = tpw.region_id
WHERE b.falukant_user_id = $1
AND tpw.product_id = $2;
"#;
// Available vehicles for a route (source_region -> target_region).
// Parameters: $1 = user id, $2 = source region (where the vehicle stands),
// $3 = target region. A vehicle qualifies when a region_distance row connects
// its region with $3 in either direction and the row's transport mode matches
// the vehicle type's mode or is NULL.
// NOTE(review): unlike QUERY_GET_FREE_VEHICLES_IN_REGION this query does not
// exclude vehicles referenced by existing transport rows — confirm intended.
const QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE: &str = r#"
SELECT
v.id AS vehicle_id,
vt.capacity AS capacity
FROM falukant_data.vehicle v
JOIN falukant_type.vehicle vt
ON vt.id = v.vehicle_type_id
JOIN falukant_data.region_distance rd
ON (
(rd.source_region_id = v.region_id AND rd.target_region_id = $3)
OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id)
)
AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
WHERE v.falukant_user_id = $1
AND v.region_id = $2;
"#;
// Creates a transport row.
// Parameters: $1 = source region, $2 = target region, $3 = product,
// $4 = size, $5 = vehicle. Both timestamps are set to NOW().
const QUERY_INSERT_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport
(source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, NOW(), NOW());
"#;
// Empty transports (product_id = NULL, size = 0) for bringing vehicles back.
// Parameters: $1 = source region, $2 = target region, $3 = vehicle.
const QUERY_INSERT_EMPTY_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport
(source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at)
VALUES ($1, $2, NULL, 0, $3, NOW(), NOW());
"#;
// All branches of user $1 with their regions, excluding branches located in
// region $2.
const QUERY_GET_USER_BRANCHES: &str = r#"
SELECT DISTINCT
b.region_id,
b.id AS branch_id
FROM falukant_data.branch b
WHERE b.falukant_user_id = $1
AND b.region_id != $2;
"#;
// Free vehicles of user $1 in region $2 (not used by an active transport).
// A transport counts as active as long as its row still exists in the table,
// so any vehicle referenced by a transport row is excluded.
const QUERY_GET_FREE_VEHICLES_IN_REGION: &str = r#"
SELECT
v.id AS vehicle_id,
vt.capacity AS capacity
FROM falukant_data.vehicle v
JOIN falukant_type.vehicle vt
ON vt.id = v.vehicle_type_id
WHERE v.falukant_user_id = $1
AND v.region_id = $2
AND v.id NOT IN (
SELECT DISTINCT t.vehicle_id
FROM falukant_data.transport t
WHERE t.vehicle_id IS NOT NULL
);
"#;
// Lists directors whose last salary payout date is earlier than today's date
// (id, employer user id and income). Rows with a NULL `last_salary_payout` do
// not satisfy the comparison and are therefore not returned.
const QUERY_GET_SALARY_TO_PAY: &str = r#"
SELECT d.id, d.employer_user_id, d.income
FROM falukant_data.director d
WHERE DATE(d.last_salary_payout) < DATE(NOW());
"#;
// Marks the salary of director $1 as paid by setting `last_salary_payout`
// to NOW().
const QUERY_SET_SALARY_PAYED: &str = r#"
UPDATE falukant_data.director
SET last_salary_payout = NOW()
WHERE id = $1;
"#;
// Recomputes the satisfaction of all directors in one statement.
// new_satisfaction = ROUND(income / (title_of_nobility * 1.231^(AVG(knowledge) / 1.5)) * 100),
// where the average runs over the knowledge rows of the director's character.
// Only rows whose satisfaction actually changes are updated; the statement
// returns the employer user id of every updated director. Takes no parameters.
const QUERY_UPDATE_SATISFACTION: &str = r#"
WITH new_sats AS (
SELECT
d.id,
ROUND(
d.income::numeric
/
(
c.title_of_nobility
* POWER(1.231, AVG(k.knowledge) / 1.5)
)
* 100
) AS new_satisfaction
FROM falukant_data.director d
JOIN falukant_data.knowledge k
ON d.director_character_id = k.character_id
JOIN falukant_data.character c
ON c.id = d.director_character_id
GROUP BY d.id, c.title_of_nobility, d.income
)
UPDATE falukant_data.director dir
SET satisfaction = ns.new_satisfaction
FROM new_sats ns
WHERE dir.id = ns.id
AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction
RETURNING dir.employer_user_id;
"#;
impl DirectorWorker {
/// Creates a new `DirectorWorker`.
///
/// The connection pool and message broker are handed to the embedded
/// `BaseWorker` together with the label `"DirectorWorker"`. `last_run` starts
/// out as `None`, so the first `run_iteration` call executes the tasks
/// right away.
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
    let base = BaseWorker::new("DirectorWorker", pool, broker);
    Self {
        base,
        last_run: None,
    }
}
/// Runs one tick of the worker.
///
/// The director tasks are executed on the first tick and afterwards whenever
/// at least 60 seconds have passed since the start of the previous run.
/// Errors from the tasks are logged, not propagated. Every tick ends with a
/// one-second sleep.
fn run_iteration(&mut self, state: &WorkerState) {
    self.base.set_current_step("DirectorWorker iteration");

    let tick_start = Instant::now();
    let interval_elapsed = self.last_run.map_or(true, |previous| {
        tick_start.saturating_duration_since(previous) >= Duration::from_secs(60)
    });

    if interval_elapsed {
        if let Err(err) = self.perform_all_tasks() {
            eprintln!("[DirectorWorker] Fehler beim Ausführen der Aufgabe: {err}");
        }
        // The timestamp taken before the tasks ran is stored, so the interval
        // is measured from start to start.
        self.last_run = Some(tick_start);
    }

    std::thread::sleep(Duration::from_secs(1));

    // The running flag is read after the sleep, but nothing follows the read,
    // so its value does not influence this function.
    let _still_running = state.running_worker.load(Ordering::Relaxed);
}
/// Runs the three director task groups in a fixed order: director actions,
/// salary payout, satisfaction update.
///
/// The first error aborts the sequence and is returned, so later groups are
/// skipped for this run.
fn perform_all_tasks(&mut self) -> Result<(), DbError> {
// Production/selling/transport logic for all directors
self.perform_task()?;
self.pay_salary()?;
self.calculate_satisfaction()?;
Ok(())
}
/// Loads all directors that may act right now and runs their permitted steps.
///
/// For each director the steps run in this order: production (if
/// `may_produce`), transport (if `may_start_transport`), selling (if
/// `may_sell`).
///
/// # Errors
/// Returns a `DbError` when the connection cannot be obtained, the director
/// query fails, or a production/selling step fails; in the latter case the
/// remaining directors are not processed. Transport errors are only logged.
fn perform_task(&mut self) -> Result<(), DbError> {
    self.base
        .set_current_step("Get director actions from DB");

    let mut conn = self
        .base
        .pool
        .get()
        .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

    conn.prepare("get_directors", QUERY_GET_DIRECTORS)?;
    let directors_rows = conn.execute("get_directors", &[])?;

    // Every per-director step below checks out its own connection from the
    // pool. Hand this one back first so a single run never holds two pooled
    // connections at once (which would stall on a pool of size one).
    drop(conn);

    let directors: Vec<Director> = directors_rows
        .into_iter()
        .filter_map(Self::map_row_to_director)
        .collect();

    if directors.is_empty() {
        eprintln!("[DirectorWorker] Keine Direktoren für Aktionen gefunden (Zeitfenster oder DB-Daten).");
    }

    for director in directors {
        if director.may_produce {
            eprintln!(
                "[DirectorWorker] Starte Produktionsprüfung für Director {} (branch_id={})",
                director.id, director.branch_id
            );
            self.start_productions(&director)?;
        }

        if director.may_start_transport {
            eprintln!(
                "[DirectorWorker] Starte Transportprüfung für Director {} (branch_id={})",
                director.id, director.branch_id
            );
            // Transport planning is best-effort: a failure is logged and the
            // director still gets its selling step.
            if let Err(err) = self.start_transports_stub(&director) {
                eprintln!(
                    "[DirectorWorker] Fehler bei start_transports für Director {}: {err}",
                    director.id
                );
            }
        }

        if director.may_sell {
            eprintln!(
                "[DirectorWorker] Starte Verkaufsprüfung für Director {} (branch_id={})",
                director.id, director.branch_id
            );
            self.start_sellings(&director)?;
        }
    }

    Ok(())
}
/// Converts one result row into a `Director`.
///
/// Returns `None` when `id` or `branch_id` is missing or not parseable.
/// A permission flag is `true` only when its column holds `"t"` or `"true"`;
/// a missing column or any other value yields `false`.
fn map_row_to_director(row: Row) -> Option<Director> {
    // Reads a boolean column delivered as text.
    let flag = |column: &str| -> bool {
        row.get(column)
            .map(|v| v == "t" || v == "true")
            .unwrap_or(false)
    };

    let id: i32 = row.get("id")?.parse().ok()?;
    let branch_id: i32 = row.get("branch_id")?.parse().ok()?;

    Some(Director {
        id,
        branch_id,
        may_produce: flag("may_produce"),
        may_sell: flag("may_sell"),
        may_start_transport: flag("may_start_transport"),
    })
}
/// Starts productions for the director's branch until the branch is full.
///
/// The best product is determined once via `QUERY_GET_BEST_PRODUCTION`. Then
/// the branch capacity is re-read before every attempt and a single
/// production is started through `create_single_production`. The loop ends
/// when
/// - `MAX_PARALLEL_PRODUCTIONS` productions exist for the branch,
/// - no free storage is left,
/// - an attempt did not change the branch's production figures, or
/// - starting a production fails (logged, not propagated).
///
/// # Errors
/// Returns a `DbError` when the connection cannot be obtained or one of the
/// queries fails.
fn start_productions(&mut self, director: &Director) -> Result<(), DbError> {
    self.base
        .set_current_step("DirectorWorker: start_productions");

    let mut conn = self
        .base
        .pool
        .get()
        .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

    // Initially: determine the best product for this branch.
    conn.prepare("get_to_produce", QUERY_GET_BEST_PRODUCTION)?;
    let rows = conn.execute("get_to_produce", &[&director.id, &director.branch_id])?;
    if rows.is_empty() {
        eprintln!(
            "[DirectorWorker] Keine Produktionskandidaten für Director {} gefunden.",
            director.id
        );
        return Ok(());
    }

    let mut base_plan = match Self::map_row_to_production_plan(&rows[0]) {
        Some(p) => p,
        None => {
            eprintln!(
                "[DirectorWorker] Produktionsplan für Director {} konnte nicht gemappt werden.",
                director.id
            );
            return Ok(());
        }
    };

    eprintln!(
        "[DirectorWorker] Produktionsplan: director_user_id={}, branch_id={}, product_id={}, money={}, certificate={}",
        base_plan.falukant_user_id,
        base_plan.branch_id,
        base_plan.product_id,
        base_plan.money,
        base_plan.certificate
    );

    // Prepare the query that reads the current capacity figures.
    conn.prepare("get_branch_capacity", QUERY_GET_BRANCH_CAPACITY)?;

    // (running_productions, running_productions_quantity) as seen right before
    // the previous attempt. `create_single_production` returns `Ok(())` even
    // when it decides to produce nothing (for example when the money does not
    // cover a single unit); without this guard the loop would then spin
    // forever on unchanged figures.
    let mut previous_attempt: Option<(i32, i32)> = None;

    // Loop: start productions until either the maximum number is reached or
    // no free storage is left.
    loop {
        // Fetch the current capacity figures.
        let capacity_rows = conn.execute("get_branch_capacity", &[&director.branch_id])?;
        if capacity_rows.is_empty() {
            break;
        }

        let row = &capacity_rows[0];
        // NULL or unparsable columns count as 0.
        let read_i32 = |column: &str| -> i32 {
            row.get(column)
                .and_then(|v| v.parse::<i32>().ok())
                .unwrap_or(0)
        };
        let stock_size = read_i32("stock_size");
        let used_in_stock = read_i32("used_in_stock");
        let running_productions = read_i32("running_productions");
        let running_productions_quantity = read_i32("running_productions_quantity");

        // Check whether further productions may be started.
        if running_productions >= MAX_PARALLEL_PRODUCTIONS {
            eprintln!(
                "[DirectorWorker] Maximale Anzahl an Produktionen ({}) erreicht für Branch {}.",
                MAX_PARALLEL_PRODUCTIONS,
                director.branch_id
            );
            break;
        }

        // Compute the free capacity.
        let free_capacity = stock_size - used_in_stock - running_productions_quantity;
        if free_capacity <= 0 {
            eprintln!(
                "[DirectorWorker] Kein freier Lagerplatz mehr für Branch {} (stock_size={}, used={}, running_qty={}).",
                director.branch_id,
                stock_size,
                used_in_stock,
                running_productions_quantity
            );
            break;
        }

        // Stop when the previous attempt did not add a production.
        let current_attempt = (running_productions, running_productions_quantity);
        if previous_attempt == Some(current_attempt) {
            eprintln!(
                "[DirectorWorker] Letzter Versuch hat keine Produktion angelegt, breche ab für Branch {}.",
                director.branch_id
            );
            break;
        }
        previous_attempt = Some(current_attempt);

        // Update the plan with the current figures.
        // NOTE(review): `base_plan.money` is not refreshed between attempts, so
        // the money limit of later attempts is based on the initial balance.
        base_plan.stock_size = stock_size;
        base_plan.used_in_stock = used_in_stock;
        base_plan.running_productions = running_productions;
        base_plan.running_productions_quantity = running_productions_quantity;

        // Start one new production (at most 100 units).
        if let Err(err) = self.create_single_production(&mut conn, &base_plan) {
            eprintln!(
                "[DirectorWorker] Fehler beim Starten einer Produktion: {err}"
            );
            break;
        }
    }

    Ok(())
}
/// Converts a `QUERY_GET_BEST_PRODUCTION` row into a `ProductionPlan`.
///
/// `falukant_user_id`, `certificate`, `branch_id` and `product_id` are
/// mandatory: if one of them is missing or unparsable, `None` is returned.
/// All other columns are read leniently and fall back to `0` (`0.0` for
/// `money`) when NULL or unparsable.
fn map_row_to_production_plan(row: &Row) -> Option<ProductionPlan> {
    // Mandatory integer column: absence or a parse failure aborts the mapping.
    let required = |column: &str| -> Option<i32> { row.get(column)?.parse().ok() };
    // Optional integer column: absence or a parse failure yields 0.
    let lenient = |column: &str| -> i32 {
        row.get(column)
            .and_then(|v| v.parse::<i32>().ok())
            .unwrap_or(0)
    };

    let falukant_user_id = required("falukant_user_id")?;
    let certificate = required("certificate")?;
    let branch_id = required("branch_id")?;
    let product_id = required("product_id")?;

    let money = row
        .get("money")
        .and_then(|v| v.parse::<f64>().ok())
        .unwrap_or(0.0);

    Some(ProductionPlan {
        falukant_user_id,
        money,
        certificate,
        branch_id,
        product_id,
        region_id: lenient("region_id"),
        stock_size: lenient("stock_size"),
        used_in_stock: lenient("used_in_stock"),
        running_productions: lenient("running_productions"),
        running_productions_quantity: lenient("running_productions_quantity"),
    })
}
/// Starts a single production (at most 100 units) based on the given plan.
///
/// The quantity is the smallest of the free storage capacity, the number of
/// units the user's money can pay for (unit cost = `certificate * 6`) and the
/// hard limit of 100. If that is less than 1, nothing is started and `Ok(())`
/// is returned. Otherwise the cost is charged to the user, a production row
/// is inserted and a `production_started` event is published.
///
/// The outer loop in `start_productions` calls this repeatedly until the
/// maximum number of productions is reached or no free storage is left.
///
/// # Errors
/// Returns a `DbError` when preparing or executing the insert fails. A failed
/// money change is only logged; the production is inserted regardless.
fn create_single_production(
    &mut self,
    conn: &mut DbConnection,
    plan: &ProductionPlan,
) -> Result<(), DbError> {
    // Free storage: total stock minus what the inventory already occupies
    // minus what running productions will occupy.
    let free_capacity =
        plan.stock_size - plan.used_in_stock - plan.running_productions_quantity;

    // Unit cost as a float because `money` is an f64; rounding down to whole
    // units happens only at the end.
    let one_piece_cost = (plan.certificate * 6) as f64;

    let max_money_production: i32 = if one_piece_cost <= 0.0 {
        0
    } else if plan.money > 0.0 {
        // Number of units the available money can pay for.
        (plan.money / one_piece_cost).floor() as i32
    } else {
        // Money was read as 0 (or less) although the user may actually have
        // sufficient funds (known migration case): drop the money limit for
        // now and rely on storage capacity and the hard limit of 100.
        eprintln!(
            "[DirectorWorker] Warnung: money=0 für falukant_user_id={}, verwende nur Lagerkapazität als Limit.",
            plan.falukant_user_id
        );
        i32::MAX
    };

    // Never more than the free storage, never more than the money allows,
    // never more than 100 units, never negative.
    let to_produce = free_capacity.min(max_money_production).clamp(0, 100);

    eprintln!(
        "[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}",
        free_capacity,
        one_piece_cost,
        max_money_production,
        to_produce,
        plan.running_productions
    );

    if to_produce < 1 {
        eprintln!(
            "[DirectorWorker] Keine Produktion gestartet: free_capacity={}, max_money_production={}, running_productions={}, running_qty={}",
            free_capacity,
            max_money_production,
            plan.running_productions,
            plan.running_productions_quantity
        );
        return Ok(());
    }

    let production_cost = to_produce as f64 * one_piece_cost;
    let charge_result = self.base.change_falukant_user_money(
        plan.falukant_user_id,
        -production_cost,
        "director starts production",
    );
    if let Err(err) = charge_result {
        eprintln!(
            "[DirectorWorker] Fehler bei change_falukant_user_money: {err}"
        );
    }

    conn.prepare("insert_production", QUERY_INSERT_PRODUCTION)?;

    // Insert one production; the query itself looks up the region's current
    // weather type from the weather table.
    conn.execute(
        "insert_production",
        &[&plan.branch_id, &plan.product_id, &to_produce, &plan.region_id],
    )?;

    eprintln!(
        "[DirectorWorker] Produktion angelegt: branch_id={}, product_id={}, quantity={}",
        plan.branch_id, plan.product_id, to_produce
    );

    let message = format!(
        r#"{{"event":"production_started","branch_id":{}}}"#,
        plan.branch_id
    );
    self.base.broker.publish(message);

    Ok(())
}
fn start_transports_stub(&mut self, director: &Director) -> Result<(), DbError> {
self.base
.set_current_step("DirectorWorker: start_transports");
let mut conn = self
.base
.pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("get_to_transport", QUERY_GET_INVENTORY)?;
let rows = conn.execute("get_to_transport", &[&director.id, &director.branch_id])?;
let mut items: Vec<InventoryItem> =
rows.into_iter().filter_map(Self::map_row_to_inventory_item).collect();
eprintln!(
"[DirectorWorker] Transportprüfung für Director {} (branch_id={}): {} Inventar-Items gefunden",
director.id, director.branch_id, items.len()
);
// Für alle Items dieses Directors sollten die user_id-Felder identisch
// sein (Arbeitgeber des Directors).
let falukant_user_id = if items.is_empty() {
// Wenn keine Items vorhanden sind, müssen wir die user_id anders ermitteln
const QUERY_GET_DIRECTOR_USER: &str = r#"
SELECT employer_user_id
FROM falukant_data.director
WHERE id = $1;
"#;
conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?;
let user_rows = conn.execute("get_director_user", &[&director.id])?;
user_rows
.into_iter()
.next()
.and_then(|row| row.get("employer_user_id").and_then(|v| v.parse::<i32>().ok()))
.ok_or_else(|| DbError::new("Konnte employer_user_id nicht ermitteln"))?
} else {
items[0].user_id
};
// Prüfe, ob Transportmittel im aktuellen Branch vorhanden sind
// Ein Transport ist aktiv, wenn er noch in der Tabelle existiert
const QUERY_COUNT_VEHICLES_IN_BRANCH_REGION: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.vehicle v
JOIN falukant_data.branch b
ON b.region_id = v.region_id
WHERE v.falukant_user_id = $1
AND b.id = $2
AND v.id NOT IN (
SELECT DISTINCT t.vehicle_id
FROM falukant_data.transport t
WHERE t.vehicle_id IS NOT NULL
);
"#;
conn.prepare("count_vehicles_in_branch", QUERY_COUNT_VEHICLES_IN_BRANCH_REGION)?;
let vehicle_count_rows = conn.execute(
"count_vehicles_in_branch",
&[&falukant_user_id, &director.branch_id],
)?;
let vehicles_in_branch = vehicle_count_rows
.into_iter()
.next()
.and_then(|row| row.get("count").and_then(|v| v.parse::<i32>().ok()))
.unwrap_or(0);
// Falls es nichts zu transportieren gibt, prüfe auf leere Transporte
if items.is_empty() {
eprintln!(
"[DirectorWorker] Keine Inventar-Items für Transporte gefunden für Director {} (branch_id={})",
director.id, director.branch_id
);
// Wenn keine Transportmittel im Branch vorhanden sind, versuche leere Transporte zu planen
if vehicles_in_branch == 0 {
eprintln!(
"[DirectorWorker] Keine Transportmittel im Branch {} vorhanden, prüfe auf leere Transporte zum Zurückholen",
director.branch_id
);
if let Err(err) = self.plan_empty_transports_for_vehicle_retrieval(
&mut conn,
falukant_user_id,
director.branch_id,
) {
eprintln!(
"[DirectorWorker] Fehler beim Planen leerer Transporte: {err}"
);
}
}
return Ok(());
}
// Wenn keine Transportmittel im Branch vorhanden sind, aber Items vorhanden sind,
// versuche leere Transporte zu planen, um Fahrzeuge zurückzuholen
if vehicles_in_branch == 0 && !items.is_empty() {
eprintln!(
"[DirectorWorker] Keine Transportmittel im Branch {} vorhanden, aber {} Items vorhanden. Prüfe auf leere Transporte zum Zurückholen",
director.branch_id, items.len()
);
if let Err(err) = self.plan_empty_transports_for_vehicle_retrieval(
&mut conn,
falukant_user_id,
director.branch_id,
) {
eprintln!(
"[DirectorWorker] Fehler beim Planen leerer Transporte: {err}"
);
}
// Nach dem Planen leerer Transporte erneut prüfen, ob jetzt Transportmittel vorhanden sind
let vehicle_count_rows_after = conn.execute(
"count_vehicles_in_branch",
&[&falukant_user_id, &director.branch_id],
)?;
let vehicles_in_branch_after = vehicle_count_rows_after
.into_iter()
.next()
.and_then(|row| row.get("count").and_then(|v| v.parse::<i32>().ok()))
.unwrap_or(0);
if vehicles_in_branch_after == 0 {
eprintln!(
"[DirectorWorker] Nach Planen leerer Transporte immer noch keine Transportmittel im Branch {} vorhanden. Überspringe normale Transportplanung.",
director.branch_id
);
return Ok(());
}
}
// Lohnende Transporte planen. Dabei werden:
// - ggf. Transport-Einträge erzeugt
// - Inventar-Mengen reduziert
for item in items.iter_mut() {
eprintln!(
"[DirectorWorker] Prüfe Transport für Item: product_id={}, quantity={}, quality={}, region_id={}, branch_id={}",
item.product_id, item.quantity, item.quality, item.region_id, item.branch_id
);
let shipped = self.plan_transports_for_item(
&mut conn,
falukant_user_id,
item,
)?;
if shipped > 0 {
eprintln!(
"[DirectorWorker] Transport geplant: {} Einheiten von Produkt {} transportiert",
shipped, item.product_id
);
if shipped >= item.quantity {
// Alles wurde in Transporte umgewandelt, Inventar komplett entfernen.
conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?;
conn.execute("remove_inventory", &[&item.id])?;
item.quantity = 0;
} else {
// Inventar-Menge in der DB reduzieren und im Item anpassen.
let remaining = item.quantity - shipped;
Self::update_inventory_quantity(&mut conn, item.id, remaining)?;
item.quantity = remaining;
}
} else {
eprintln!(
"[DirectorWorker] Kein lohnender Transport gefunden für Produkt {} (region_id={})",
item.product_id, item.region_id
);
}
}
// Nach normalen Transporten: Wenn keine Transportmittel mehr im Branch vorhanden sind,
// aber bessere Verkaufspreise in anderen Branches existieren, plane leere Transporte
let vehicle_count_rows_final = conn.execute(
"count_vehicles_in_branch",
&[&falukant_user_id, &director.branch_id],
)?;
let vehicles_in_branch_final = vehicle_count_rows_final
.into_iter()
.next()
.and_then(|row| row.get("count").and_then(|v| v.parse::<i32>().ok()))
.unwrap_or(0);
if vehicles_in_branch_final == 0 {
eprintln!(
"[DirectorWorker] Nach Transporten keine Transportmittel mehr im Branch {} vorhanden, prüfe auf leere Transporte zum Zurückholen",
director.branch_id
);
if let Err(err) = self.plan_empty_transports_for_vehicle_retrieval(
&mut conn,
falukant_user_id,
director.branch_id,
) {
eprintln!(
"[DirectorWorker] Fehler beim Planen leerer Transporte: {err}"
);
}
}
Ok(())
}
/// Sells the inventory of the director's branch.
///
/// Before selling, worthwhile transports are planned for every item; the
/// shipped quantity is deducted from the item. Whatever remains is then sold
/// locally, and inventory rows whose quantity dropped to zero are removed.
///
/// # Errors
/// Returns a `DbError` when the connection cannot be obtained, a statement
/// fails, or transport planning or selling of an item fails.
fn start_sellings(&mut self, director: &Director) -> Result<(), DbError> {
    self.base
        .set_current_step("DirectorWorker: start_sellings");

    let mut conn = self
        .base
        .pool
        .get()
        .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

    conn.prepare("get_to_sell", QUERY_GET_INVENTORY)?;
    let inventory_rows = conn.execute("get_to_sell", &[&director.id, &director.branch_id])?;
    let mut items: Vec<InventoryItem> = inventory_rows
        .into_iter()
        .filter_map(Self::map_row_to_inventory_item)
        .collect();

    conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?;
    conn.prepare("add_sell_log", QUERY_ADD_SELL_LOG)?;

    // All items of one director should carry the same user id (the
    // director's employer). Without items there is nothing to sell.
    let falukant_user_id = match items.first() {
        Some(first_item) => first_item.user_id,
        None => return Ok(()),
    };

    // Transport phase: shipped quantities are taken out of the local items.
    for item in items.iter_mut() {
        let shipped = self.plan_transports_for_item(&mut conn, falukant_user_id, item)?;
        if shipped <= 0 {
            continue;
        }
        if shipped >= item.quantity {
            // Everything went into transports; nothing is left to sell locally.
            item.quantity = 0;
            continue;
        }
        // Reduce the quantity in the database and in the local item.
        let remaining = item.quantity - shipped;
        Self::update_inventory_quantity(&mut conn, item.id, remaining)?;
        item.quantity = remaining;
    }

    // Selling phase: sell what is left, clean up rows that were emptied.
    for item in items {
        if item.quantity <= 0 {
            conn.execute("remove_inventory", &[&item.id])?;
            continue;
        }
        self.sell_single_inventory_item(&mut conn, &item)?;
    }

    Ok(())
}
/// Converts a `QUERY_GET_INVENTORY` row into an `InventoryItem`.
///
/// Returns `None` when any column other than `worth_percent` is missing or
/// unparsable. `worth_percent` falls back to `100.0`.
fn map_row_to_inventory_item(row: Row) -> Option<InventoryItem> {
    // Mandatory integer column.
    let int = |column: &str| -> Option<i32> { row.get(column)?.parse().ok() };

    let id = int("id")?;
    let product_id = int("product_id")?;
    let quantity = int("quantity")?;
    let quality = int("quality")?;
    let sell_cost: f64 = row.get("sell_cost")?.parse().ok()?;
    let user_id = int("user_id")?;
    let region_id = int("region_id")?;
    let branch_id = int("branch_id")?;
    let worth_percent = row
        .get("worth_percent")
        .and_then(|v| v.parse::<f64>().ok())
        .unwrap_or(100.0);

    Some(InventoryItem {
        id,
        product_id,
        quantity,
        quality,
        sell_cost,
        user_id,
        region_id,
        branch_id,
        worth_percent,
    })
}
/// Sells one inventory item locally and removes it from the inventory.
///
/// Price per unit:
/// 1. base = `sell_cost * (worth_percent / 100)`
/// 2. floor = `base * 0.6`
/// 3. price = `floor + (base - floor) * (quality / 100)`
///
/// The proceeds (`price * quantity`) are credited to the item's user, the
/// sale is written to the sell log, the inventory row is deleted and a
/// `selled_items` event is published. Items with a non-positive quantity are
/// only deleted.
///
/// Expects the statements `remove_inventory` and `add_sell_log` to be
/// prepared on `conn` already (the caller `start_sellings` does this).
///
/// # Errors
/// Returns a `DbError` when the log insert or the delete fails. A failed money
/// change is only logged; the inventory row is removed regardless.
fn sell_single_inventory_item(
    &mut self,
    conn: &mut DbConnection,
    item: &InventoryItem,
) -> Result<(), DbError> {
    if item.quantity <= 0 {
        conn.execute("remove_inventory", &[&item.id])?;
        return Ok(());
    }

    // Regional base price; it is also the upper bound of the unit price.
    let base_price = item.sell_cost * (item.worth_percent / 100.0);
    // Lower bound of the unit price.
    let floor_price = base_price * 0.6;
    // The item's quality acts as the knowledge factor that interpolates
    // between the lower and the upper bound.
    let quality_share = item.quality as f64 / 100.0;
    let unit_price = floor_price + (base_price - floor_price) * quality_share;
    let proceeds = unit_price * item.quantity as f64;

    let credit_result =
        self.base
            .change_falukant_user_money(item.user_id, proceeds, "sell products");
    if let Err(err) = credit_result {
        eprintln!(
            "[DirectorWorker] Fehler bei change_falukant_user_money (sell products): {err}"
        );
    }

    conn.execute(
        "add_sell_log",
        &[
            &item.region_id,
            &item.product_id,
            &item.quantity,
            &item.user_id,
        ],
    )?;

    conn.execute("remove_inventory", &[&item.id])?;

    let message = format!(
        r#"{{"event":"selled_items","branch_id":{}}}"#,
        item.branch_id
    );
    self.base.broker.publish(message);

    Ok(())
}
/// Plant ggf. Transporte für ein einzelnes Inventar-Item und gibt die
/// Menge zurück, die tatsächlich in Transporte umgewandelt wurde.
///
/// Logik:
/// - Ermittle regionale "worth_percent"-Werte für das Produkt in allen
/// Branch-Regionen des Users.
/// - Berechne lokalen Stückpreis (inkl. Qualität) und für jede andere
/// Region einen potentiellen Stückpreis.
/// - Prüfe für jede Zielregion:
/// * Gibt es verfügbare Transportmittel für diese Route?
/// * Ist der Mehrerlös (deltaPrice * Menge) größer als die
/// Transportkosten (max(1, totalValue * 0.01))?
/// - Wähle die Zielregion mit dem größten positiven Nettogewinn und
/// erzeuge entsprechende Transporte (begrenzt durch Fahrzeugkapazität).
fn plan_transports_for_item(
&mut self,
conn: &mut DbConnection,
falukant_user_id: i32,
item: &mut InventoryItem,
) -> Result<i32, DbError> {
// Sicherheitscheck
if item.quantity <= 0 {
return Ok(0);
}
// Regionale worth_percent-Werte für dieses Produkt laden
conn.prepare(
"get_region_worth_for_product",
QUERY_GET_REGION_WORTH_FOR_PRODUCT,
)?;
let rows = conn.execute(
"get_region_worth_for_product",
&[&falukant_user_id, &item.product_id],
)?;
if rows.is_empty() {
return Ok(0);
}
let mut worth_by_region: HashMap<i32, f64> = HashMap::new();
for row in rows {
let region_id = row
.get("region_id")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1);
let percent = row
.get("worth_percent")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(100.0);
if region_id >= 0 {
worth_by_region.insert(region_id, percent);
}
}
if worth_by_region.is_empty() {
eprintln!(
"[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden",
item.product_id
);
return Ok(0);
}
eprintln!(
"[DirectorWorker] Gefundene Regionen für Produkt {}: {} Regionen",
item.product_id, worth_by_region.len()
);
// Lokalen Stückpreis berechnen (neue Preisberechnung)
let local_percent = worth_by_region
.get(&item.region_id)
.copied()
.unwrap_or(100.0);
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let local_base_price = item.sell_cost * (local_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let local_min_price = local_base_price * 0.6;
let local_max_price = local_base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
let knowledge_factor = item.quality as f64;
let local_piece_price = local_min_price + (local_max_price - local_min_price) * (knowledge_factor / 100.0);
eprintln!(
"[DirectorWorker] Lokaler Preis für Produkt {}: {:.2} (worth_percent={:.2}, quality={})",
item.product_id, local_piece_price, local_percent, item.quality
);
let mut best_target_region: Option<i32> = None;
let mut best_quantity: i32 = 0;
let mut best_remote_piece_price: f64 = 0.0;
let mut best_gain: f64 = 0.0;
// Für jede andere Region prüfen, ob sich ein Transport lohnt.
for (&region_id, &remote_percent) in &worth_by_region {
if region_id == item.region_id {
continue;
}
// Remote-Stückpreis berechnen (neue Preisberechnung)
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let remote_base_price = item.sell_cost * (remote_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let remote_min_price = remote_base_price * 0.6;
let remote_max_price = remote_base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
let knowledge_factor = item.quality as f64;
let remote_piece_price = remote_min_price + (remote_max_price - remote_min_price) * (knowledge_factor / 100.0);
let delta_per_unit = remote_piece_price - local_piece_price;
eprintln!(
"[DirectorWorker] Region {}: Preis {:.2}, Delta {:.2}",
region_id, remote_piece_price, delta_per_unit
);
if delta_per_unit <= 0.0 {
eprintln!(
"[DirectorWorker] Region {}: Kein Preisvorteil (Delta <= 0)",
region_id
);
continue;
}
// Verfügbare Transportmittel für diese Route abfragen
let vehicles = Self::get_transport_vehicles_for_route(
conn,
falukant_user_id,
item.region_id,
region_id,
)?;
eprintln!(
"[DirectorWorker] Region {}: {} verfügbare Transportmittel",
region_id, vehicles.len()
);
if vehicles.is_empty() {
eprintln!(
"[DirectorWorker] Region {}: Keine verfügbaren Transportmittel",
region_id
);
continue;
}
// Maximale transportierbare Menge anhand der Kapazität ermitteln
let mut max_capacity: i32 = 0;
for v in &vehicles {
max_capacity = max_capacity.saturating_add(v.capacity);
}
if max_capacity <= 0 {
continue;
}
let qty = std::cmp::min(item.quantity, max_capacity);
if qty <= 0 {
continue;
}
let extra_revenue = delta_per_unit * qty as f64;
let total_value = remote_piece_price * qty as f64;
let transport_cost = total_value * 0.01_f64;
// Kostenformel: max(0.01, totalValue * 0.01)
let transport_cost = if transport_cost < 0.01 {
0.01
} else {
transport_cost
};
let net_gain = extra_revenue - transport_cost;
eprintln!(
"[DirectorWorker] Region {}: extra_revenue={:.2}, transport_cost={:.2}, net_gain={:.2}, qty={}",
region_id, extra_revenue, transport_cost, net_gain, qty
);
if net_gain <= 0.0 {
eprintln!(
"[DirectorWorker] Region {}: Netto-Gewinn <= 0, überspringe",
region_id
);
continue;
}
if net_gain > best_gain {
eprintln!(
"[DirectorWorker] Region {}: Neuer bester Transport (Gewinn {:.2})",
region_id, net_gain
);
best_gain = net_gain;
best_target_region = Some(region_id);
best_quantity = qty;
best_remote_piece_price = remote_piece_price;
}
}
// Kein lohnender Transport gefunden
let target_region = match best_target_region {
Some(r) => r,
None => return Ok(0),
};
if best_quantity <= 0 {
return Ok(0);
}
// Nochmals verfügbare Transportmittel für die gewählte Route laden
let vehicles = Self::get_transport_vehicles_for_route(
conn,
falukant_user_id,
item.region_id,
target_region,
)?;
if vehicles.is_empty() {
return Ok(0);
}
// Transporte anlegen, begrenzt durch best_quantity und Kapazitäten
conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;
let mut remaining = best_quantity;
for v in &vehicles {
if remaining <= 0 {
break;
}
let size = std::cmp::min(remaining, v.capacity);
if size <= 0 {
continue;
}
conn.execute(
"insert_transport",
&[
&item.region_id,
&target_region,
&item.product_id,
&size,
&v.id,
],
)?;
remaining -= size;
}
let shipped = best_quantity - remaining.max(0);
// Optional: Logging zur Nachvollziehbarkeit
if shipped > 0 {
eprintln!(
"[DirectorWorker] Transport geplant: {} Einheiten von Produkt {} von Region {} nach Region {} (Stückpreis lokal {:.2}, remote {:.2})",
shipped, item.product_id, item.region_id, target_region, local_piece_price, best_remote_piece_price
);
}
Ok(shipped)
}
/// Loads the transport vehicles returned by
/// `QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE` for the given user and route.
///
/// Before the actual lookup two diagnostic queries are executed; their
/// results are only written to stderr and do not influence the return value:
/// * the number of the user's vehicles located in `source_region`,
/// * whether a `region_distance` entry exists for the route (either direction).
///
/// Rows whose `vehicle_id` is missing, unparseable or negative, or whose
/// `capacity` is missing, unparseable or not positive, are dropped.
///
/// # Errors
/// Propagates any error from preparing or executing one of the three queries.
fn get_transport_vehicles_for_route(
    conn: &mut DbConnection,
    falukant_user_id: i32,
    source_region: i32,
    target_region: i32,
) -> Result<Vec<TransportVehicle>, DbError> {
    // The SQL text is kept flush-left so the literals stay byte-identical.
    const QUERY_COUNT_VEHICLES_IN_REGION: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.vehicle v
WHERE v.falukant_user_id = $1
AND v.region_id = $2;
"#;
    const QUERY_CHECK_ROUTE: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.region_distance rd
WHERE (rd.source_region_id = $1 AND rd.target_region_id = $2)
OR (rd.source_region_id = $2 AND rd.target_region_id = $1);
"#;

    // Diagnostics 1: how many vehicles does the user own in the source region?
    conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?;
    let count_rows = conn.execute(
        "count_vehicles_in_region",
        &[&falukant_user_id, &source_region],
    )?;
    let vehicle_count = match count_rows.into_iter().next() {
        Some(row) => row
            .get("count")
            .and_then(|v| v.parse::<i32>().ok())
            .unwrap_or(0),
        None => 0,
    };
    eprintln!(
        "[DirectorWorker] Fahrzeuge in Region {} für User {}: {}",
        source_region, falukant_user_id, vehicle_count
    );

    // Diagnostics 2: is there a distance entry connecting both regions?
    conn.prepare("check_route", QUERY_CHECK_ROUTE)?;
    let distance_rows = conn.execute("check_route", &[&source_region, &target_region])?;
    let distance_entries = match distance_rows.into_iter().next() {
        Some(row) => row
            .get("count")
            .and_then(|v| v.parse::<i32>().ok())
            .unwrap_or(0),
        None => 0,
    };
    let route_exists = distance_entries > 0;
    eprintln!(
        "[DirectorWorker] Route von Region {} nach Region {} existiert: {}",
        source_region, target_region, route_exists
    );

    // The actual lookup of vehicles for the route.
    conn.prepare(
        "get_transport_vehicles_for_route",
        QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE,
    )?;
    let rows = conn.execute(
        "get_transport_vehicles_for_route",
        &[&falukant_user_id, &source_region, &target_region],
    )?;

    let mut usable = Vec::with_capacity(rows.len());
    usable.extend(rows.into_iter().filter_map(|row| {
        // Only rows with a non-negative id and a positive capacity are kept.
        let id = row
            .get("vehicle_id")
            .and_then(|v| v.parse::<i32>().ok())
            .filter(|id| *id >= 0)?;
        let capacity = row
            .get("capacity")
            .and_then(|v| v.parse::<i32>().ok())
            .filter(|capacity| *capacity > 0)?;
        Some(TransportVehicle { id, capacity })
    }));

    eprintln!(
        "[DirectorWorker] Gefundene Transportmittel für Route {} -> {}: {}",
        source_region, target_region, usable.len()
    );
    Ok(usable)
}
/// Plans empty transports that move the user's vehicles from another branch
/// region into the region of `current_branch_id`.
///
/// Procedure:
/// 1. Resolve the region of `current_branch_id`; if the branch does not
///    exist nothing is done.
/// 2. Load the user's other branches (`QUERY_GET_USER_BRANCHES`).
/// 3. For every such branch, keep it as a candidate when the free-vehicle
///    query returns rows for its region and
///    `get_transport_vehicles_for_route` yields vehicles for the route
///    towards the current region.
/// 4. Rank the candidates by the difference of the average `worth_percent`
///    (candidate region minus current region, each defaulting to 100.0 when
///    missing). The difference is used for ordering only; candidates are
///    not rejected because of it.
/// 5. For the top candidate, insert one empty transport per vehicle.
///
/// # Errors
/// Propagates database errors and fails with `DbError` when the branch row
/// has no parseable `region_id`.
fn plan_empty_transports_for_vehicle_retrieval(
    &mut self,
    conn: &mut DbConnection,
    falukant_user_id: i32,
    current_branch_id: i32,
) -> Result<(), DbError> {
    // The SQL text is kept flush-left so the literals stay byte-identical.
    const QUERY_GET_BRANCH_REGION: &str = r#"
SELECT region_id
FROM falukant_data.branch
WHERE id = $1;
"#;
    const QUERY_GET_AVERAGE_WORTH: &str = r#"
SELECT
AVG(CASE WHEN region_id = $1 THEN worth_percent ELSE NULL END) AS current_worth,
AVG(CASE WHEN region_id = $2 THEN worth_percent ELSE NULL END) AS target_worth
FROM falukant_data.town_product_worth
WHERE region_id IN ($1, $2);
"#;

    // Step 1: region of the branch the vehicles should be brought to.
    conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
    let home_row = conn
        .execute("get_branch_region", &[&current_branch_id])?
        .into_iter()
        .next();
    let current_region_id = match home_row {
        // Unknown branch: nothing to do.
        None => return Ok(()),
        Some(row) => match row.get("region_id").and_then(|v| v.parse::<i32>().ok()) {
            Some(region_id) => region_id,
            None => return Err(DbError::new("Konnte region_id nicht ermitteln")),
        },
    };

    // Step 2: all other branches of the user.
    conn.prepare("get_user_branches", QUERY_GET_USER_BRANCHES)?;
    let other_branches = conn.execute(
        "get_user_branches",
        &[&falukant_user_id, &current_region_id],
    )?;
    if other_branches.is_empty() {
        eprintln!(
            "[DirectorWorker] Keine anderen Branches für User {} gefunden",
            falukant_user_id
        );
        return Ok(());
    }

    conn.prepare("get_free_vehicles_in_region", QUERY_GET_FREE_VEHICLES_IN_REGION)?;
    conn.prepare("get_region_worth_for_product", QUERY_GET_REGION_WORTH_FOR_PRODUCT)?;
    conn.prepare("insert_empty_transport", QUERY_INSERT_EMPTY_TRANSPORT)?;

    // Step 3: collect candidates as (branch_id, region_id, vehicle_count, price_delta).
    let mut candidates: Vec<(i32, i32, i32, f64)> = Vec::new();
    for branch_row in &other_branches {
        // Rows without a usable (non-negative) region or branch id are skipped.
        let target_region_id = match branch_row
            .get("region_id")
            .and_then(|v| v.parse::<i32>().ok())
        {
            Some(region_id) if region_id >= 0 => region_id,
            _ => continue,
        };
        let target_branch_id = match branch_row
            .get("branch_id")
            .and_then(|v| v.parse::<i32>().ok())
        {
            Some(branch_id) if branch_id >= 0 => branch_id,
            _ => continue,
        };

        // No free vehicles in that region: not a candidate.
        if conn
            .execute(
                "get_free_vehicles_in_region",
                &[&falukant_user_id, &target_region_id],
            )?
            .is_empty()
        {
            continue;
        }

        // Vehicles must be available for the route back to the current region.
        let route_vehicles = Self::get_transport_vehicles_for_route(
            conn,
            falukant_user_id,
            target_region_id,
            current_region_id,
        )?;
        if route_vehicles.is_empty() {
            continue;
        }

        // Simplified price advantage: difference of the average
        // worth_percent over all products of both regions.
        conn.prepare("get_average_worth", QUERY_GET_AVERAGE_WORTH)?;
        let worth_rows = conn.execute(
            "get_average_worth",
            &[&current_region_id, &target_region_id],
        )?;
        let price_delta = match worth_rows.into_iter().next() {
            Some(worth_row) => {
                let current_worth = worth_row
                    .get("current_worth")
                    .and_then(|v| v.parse::<f64>().ok())
                    .unwrap_or(100.0);
                let target_worth = worth_row
                    .get("target_worth")
                    .and_then(|v| v.parse::<f64>().ok())
                    .unwrap_or(100.0);
                target_worth - current_worth
            }
            None => 0.0,
        };

        candidates.push((
            target_branch_id,
            target_region_id,
            route_vehicles.len() as i32,
            price_delta,
        ));
    }

    if candidates.is_empty() {
        eprintln!(
            "[DirectorWorker] Keine Branches mit freien Transportmitteln gefunden"
        );
        return Ok(());
    }

    // Step 4: highest price delta first; the stable sort keeps the original
    // order among equal (or incomparable) deltas.
    candidates.sort_by(|lhs, rhs| {
        rhs.3
            .partial_cmp(&lhs.3)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    // Non-empty is guaranteed by the check above.
    let (target_branch_id, target_region_id, vehicle_count, price_delta) = candidates[0];
    eprintln!(
        "[DirectorWorker] Bester Branch für Fahrzeug-Rückholung: Branch {} (Region {}), {} Fahrzeuge, Preisvorteil: {:.2}%",
        target_branch_id, target_region_id, vehicle_count, price_delta
    );

    // Step 5: reload the vehicles for the chosen route and create one empty
    // transport per vehicle.
    let vehicles = Self::get_transport_vehicles_for_route(
        conn,
        falukant_user_id,
        target_region_id,
        current_region_id,
    )?;
    for vehicle in vehicles.iter() {
        conn.execute(
            "insert_empty_transport",
            &[&target_region_id, &current_region_id, &vehicle.id],
        )?;
    }
    // Every insert succeeded at this point, so the number of planned
    // transports equals the number of vehicles.
    eprintln!(
        "[DirectorWorker] {} leere Transporte geplant: Region {} -> Region {}",
        vehicles.len(), target_region_id, current_region_id
    );
    Ok(())
}
/// Sets the `quantity` column of the inventory row `inventory_id` to
/// `new_quantity`.
///
/// # Errors
/// Propagates any error from preparing or executing the update statement.
fn update_inventory_quantity(
    conn: &mut DbConnection,
    inventory_id: i32,
    new_quantity: i32,
) -> Result<(), DbError> {
    const STATEMENT_NAME: &str = "update_inventory_qty";
    // The SQL text is kept flush-left so the literal stays byte-identical.
    // Note the placeholder order: $1 is the row id, $2 the new quantity.
    const STATEMENT_SQL: &str = r#"
UPDATE falukant_data.inventory
SET quantity = $2
WHERE id = $1;
"#;

    conn.prepare(STATEMENT_NAME, STATEMENT_SQL)?;
    conn.execute(STATEMENT_NAME, &[&inventory_id, &new_quantity])?;
    Ok(())
}
/// Processes every salary row returned by `QUERY_GET_SALARY_TO_PAY`.
///
/// For each row that maps to a `SalaryItem` the employer is charged the
/// income amount, the row is flagged via `QUERY_SET_SALARY_PAYED`, and a
/// `falukantUpdateStatus` event for the employer is published.
///
/// A failed money change is only logged to stderr; the salary is still
/// flagged as paid afterwards.
///
/// # Errors
/// Fails when no DB connection can be obtained or when preparing/executing
/// one of the statements fails.
fn pay_salary(&mut self) -> Result<(), DbError> {
    self.base.set_current_step("DirectorWorker: pay_salary");

    let mut conn = self
        .base
        .pool
        .get()
        .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
    conn.prepare("get_salary_to_pay", QUERY_GET_SALARY_TO_PAY)?;
    conn.prepare("set_salary_payed", QUERY_SET_SALARY_PAYED)?;

    let due_rows = conn.execute("get_salary_to_pay", &[])?;
    // Rows that cannot be mapped to a SalaryItem are silently skipped.
    for item in due_rows
        .into_iter()
        .filter_map(Self::map_row_to_salary_item)
    {
        // Negative amount: the employer pays the director's income.
        let amount = -(item.income as f64);
        match self.base.change_falukant_user_money(
            item.employer_user_id,
            amount,
            "director payed out",
        ) {
            Ok(_) => {}
            Err(err) => eprintln!(
                "[DirectorWorker] Fehler bei change_falukant_user_money (director payed out): {err}"
            ),
        }

        conn.execute("set_salary_payed", &[&item.id])?;

        self.base.broker.publish(format!(
            r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#,
            item.employer_user_id
        ));
    }
    Ok(())
}
/// Converts a result row into a `SalaryItem`.
///
/// Returns `None` as soon as one of the columns `id`, `employer_user_id`
/// or `income` is absent or does not parse as an integer.
fn map_row_to_salary_item(row: Row) -> Option<SalaryItem> {
    let id = row.get("id")?.parse::<i32>().ok()?;
    let employer_user_id = row.get("employer_user_id")?.parse::<i32>().ok()?;
    let income = row.get("income")?.parse::<i32>().ok()?;

    Some(SalaryItem {
        id,
        employer_user_id,
        income,
    })
}
/// Executes `QUERY_UPDATE_SATISFACTION` and publishes a `directorchanged`
/// event for every returned row that carries a parseable
/// `employer_user_id`; other rows are ignored.
///
/// # Errors
/// Fails when no DB connection can be obtained or when preparing/executing
/// the statement fails.
fn calculate_satisfaction(&mut self) -> Result<(), DbError> {
    self.base
        .set_current_step("DirectorWorker: calculate_satisfaction");

    let mut conn = self
        .base
        .pool
        .get()
        .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
    conn.prepare("update_satisfaction", QUERY_UPDATE_SATISFACTION)?;

    let changed_rows = conn.execute("update_satisfaction", &[])?;
    let employer_ids = changed_rows.into_iter().filter_map(|row| {
        row.get("employer_user_id")
            .and_then(|v| v.parse::<i32>().ok())
    });
    for employer_id in employer_ids {
        self.base.broker.publish(format!(
            r#"{{"event":"directorchanged","user_id":{}}}"#,
            employer_id
        ));
    }
    Ok(())
}
}
impl Worker for DirectorWorker {
    /// Starts the worker loop via the base worker.
    ///
    /// The loop closure builds its own `DirectorWorker` from clones of the
    /// pool and broker and calls `run_iteration` until the shared
    /// `running_worker` flag is cleared.
    fn start_worker_thread(&mut self) {
        let (pool, broker) = (self.base.pool.clone(), self.base.broker.clone());

        let worker_loop = move |state: Arc<WorkerState>| {
            let mut worker = DirectorWorker::new(pool.clone(), broker.clone());
            loop {
                if !state.running_worker.load(Ordering::Relaxed) {
                    break;
                }
                worker.run_iteration(&state);
            }
        };

        self.base.start_worker_with_loop(worker_loop);
    }

    /// Stops the worker by delegating to the base worker.
    fn stop_worker_thread(&mut self) {
        self.base.stop_worker();
    }

    /// Starts the base worker's watchdog.
    fn enable_watchdog(&mut self) {
        self.base.start_watchdog();
    }
}