Files
yourpart-daemon/src/worker/director.rs

1226 lines
44 KiB
Rust

use crate::db::{DbConnection, DbError, Row};
use std::collections::HashMap;
use crate::message_broker::MessageBroker;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::db::ConnectionPool;
use super::base::{BaseWorker, Worker, WorkerState, DEFAULT_TAX_PERCENT, DEFAULT_TREASURY_USER_ID};
use crate::worker::sql::{
QUERY_GET_DIRECTORS,
QUERY_GET_BEST_PRODUCTION,
QUERY_INSERT_PRODUCTION,
QUERY_GET_BRANCH_CAPACITY,
QUERY_GET_INVENTORY,
QUERY_REMOVE_INVENTORY,
QUERY_ADD_SELL_LOG,
QUERY_GET_REGION_WORTH_FOR_PRODUCT,
QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE,
QUERY_INSERT_TRANSPORT,
QUERY_INSERT_EMPTY_TRANSPORT,
QUERY_GET_USER_BRANCHES,
QUERY_GET_FREE_VEHICLES_IN_REGION,
QUERY_GET_SALARY_TO_PAY,
QUERY_SET_SALARY_PAYED,
QUERY_UPDATE_SATISFACTION,
QUERY_GET_DIRECTOR_USER,
QUERY_COUNT_VEHICLES_IN_BRANCH_REGION,
QUERY_COUNT_VEHICLES_IN_REGION,
QUERY_CHECK_ROUTE,
QUERY_GET_BRANCH_REGION,
QUERY_GET_AVERAGE_WORTH,
QUERY_UPDATE_INVENTORY_QTY,
QUERY_GET_PRODUCT_COST,
QUERY_GET_USER_OFFICES,
QUERY_CUMULATIVE_TAX_NO_EXEMPT,
QUERY_CUMULATIVE_TAX_WITH_EXEMPT,
};
#[derive(Debug, Clone)]
struct Director {
id: i32,
branch_id: i32,
may_produce: bool,
may_sell: bool,
may_start_transport: bool,
}
#[derive(Debug, Clone)]
struct ProductionPlan {
falukant_user_id: i32,
money: f64,
certificate: i32,
branch_id: i32,
product_id: i32,
region_id: i32,
stock_size: i32,
used_in_stock: i32,
running_productions: i32,
running_productions_quantity: i32,
}
#[derive(Debug, Clone)]
struct InventoryItem {
id: i32,
product_id: i32,
quantity: i32,
quality: i32,
sell_cost: f64,
user_id: i32,
region_id: i32,
branch_id: i32,
worth_percent: f64, // Regionaler worth_percent-Wert für die Preisberechnung
}
#[derive(Debug, Clone)]
struct SalaryItem {
id: i32,
employer_user_id: i32,
income: i32,
}
#[derive(Debug, Clone)]
struct TransportVehicle {
id: i32,
capacity: i32,
}
pub struct DirectorWorker {
base: BaseWorker,
last_run: Option<Instant>,
}
// Maximale Anzahl paralleler Produktionen pro Branch
const MAX_PARALLEL_PRODUCTIONS: i32 = 2;
// ...existing code...
// Verfügbare Transportmittel für eine Route (source_region -> target_region)
// ...existing code...
impl DirectorWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
Self {
base: BaseWorker::new("DirectorWorker", pool, broker),
last_run: None,
}
}
fn run_iteration(&mut self, _state: &WorkerState) {
self.base.set_current_step("DirectorWorker iteration");
let now = Instant::now();
let should_run = match self.last_run {
None => true,
Some(last) => now.saturating_duration_since(last) >= Duration::from_secs(60),
};
if should_run {
if let Err(err) = self.perform_all_tasks() {
eprintln!("[DirectorWorker] Fehler beim Ausführen der Aufgabe: {err}");
}
self.last_run = Some(now);
}
std::thread::sleep(Duration::from_secs(1));
}
fn perform_all_tasks(&mut self) -> Result<(), DbError> {
// Produktions-/Verkaufs-/Transportlogik für alle Direktoren
self.perform_task()?;
self.pay_salary()?;
self.calculate_satisfaction()?;
Ok(())
}
fn perform_task(&mut self) -> Result<(), DbError> {
self.base
.set_current_step("Get director actions from DB");
let mut conn = self
.base
.pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("get_directors", QUERY_GET_DIRECTORS)?;
let directors_rows = conn.execute("get_directors", &[])?;
let directors: Vec<Director> = directors_rows
.into_iter()
.filter_map(Self::map_row_to_director)
.collect();
if directors.is_empty() {
// keine Info-Logs
}
for director in directors {
if director.may_produce {
self.start_productions(&director)?;
}
if director.may_start_transport {
if let Err(_) = self.start_transports_stub(&director) {}
}
if director.may_sell {
eprintln!(
"[DirectorWorker] Starte Verkaufsprüfung für Director {} (branch_id={})",
director.id, director.branch_id
);
self.start_sellings(&director)?;
}
}
Ok(())
}
fn map_row_to_director(row: Row) -> Option<Director> {
Some(Director {
id: row.get("id")?.parse().ok()?,
branch_id: row.get("branch_id")?.parse().ok()?,
may_produce: row.get("may_produce").map(|v| v == "t" || v == "true").unwrap_or(false),
may_sell: row.get("may_sell").map(|v| v == "t" || v == "true").unwrap_or(false),
may_start_transport: row
.get("may_start_transport")
.map(|v| v == "t" || v == "true")
.unwrap_or(false),
})
}
fn start_productions(&mut self, director: &Director) -> Result<(), DbError> {
self.base
.set_current_step("DirectorWorker: start_productions");
let mut conn = self
.base
.pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Initial: Bestes Produkt für diesen Branch ermitteln
conn.prepare("get_to_produce", QUERY_GET_BEST_PRODUCTION)?;
let rows = conn.execute("get_to_produce", &[&director.id, &director.branch_id])?;
if rows.is_empty() {
return Ok(());
}
let mut base_plan = match rows.first().and_then(Self::map_row_to_production_plan) {
Some(p) => p,
None => return Ok(()),
};
// Query zum Abfragen der aktuellen Kapazitätswerte vorbereiten
conn.prepare("get_branch_capacity", QUERY_GET_BRANCH_CAPACITY)?;
// Schleife: Starte Produktionen, bis entweder die maximale Anzahl erreicht ist
// oder kein freier Lagerplatz mehr vorhanden ist
loop {
// Aktuelle Kapazitätswerte abfragen
let capacity_rows = conn.execute("get_branch_capacity", &[&director.branch_id])?;
if capacity_rows.is_empty() {
break;
}
let row = &capacity_rows[0];
let stock_size: i32 = row
.get("stock_size")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let used_in_stock: i32 = row
.get("used_in_stock")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let running_productions: i32 = row
.get("running_productions")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let running_productions_quantity: i32 = row
.get("running_productions_quantity")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
// Prüfen, ob noch Produktionen gestartet werden können
if running_productions >= MAX_PARALLEL_PRODUCTIONS {
break;
}
// Freie Kapazität berechnen
let free_capacity = stock_size - used_in_stock - running_productions_quantity;
if free_capacity <= 0 {
break;
}
// Plan mit aktuellen Werten aktualisieren
base_plan.stock_size = stock_size;
base_plan.used_in_stock = used_in_stock;
base_plan.running_productions = running_productions;
base_plan.running_productions_quantity = running_productions_quantity;
// Eine neue Produktion starten (max. 100 Stück)
if self.create_single_production(&mut conn, &base_plan).is_err() {
break;
}
}
Ok(())
}
fn map_row_to_production_plan(row: &Row) -> Option<ProductionPlan> {
// Pflichtfelder: ohne diese können wir keinen sinnvollen Plan erstellen.
let falukant_user_id: i32 = row.get("falukant_user_id")?.parse().ok()?;
let certificate: i32 = row.get("certificate")?.parse().ok()?;
let branch_id: i32 = row.get("branch_id")?.parse().ok()?;
let product_id: i32 = row.get("product_id")?.parse().ok()?;
// Optionale/abgeleitete Felder: hier sind wir tolerant und verwenden
// Default-Werte, falls NULL oder nicht parsbar.
let money: f64 = row
.get("money")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0);
let stock_size: i32 = row
.get("stock_size")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let used_in_stock: i32 = row
.get("used_in_stock")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let running_productions: i32 = row
.get("running_productions")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let running_productions_quantity: i32 = row
.get("running_productions_quantity")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let region_id: i32 = row
.get("region_id")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
Some(ProductionPlan {
falukant_user_id,
money,
certificate,
branch_id,
product_id,
region_id,
stock_size,
used_in_stock,
running_productions,
running_productions_quantity,
})
}
/// Startet eine einzelne Produktion (max. 100 Stück) basierend auf dem aktuellen Plan.
/// Die äußere Schleife in `start_productions` sorgt dafür, dass mehrere Produktionen
/// gestartet werden können, bis entweder die maximale Anzahl erreicht ist oder
/// kein freier Lagerplatz mehr vorhanden ist.
fn create_single_production(
&mut self,
conn: &mut DbConnection,
plan: &ProductionPlan,
) -> Result<(), DbError> {
// Hole aktuelle Kapazitätswerte direkt aus der DB, um Race Conditions zu vermeiden
let capacity_rows = conn.execute("get_branch_capacity", &[&plan.branch_id])?;
if capacity_rows.is_empty() {
return Ok(());
}
let row = &capacity_rows[0];
let stock_size: i32 = row
.get("stock_size")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let used_in_stock: i32 = row
.get("used_in_stock")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let running_productions_quantity: i32 = row
.get("running_productions_quantity")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
// Berechne freie Kapazität mit aktuellen Werten
let free_capacity = stock_size - used_in_stock - running_productions_quantity;
if free_capacity <= 0 {
return Ok(());
}
let one_piece_cost = Self::calc_one_piece_cost(plan);
let max_money_production = Self::calc_max_money_production(plan, one_piece_cost);
let to_produce = (free_capacity.min(max_money_production)).clamp(0, 100);
if to_produce < 1 {
return Ok(());
}
let production_cost = to_produce as f64 * one_piece_cost;
let _ = self.base.change_falukant_user_money(
plan.falukant_user_id,
-production_cost,
"director starts production",
);
conn.prepare("insert_production", QUERY_INSERT_PRODUCTION)?;
// Eine einzelne Produktion mit max. 100 Stück anlegen
// Das aktuelle Wetter der Region wird automatisch aus der weather-Tabelle geholt
conn.execute(
"insert_production",
&[&plan.branch_id, &plan.product_id, &to_produce, &plan.region_id],
)?;
let message = format!(
r#"{{"event":"production_started","branch_id":{}}}"#,
plan.branch_id
);
self.base.broker.publish(message);
Ok(())
}
fn calc_one_piece_cost(plan: &ProductionPlan) -> f64 {
(plan.certificate * 6) as f64
}
fn calc_max_money_production(plan: &ProductionPlan, one_piece_cost: f64) -> i32 {
if one_piece_cost > 0.0 {
if plan.money > 0.0 {
(plan.money / one_piece_cost).floor() as i32
} else {
i32::MAX
}
} else {
0
}
}
fn start_transports_stub(&mut self, director: &Director) -> Result<(), DbError> {
self.base
.set_current_step("DirectorWorker: start_transports");
let mut conn = self
.base
.pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("get_to_transport", QUERY_GET_INVENTORY)?;
let rows = conn.execute("get_to_transport", &[&director.id, &director.branch_id])?;
let mut items: Vec<InventoryItem> =
rows.into_iter().filter_map(Self::map_row_to_inventory_item).collect();
// Für alle Items dieses Directors sollten die user_id-Felder identisch
// sein (Arbeitgeber des Directors).
let falukant_user_id = if items.is_empty() {
// Wenn keine Items vorhanden sind, müssen wir die user_id anders ermitteln
conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?;
let user_rows = conn.execute("get_director_user", &[&director.id])?;
user_rows
.into_iter()
.next()
.and_then(|row| row.get("employer_user_id").and_then(|v| v.parse::<i32>().ok()))
.ok_or_else(|| DbError::new("Konnte employer_user_id nicht ermitteln"))?
} else {
items[0].user_id
};
// Prüfe, ob Transportmittel im aktuellen Branch vorhanden sind
// Ein Transport ist aktiv, wenn er noch in der Tabelle existiert
conn.prepare("count_vehicles_in_branch", QUERY_COUNT_VEHICLES_IN_BRANCH_REGION)?;
let vehicle_count_rows = conn.execute(
"count_vehicles_in_branch",
&[&falukant_user_id, &director.branch_id],
)?;
let vehicles_in_branch = vehicle_count_rows
.into_iter()
.next()
.and_then(|row| row.get("count").and_then(|v| v.parse::<i32>().ok()))
.unwrap_or(0);
if items.is_empty() {
if vehicles_in_branch == 0 {
let _ = self.plan_empty_transports_for_vehicle_retrieval(
&mut conn,
falukant_user_id,
director.branch_id,
);
}
return Ok(());
}
if vehicles_in_branch == 0 && !items.is_empty() {
let _ = self.plan_empty_transports_for_vehicle_retrieval(
&mut conn,
falukant_user_id,
director.branch_id,
);
let vehicle_count_rows_after = conn.execute(
"count_vehicles_in_branch",
&[&falukant_user_id, &director.branch_id],
)?;
let vehicles_in_branch_after = vehicle_count_rows_after
.into_iter()
.next()
.and_then(|row| row.get("count").and_then(|v| v.parse::<i32>().ok()))
.unwrap_or(0);
if vehicles_in_branch_after == 0 {
return Ok(());
}
}
// Lohnende Transporte planen. Dabei werden:
// - ggf. Transport-Einträge erzeugt
// - Inventar-Mengen reduziert
for item in items.iter_mut() {
let _ = self.plan_transports_for_item(
&mut conn,
falukant_user_id,
item,
)?;
}
// Nach normalen Transporten: Wenn keine Transportmittel mehr im Branch vorhanden sind,
// aber bessere Verkaufspreise in anderen Branches existieren, plane leere Transporte
let vehicle_count_rows_final = conn.execute(
"count_vehicles_in_branch",
&[&falukant_user_id, &director.branch_id],
)?;
let vehicles_in_branch_final = vehicle_count_rows_final
.into_iter()
.next()
.and_then(|row| row.get("count").and_then(|v| v.parse::<i32>().ok()))
.unwrap_or(0);
if vehicles_in_branch_final == 0 {
let _ = self.plan_empty_transports_for_vehicle_retrieval(
&mut conn,
falukant_user_id,
director.branch_id,
);
}
Ok(())
}
fn start_sellings(&mut self, director: &Director) -> Result<(), DbError> {
self.base
.set_current_step("DirectorWorker: start_sellings");
let mut conn = self
.base
.pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("get_to_sell", QUERY_GET_INVENTORY)?;
let rows = conn.execute("get_to_sell", &[&director.id, &director.branch_id])?;
let mut items: Vec<InventoryItem> =
rows.into_iter().filter_map(Self::map_row_to_inventory_item).collect();
conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?;
conn.prepare("add_sell_log", QUERY_ADD_SELL_LOG)?;
if items.is_empty() {
eprintln!(
"[DirectorWorker] Verkaufsprüfung Director {} (branch_id={}): nichts zu verkaufen",
director.id, director.branch_id
);
return Ok(());
}
eprintln!(
"[DirectorWorker] Verkaufsprüfung Director {} (branch_id={}): {} Inventar-Items gefunden",
director.id, director.branch_id, items.len()
);
// Für alle Items dieses Directors sollten die user_id-Felder identisch
// sein (Arbeitgeber des Directors).
let falukant_user_id = items[0].user_id;
// Vor dem eigentlichen Verkauf versucht der Director, lohnende
// Transporte zu planen. Dabei werden:
// - ggf. Transport-Einträge erzeugt
// - Inventar-Mengen reduziert (geschieht bereits in plan_transports_for_item)
// Die zurückgegebenen Mengen werden dann lokal verkauft.
for item in items.iter_mut() {
let _shipped = self.plan_transports_for_item(
&mut conn,
falukant_user_id,
item,
)?;
// Inventar wird bereits in plan_transports_for_item reduziert
// item.quantity wurde dort bereits aktualisiert
}
// Anschließend lokale Verkäufe für die verbleibenden Mengen durchführen.
for item in items.drain(..) {
if item.quantity > 0 {
self.sell_single_inventory_item(&mut conn, &item)?;
} else {
// Falls die Menge auf 0 gesetzt wurde, das Inventar ggf. aufräumen.
conn.execute("remove_inventory", &[&item.id])?;
}
}
Ok(())
}
fn map_row_to_inventory_item(row: Row) -> Option<InventoryItem> {
Some(InventoryItem {
id: row.get("id")?.parse().ok()?,
product_id: row.get("product_id")?.parse().ok()?,
quantity: row.get("quantity")?.parse().ok()?,
quality: row.get("quality")?.parse().ok()?,
sell_cost: row.get("sell_cost")?.parse().ok()?,
user_id: row.get("user_id")?.parse().ok()?,
region_id: row.get("region_id")?.parse().ok()?,
branch_id: row.get("branch_id")?.parse().ok()?,
worth_percent: row
.get("worth_percent")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(100.0),
})
}
// Helper: compute piece sell price from item fields
fn compute_piece_sell_price(item: &InventoryItem) -> f64 {
let base_price = item.sell_cost * (item.worth_percent / 100.0);
let min_price = base_price * 0.6;
let max_price = base_price;
let knowledge_factor = item.quality as f64;
min_price + (max_price - min_price) * (knowledge_factor / 100.0)
}
// Helper: get one_piece_cost from DB row fallback logic
fn resolve_one_piece_cost(conn: &mut DbConnection, product_id: i32, fallback: f64) -> Result<f64, DbError> {
conn.prepare("get_product_cost", QUERY_GET_PRODUCT_COST)?;
let rows = conn.execute("get_product_cost", &[&product_id])?;
if let Some(row) = rows.first()
&& let Some(sc) = row.get("sell_cost")
&& let Ok(v) = sc.parse::<f64>()
{
return Ok(v);
}
Ok(fallback)
}
// Helper: determine cumulative tax percent for a branch/user
fn get_cumulative_tax_percent(conn: &mut DbConnection, branch_id: i32, user_id: i32) -> Result<f64, DbError> {
// Default
let mut cumulative_tax_percent = DEFAULT_TAX_PERCENT;
conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)
.map_err(|e| DbError::new(format!("[DirectorWorker] prepare get_branch_region: {e}")))?;
let branch_rows = conn.execute("get_branch_region", &[&branch_id])
.map_err(|e| DbError::new(format!("[DirectorWorker] exec get_branch_region branch_id={}: {e}", branch_id)))?;
let branch_region_id: Option<i32> = branch_rows.first().and_then(|r| r.get("region_id")).and_then(|v| v.parse().ok());
if let Some(region_id) = branch_region_id {
conn.prepare("get_user_offices", QUERY_GET_USER_OFFICES)
.map_err(|e| DbError::new(format!("[DirectorWorker] prepare get_user_offices: {e}")))?;
let offices = conn.execute("get_user_offices", &[&user_id])
.map_err(|e| DbError::new(format!("[DirectorWorker] exec get_user_offices user_id={}: {e}", user_id)))?;
let mut exempt_types: Vec<String> = Vec::new();
let mut has_chancellor = false;
for row in &offices {
if let Some(name) = row.get("office_name") {
match name.as_str() {
"chancellor" => { has_chancellor = true; break; }
"council" => { exempt_types.push("city".to_string()); }
"taxman" => { exempt_types.extend(["city","county"].into_iter().map(String::from)); }
"treasurerer" => { exempt_types.extend(["city","county","shire"].into_iter().map(String::from)); }
"super-state-administrator" => { exempt_types.extend(["city","county","shire","markgrave","duchy"].into_iter().map(String::from)); }
_ => {}
}
}
}
if has_chancellor {
return Ok(0.0);
}
if exempt_types.is_empty() {
conn.prepare("cumulative_tax_no_exempt", QUERY_CUMULATIVE_TAX_NO_EXEMPT)
.map_err(|e| DbError::new(format!("[DirectorWorker] prepare cumulative_tax_no_exempt: {e}")))?;
let res = conn.execute("cumulative_tax_no_exempt", &[&region_id])
.map_err(|e| DbError::new(format!("[DirectorWorker] exec cumulative_tax_no_exempt region_id={}: {e}", region_id)))?;
if let Some(row) = res.first()
&& let Some(tp) = row.get("total_percent")
{
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
}
} else {
conn.prepare("cumulative_tax_with_exempt", QUERY_CUMULATIVE_TAX_WITH_EXEMPT)
.map_err(|e| DbError::new(format!("[DirectorWorker] prepare cumulative_tax_with_exempt: {e}")))?;
let exempt_array: Vec<&str> = exempt_types.iter().map(|s| s.as_str()).collect();
let res = conn.execute("cumulative_tax_with_exempt", &[&region_id, &exempt_array])
.map_err(|e| DbError::new(format!("[DirectorWorker] exec cumulative_tax_with_exempt region_id={} exempt={:?}: {}", region_id, exempt_array, e)))?;
if let Some(row) = res.first() && let Some(tp) = row.get("total_percent") {
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
}
}
}
Ok(cumulative_tax_percent)
}
fn sell_single_inventory_item(
&mut self,
conn: &mut DbConnection,
item: &InventoryItem,
) -> Result<(), DbError> {
if item.quantity <= 0 {
conn.execute("remove_inventory", &[&item.id])?;
return Ok(());
}
// compute piece price and full sell price
let piece_price = Self::compute_piece_sell_price(item);
let sell_price = piece_price * item.quantity as f64;
let one_piece_cost = Self::resolve_one_piece_cost(conn, item.product_id, item.sell_cost)?;
let cumulative_tax_percent = Self::get_cumulative_tax_percent(conn, item.branch_id, item.user_id)?;
let revenue_cents = (sell_price * 100.0).round() as i64;
let cost_cents = (one_piece_cost * item.quantity as f64 * 100.0).round() as i64;
let profit_cents = (revenue_cents - cost_cents).max(0);
let tax_cents = ((profit_cents as f64) * cumulative_tax_percent / 100.0).round() as i64;
let payout_cents = revenue_cents - tax_cents;
if tax_cents > 0 {
let tax_amount = (tax_cents as f64) / 100.0;
let _ = self.base.change_falukant_user_money(DEFAULT_TREASURY_USER_ID, tax_amount, &format!("tax from sale product {}", item.product_id));
}
let payout_amount = (payout_cents as f64) / 100.0;
if payout_cents != 0 {
let _ = self.base.change_falukant_user_money(item.user_id, payout_amount, "sell products");
}
eprintln!(
"[DirectorWorker] Verkauf: branch_id={}, product_id={}, quantity={}, revenue={:.2}, tax%={:.2}, payout={:.2}, user_id={}",
item.branch_id,
item.product_id,
item.quantity,
sell_price,
cumulative_tax_percent,
payout_amount,
item.user_id
);
conn.execute(
"add_sell_log",
&[
&item.region_id,
&item.product_id,
&item.quantity,
&item.user_id,
],
)?;
conn.execute("remove_inventory", &[&item.id])?;
let message = format!(
r#"{{"event":"selled_items","branch_id":{}}}"#,
item.branch_id
);
self.base.broker.publish(message);
Ok(())
}
/// Plant ggf. Transporte für ein einzelnes Inventar-Item und gibt die
/// Menge zurück, die tatsächlich in Transporte umgewandelt wurde.
///
/// Logik:
/// - Ermittle regionale "worth_percent"-Werte für das Produkt in allen
/// Branch-Regionen des Users.
/// - Berechne lokalen Stückpreis (inkl. Qualität) und für jede andere
/// Region einen potentiellen Stückpreis.
/// - Prüfe für jede Zielregion:
/// * Gibt es verfügbare Transportmittel für diese Route?
/// * Ist der Mehrerlös (deltaPrice * Menge) größer als die
/// Transportkosten (max(1, totalValue * 0.01))?
/// - Wähle die Zielregion mit dem größten positiven Nettogewinn und
/// erzeuge entsprechende Transporte (begrenzt durch Fahrzeugkapazität).
fn plan_transports_for_item(
&mut self,
conn: &mut DbConnection,
falukant_user_id: i32,
item: &mut InventoryItem,
) -> Result<i32, DbError> {
// Sicherheitscheck
if item.quantity <= 0 {
return Ok(0);
}
// Load worth_percent by region for this product
let worth_by_region = Self::get_worth_by_region(conn, falukant_user_id, item.product_id)?;
if worth_by_region.is_empty() {
return Ok(0);
}
let local_percent = worth_by_region.get(&item.region_id).copied().unwrap_or(100.0);
let local_piece_price = Self::compute_piece_price_for_percent(item, local_percent);
let mut best_target_region: Option<i32> = None;
let mut best_quantity: i32 = 0;
let mut _best_remote_piece_price: f64 = 0.0;
let mut best_gain: f64 = 0.0;
// Für jede andere Region prüfen, ob sich ein Transport lohnt.
for (&region_id, &remote_percent) in &worth_by_region {
if region_id == item.region_id {
continue;
}
let remote_piece_price = Self::compute_piece_price_for_percent(item, remote_percent);
let delta_per_unit = remote_piece_price - local_piece_price;
if delta_per_unit <= 0.0 {
continue;
}
// Verfügbare Transportmittel für diese Route abfragen
let vehicles = Self::get_transport_vehicles_for_route(
conn,
falukant_user_id,
item.region_id,
region_id,
)?;
if vehicles.is_empty() {
continue;
}
// Maximale transportierbare Menge anhand der Kapazität ermitteln
let max_capacity = Self::calc_max_capacity(&vehicles);
if max_capacity <= 0 {
continue;
}
let qty = std::cmp::min(item.quantity, max_capacity);
if qty <= 0 {
continue;
}
let extra_revenue = delta_per_unit * qty as f64;
let transport_cost = Self::calc_transport_cost(remote_piece_price, qty);
let net_gain = extra_revenue - transport_cost;
if net_gain <= 0.0 {
continue;
}
if net_gain > best_gain {
best_gain = net_gain;
best_target_region = Some(region_id);
best_quantity = qty;
_best_remote_piece_price = remote_piece_price;
}
}
// Kein lohnender Transport gefunden
let target_region = match best_target_region {
Some(r) => r,
None => return Ok(0),
};
if best_quantity <= 0 {
return Ok(0);
}
// Build and insert transports for chosen route
let shipped = Self::insert_transports_for_route(conn, item, target_region, best_quantity)?;
// Inventar sofort reduzieren, nachdem Transporte erfolgreich angelegt wurden
// Dies stellt sicher, dass Inventar und Transporte immer konsistent sind
if shipped > 0 {
if shipped >= item.quantity {
// Alles wurde in Transporte umgewandelt, Inventar komplett entfernen
conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?;
conn.execute("remove_inventory", &[&item.id])?;
item.quantity = 0;
} else {
// Inventar-Menge in der DB reduzieren und im Item anpassen
let remaining_quantity = item.quantity - shipped;
Self::update_inventory_quantity(conn, item.id, remaining_quantity)?;
item.quantity = remaining_quantity;
}
}
Ok(shipped)
}
fn get_transport_vehicles_for_route(
conn: &mut DbConnection,
falukant_user_id: i32,
source_region: i32,
target_region: i32,
) -> Result<Vec<TransportVehicle>, DbError> {
// Debug: Prüfe zuerst, ob Fahrzeuge in der Quellregion existieren
conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?;
let vehicle_count_rows = conn.execute(
"count_vehicles_in_region",
&[&falukant_user_id, &source_region],
)?;
let _vehicle_count = vehicle_count_rows
.into_iter()
.next()
.and_then(|row| row.get("count").and_then(|v| v.parse::<i32>().ok()))
.unwrap_or(0);
// Debug: Prüfe, ob eine Route existiert
conn.prepare("check_route", QUERY_CHECK_ROUTE)?;
let route_rows = conn.execute(
"check_route",
&[&source_region, &target_region],
)?;
let _route_exists = route_rows
.into_iter()
.next()
.and_then(|row| row.get("count").and_then(|v| v.parse::<i32>().ok()))
.unwrap_or(0) > 0;
conn.prepare(
"get_transport_vehicles_for_route",
QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE,
)?;
let rows = conn.execute(
"get_transport_vehicles_for_route",
&[&falukant_user_id, &source_region, &target_region],
)?;
let mut result = Vec::with_capacity(rows.len());
for row in rows {
let id = row
.get("vehicle_id")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1);
let capacity = row
.get("capacity")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
if id >= 0 && capacity > 0 {
result.push(TransportVehicle { id, capacity });
}
}
Ok(result)
}
// Helper: load worth_percent values for a product across all regions of a user's branches
fn get_worth_by_region(conn: &mut DbConnection, falukant_user_id: i32, product_id: i32) -> Result<HashMap<i32,f64>, DbError> {
conn.prepare("get_region_worth_for_product", QUERY_GET_REGION_WORTH_FOR_PRODUCT)?;
let rows = conn.execute("get_region_worth_for_product", &[&falukant_user_id, &product_id])?;
let mut map = HashMap::new();
for row in rows {
if let Some(rid) = row.get("region_id").and_then(|v| v.parse::<i32>().ok()) {
let percent = row.get("worth_percent").and_then(|v| v.parse::<f64>().ok()).unwrap_or(100.0);
map.insert(rid, percent);
}
}
Ok(map)
}
// Helper: compute piece price for an arbitrary worth_percent
fn compute_piece_price_for_percent(item: &InventoryItem, percent: f64) -> f64 {
let base_price = item.sell_cost * (percent / 100.0);
let min_price = base_price * 0.6;
let max_price = base_price;
let knowledge_factor = item.quality as f64;
min_price + (max_price - min_price) * (knowledge_factor / 100.0)
}
fn calc_max_capacity(vehicles: &[TransportVehicle]) -> i32 {
vehicles.iter().fold(0i32, |acc, v| acc.saturating_add(v.capacity))
}
fn calc_transport_cost(remote_piece_price: f64, qty: i32) -> f64 {
let total_value = remote_piece_price * qty as f64;
let transport_cost = total_value * 0.01_f64;
if transport_cost < 0.01 { 0.01 } else { transport_cost }
}
fn insert_transports_for_route(conn: &mut DbConnection, item: &InventoryItem, target_region: i32, desired: i32) -> Result<i32, DbError> {
let vehicles = Self::get_transport_vehicles_for_route(conn, item.user_id, item.region_id, target_region)?;
if vehicles.is_empty() { return Ok(0); }
conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;
let mut remaining = desired;
for v in &vehicles {
if remaining <= 0 { break; }
let size = std::cmp::min(remaining, v.capacity);
if size <= 0 { continue; }
conn.execute("insert_transport", &[&item.region_id, &target_region, &item.product_id, &size, &v.id])?;
remaining -= size;
}
Ok(desired - remaining.max(0))
}
/// Plant leere Transporte, um Fahrzeuge zurückzuholen, wenn:
/// - Keine Transportmittel im aktuellen Branch vorhanden sind
/// - Aber bessere Verkaufspreise in anderen Branches existieren
/// - Freie Transportmittel in anderen Regionen verfügbar sind
fn plan_empty_transports_for_vehicle_retrieval(
&mut self,
conn: &mut DbConnection,
falukant_user_id: i32,
current_branch_id: i32,
) -> Result<(), DbError> {
// Aktuelle Branch-Region ermitteln
conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
let branch_rows = conn.execute("get_branch_region", &[&current_branch_id])?;
let current_region_id = match branch_rows.into_iter().next() {
Some(row) => row
.get("region_id")
.and_then(|v| v.parse::<i32>().ok())
.ok_or_else(|| DbError::new("Konnte region_id nicht ermitteln"))?,
None => return Ok(()), // Branch nicht gefunden, nichts zu tun
};
// Alle anderen Branches des Users finden
conn.prepare("get_user_branches", QUERY_GET_USER_BRANCHES)?;
let branch_rows = conn.execute(
"get_user_branches",
&[&falukant_user_id, &current_region_id],
)?;
if branch_rows.is_empty() {
return Ok(());
}
// Für jeden anderen Branch prüfen, ob freie Transportmittel verfügbar sind
// und ob bessere Verkaufspreise existieren (zur Priorisierung)
conn.prepare("get_free_vehicles_in_region", QUERY_GET_FREE_VEHICLES_IN_REGION)?;
conn.prepare("get_region_worth_for_product", QUERY_GET_REGION_WORTH_FOR_PRODUCT)?;
conn.prepare("insert_empty_transport", QUERY_INSERT_EMPTY_TRANSPORT)?;
// Sammle alle Branches mit freien Transportmitteln und berechne Preisvorteil
let mut branches_with_vehicles: Vec<(i32, i32, i32, f64)> = Vec::new(); // (branch_id, region_id, vehicle_count, price_delta)
for branch_row in &branch_rows {
let target_region_id = branch_row
.get("region_id")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1);
let target_branch_id = branch_row
.get("branch_id")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1);
if target_region_id < 0 || target_branch_id < 0 {
continue;
}
// Prüfe auf freie Transportmittel in dieser Region
let vehicle_rows = conn.execute(
"get_free_vehicles_in_region",
&[&falukant_user_id, &target_region_id],
)?;
if vehicle_rows.is_empty() {
continue;
}
// Prüfe, ob eine Route zurück zum aktuellen Branch existiert
let vehicles = Self::get_transport_vehicles_for_route(
conn,
falukant_user_id,
target_region_id,
current_region_id,
)?;
if vehicles.is_empty() {
continue;
}
// Berechne Preisvorteil (vereinfacht: verwende worth_percent-Differenz)
// Hole worth_percent für beide Regionen (für ein beliebiges Produkt)
let mut price_delta = 0.0;
conn.prepare("get_average_worth", QUERY_GET_AVERAGE_WORTH)?;
let worth_rows = conn.execute(
"get_average_worth",
&[&current_region_id, &target_region_id],
)?;
if let Some(worth_row) = worth_rows.into_iter().next() {
let current_worth = worth_row
.get("current_worth")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(100.0);
let target_worth = worth_row
.get("target_worth")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(100.0);
price_delta = target_worth - current_worth;
}
branches_with_vehicles.push((
target_branch_id,
target_region_id,
vehicles.len() as i32,
price_delta,
));
}
if branches_with_vehicles.is_empty() {
return Ok(());
}
// Wähle den Branch mit dem besten Preisvorteil (oder einfach den ersten, wenn alle gleich sind)
branches_with_vehicles.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap_or(std::cmp::Ordering::Equal));
let (_target_branch_id, target_region_id, _vehicle_count, _price_delta) = branches_with_vehicles[0];
// Hole die Fahrzeuge nochmal für diesen Branch
let vehicles = Self::get_transport_vehicles_for_route(
conn,
falukant_user_id,
target_region_id,
current_region_id,
)?;
// Leere Transporte für alle verfügbaren Fahrzeuge anlegen
let mut _transport_count = 0;
for vehicle in &vehicles {
conn.execute(
"insert_empty_transport",
&[&target_region_id, &current_region_id, &vehicle.id],
)?;
_transport_count += 1;
}
Ok(())
}
fn update_inventory_quantity(
conn: &mut DbConnection,
inventory_id: i32,
new_quantity: i32,
) -> Result<(), DbError> {
conn.prepare("update_inventory_qty", QUERY_UPDATE_INVENTORY_QTY)?;
// SQL: UPDATE inventory SET quantity = $1 WHERE id = $2
conn.execute("update_inventory_qty", &[&new_quantity, &inventory_id])?;
Ok(())
}
fn pay_salary(&mut self) -> Result<(), DbError> {
self.base.set_current_step("DirectorWorker: pay_salary");
let mut conn = self
.base
.pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("get_salary_to_pay", QUERY_GET_SALARY_TO_PAY)?;
conn.prepare("set_salary_payed", QUERY_SET_SALARY_PAYED)?;
let rows = conn.execute("get_salary_to_pay", &[])?;
let salaries: Vec<SalaryItem> =
rows.into_iter().filter_map(Self::map_row_to_salary_item).collect();
for item in salaries {
if let Err(_err) = self.base.change_falukant_user_money(
item.employer_user_id,
-(item.income as f64),
"director payed out",
) {
}
conn.execute("set_salary_payed", &[&item.id])?;
let message =
format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, item.employer_user_id);
self.base.broker.publish(message);
}
Ok(())
}
fn map_row_to_salary_item(row: Row) -> Option<SalaryItem> {
Some(SalaryItem {
id: row.get("id")?.parse().ok()?,
employer_user_id: row.get("employer_user_id")?.parse().ok()?,
income: row.get("income")?.parse().ok()?,
})
}
fn calculate_satisfaction(&mut self) -> Result<(), DbError> {
self.base
.set_current_step("DirectorWorker: calculate_satisfaction");
let mut conn = self
.base
.pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("update_satisfaction", QUERY_UPDATE_SATISFACTION)?;
let rows = conn.execute("update_satisfaction", &[])?;
for row in rows {
if let Some(employer_id) = row
.get("employer_user_id")
.and_then(|v| v.parse::<i32>().ok())
{
let message = format!(
r#"{{"event":"directorchanged","user_id":{}}}"#,
employer_id
);
self.base.broker.publish(message);
}
}
Ok(())
}
}
impl Worker for DirectorWorker {
fn start_worker_thread(&mut self) {
let pool = self.base.pool.clone();
let broker = self.base.broker.clone();
self.base
.start_worker_with_loop(move |state: Arc<WorkerState>| {
let mut worker = DirectorWorker::new(pool.clone(), broker.clone());
while state.running_worker.load(Ordering::Relaxed) {
worker.run_iteration(&state);
}
});
}
fn stop_worker_thread(&mut self) {
self.base.stop_worker();
}
fn enable_watchdog(&mut self) {
self.base.start_watchdog();
}
}