1058 lines
34 KiB
Rust
1058 lines
34 KiB
Rust
use crate::db::{DbConnection, DbError, Row};
|
|
use std::collections::HashMap;
|
|
use crate::message_broker::MessageBroker;
|
|
use std::sync::atomic::Ordering;
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use crate::db::ConnectionPool;
|
|
use super::base::{BaseWorker, Worker, WorkerState};
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct Director {
|
|
id: i32,
|
|
branch_id: i32,
|
|
may_produce: bool,
|
|
may_sell: bool,
|
|
may_start_transport: bool,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct ProductionPlan {
|
|
falukant_user_id: i32,
|
|
money: f64,
|
|
certificate: i32,
|
|
branch_id: i32,
|
|
product_id: i32,
|
|
stock_size: i32,
|
|
used_in_stock: i32,
|
|
running_productions: i32,
|
|
running_productions_quantity: i32,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct InventoryItem {
|
|
id: i32,
|
|
product_id: i32,
|
|
quantity: i32,
|
|
quality: i32,
|
|
sell_cost: f64,
|
|
user_id: i32,
|
|
region_id: i32,
|
|
branch_id: i32,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct SalaryItem {
|
|
id: i32,
|
|
employer_user_id: i32,
|
|
income: i32,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct TransportVehicle {
|
|
id: i32,
|
|
capacity: i32,
|
|
}
|
|
|
|
pub struct DirectorWorker {
|
|
base: BaseWorker,
|
|
last_run: Option<Instant>,
|
|
}
|
|
|
|
// SQL-Queries (1:1 aus director_worker.h)
|
|
const QUERY_GET_DIRECTORS: &str = r#"
|
|
SELECT
|
|
d.may_produce,
|
|
d.may_sell,
|
|
d.may_start_transport,
|
|
b.id AS branch_id,
|
|
fu.id AS falukantUserId,
|
|
d.id
|
|
FROM falukant_data.director d
|
|
JOIN falukant_data.falukant_user fu
|
|
ON fu.id = d.employer_user_id
|
|
JOIN falukant_data.character c
|
|
ON c.id = d.director_character_id
|
|
JOIN falukant_data.branch b
|
|
ON b.region_id = c.region_id
|
|
AND b.falukant_user_id = fu.id
|
|
WHERE current_time BETWEEN '08:00:00' AND '17:00:00';
|
|
"#;
|
|
|
|
const QUERY_GET_BEST_PRODUCTION: &str = r#"
|
|
SELECT
|
|
fdu.id falukant_user_id,
|
|
-- Geld explizit als Text casten, damit das Mapping im Rust-Code
|
|
-- zuverlässig funktioniert (unabhängig vom nativen DB-Typ wie `money`).
|
|
CAST(fdu.money AS text) AS money,
|
|
fdu.certificate,
|
|
ftp.id product_id,
|
|
ftp.label_tr,
|
|
(
|
|
SELECT SUM(quantity)
|
|
FROM falukant_data.stock fds
|
|
WHERE fds.branch_id = fdb.id
|
|
) AS stock_size,
|
|
COALESCE((
|
|
SELECT SUM(COALESCE(fdi.quantity, 0))
|
|
FROM falukant_data.stock fds
|
|
JOIN falukant_data.inventory fdi
|
|
ON fdi.stock_id = fds.id
|
|
WHERE fds.branch_id = fdb.id
|
|
), 0) AS used_in_stock,
|
|
(ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category)
|
|
/ (300.0 * ftp.production_time) AS worth,
|
|
fdb.id AS branch_id,
|
|
(
|
|
SELECT COUNT(id)
|
|
FROM falukant_data.production
|
|
WHERE branch_id = fdb.id
|
|
) AS running_productions,
|
|
COALESCE((
|
|
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
|
|
FROM falukant_data.production fdp
|
|
WHERE fdp.branch_id = fdb.id
|
|
), 0) AS running_productions_quantity
|
|
FROM falukant_data.director fdd
|
|
JOIN falukant_data.character fdc
|
|
ON fdc.id = fdd.director_character_id
|
|
JOIN falukant_data.falukant_user fdu
|
|
ON fdd.employer_user_id = fdu.id
|
|
JOIN falukant_data.character user_character
|
|
ON user_character.user_id = fdu.id
|
|
JOIN falukant_data.branch fdb
|
|
ON fdb.falukant_user_id = fdu.id
|
|
AND fdb.region_id = fdc.region_id
|
|
JOIN falukant_data.town_product_worth fdtpw
|
|
ON fdtpw.region_id = fdb.region_id
|
|
JOIN falukant_data.knowledge fdk_character
|
|
ON fdk_character.product_id = fdtpw.product_id
|
|
AND fdk_character.character_id = user_character.id
|
|
JOIN falukant_data.knowledge fdk_director
|
|
ON fdk_director.product_id = fdtpw.product_id
|
|
AND fdk_director.character_id = fdd.director_character_id
|
|
JOIN falukant_type.product ftp
|
|
ON ftp.id = fdtpw.product_id
|
|
AND ftp.category <= fdu.certificate
|
|
WHERE fdd.id = $1
|
|
AND fdb.id = $2
|
|
ORDER BY worth DESC
|
|
LIMIT 1;
|
|
"#;
|
|
|
|
const QUERY_INSERT_PRODUCTION: &str = r#"
|
|
INSERT INTO falukant_data.production (branch_id, product_id, quantity)
|
|
VALUES ($1, $2, $3);
|
|
"#;
|
|
|
|
const QUERY_GET_INVENTORY: &str = r#"
|
|
SELECT
|
|
i.id,
|
|
i.product_id,
|
|
i.quantity,
|
|
i.quality,
|
|
p.sell_cost,
|
|
fu.id AS user_id,
|
|
b.region_id,
|
|
b.id AS branch_id
|
|
FROM falukant_data.inventory i
|
|
JOIN falukant_data.stock s
|
|
ON s.id = i.stock_id
|
|
JOIN falukant_data.branch b
|
|
ON b.id = s.branch_id
|
|
JOIN falukant_data.falukant_user fu
|
|
ON fu.id = b.falukant_user_id
|
|
JOIN falukant_data.director d
|
|
ON d.employer_user_id = fu.id
|
|
JOIN falukant_type.product p
|
|
ON p.id = i.product_id
|
|
WHERE d.id = $1
|
|
AND b.id = $2;
|
|
"#;
|
|
|
|
const QUERY_REMOVE_INVENTORY: &str = r#"
|
|
DELETE FROM falukant_data.inventory
|
|
WHERE id = $1;
|
|
"#;
|
|
|
|
const QUERY_ADD_SELL_LOG: &str = r#"
|
|
INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id)
|
|
VALUES ($1, $2, $3, $4)
|
|
ON CONFLICT (region_id, product_id, seller_id)
|
|
DO UPDATE
|
|
SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity;
|
|
"#;
|
|
|
|
// Regionale Verkaufswürdigkeit pro Produkt/Region für alle Branches eines Users
|
|
const QUERY_GET_REGION_WORTH_FOR_PRODUCT: &str = r#"
|
|
SELECT
|
|
tpw.region_id,
|
|
tpw.product_id,
|
|
tpw.worth_percent
|
|
FROM falukant_data.town_product_worth tpw
|
|
JOIN falukant_data.branch b
|
|
ON b.region_id = tpw.region_id
|
|
WHERE b.falukant_user_id = $1
|
|
AND tpw.product_id = $2;
|
|
"#;
|
|
|
|
// Verfügbare Transportmittel für eine Route (source_region -> target_region)
|
|
const QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE: &str = r#"
|
|
SELECT
|
|
v.id AS vehicle_id,
|
|
vt.capacity AS capacity
|
|
FROM falukant_data.vehicle v
|
|
JOIN falukant_type.vehicle vt
|
|
ON vt.id = v.vehicle_type_id
|
|
JOIN falukant_data.region_distance rd
|
|
ON (
|
|
(rd.source_region_id = v.region_id AND rd.target_region_id = $3)
|
|
OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id)
|
|
)
|
|
AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
|
|
WHERE v.falukant_user_id = $1
|
|
AND v.region_id = $2;
|
|
"#;
|
|
|
|
// Transport-Eintrag anlegen
|
|
const QUERY_INSERT_TRANSPORT: &str = r#"
|
|
INSERT INTO falukant_data.transport
|
|
(source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at)
|
|
VALUES ($1, $2, $3, $4, $5, NOW(), NOW());
|
|
"#;
|
|
|
|
const QUERY_GET_SALARY_TO_PAY: &str = r#"
|
|
SELECT d.id, d.employer_user_id, d.income
|
|
FROM falukant_data.director d
|
|
WHERE DATE(d.last_salary_payout) < DATE(NOW());
|
|
"#;
|
|
|
|
const QUERY_SET_SALARY_PAYED: &str = r#"
|
|
UPDATE falukant_data.director
|
|
SET last_salary_payout = NOW()
|
|
WHERE id = $1;
|
|
"#;
|
|
|
|
const QUERY_UPDATE_SATISFACTION: &str = r#"
|
|
WITH new_sats AS (
|
|
SELECT
|
|
d.id,
|
|
ROUND(
|
|
d.income::numeric
|
|
/
|
|
(
|
|
c.title_of_nobility
|
|
* POWER(1.231, AVG(k.knowledge) / 1.5)
|
|
)
|
|
* 100
|
|
) AS new_satisfaction
|
|
FROM falukant_data.director d
|
|
JOIN falukant_data.knowledge k
|
|
ON d.director_character_id = k.character_id
|
|
JOIN falukant_data.character c
|
|
ON c.id = d.director_character_id
|
|
GROUP BY d.id, c.title_of_nobility, d.income
|
|
)
|
|
UPDATE falukant_data.director dir
|
|
SET satisfaction = ns.new_satisfaction
|
|
FROM new_sats ns
|
|
WHERE dir.id = ns.id
|
|
AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction
|
|
RETURNING dir.employer_user_id;
|
|
"#;
|
|
|
|
impl DirectorWorker {
|
|
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
|
|
Self {
|
|
base: BaseWorker::new("DirectorWorker", pool, broker),
|
|
last_run: None,
|
|
}
|
|
}
|
|
|
|
fn run_iteration(&mut self, state: &WorkerState) {
|
|
self.base.set_current_step("DirectorWorker iteration");
|
|
|
|
let now = Instant::now();
|
|
let should_run = match self.last_run {
|
|
None => true,
|
|
Some(last) => now.saturating_duration_since(last) >= Duration::from_secs(60),
|
|
};
|
|
|
|
if should_run {
|
|
if let Err(err) = self.perform_all_tasks() {
|
|
eprintln!("[DirectorWorker] Fehler beim Ausführen der Aufgabe: {err}");
|
|
}
|
|
self.last_run = Some(now);
|
|
}
|
|
|
|
std::thread::sleep(Duration::from_secs(1));
|
|
|
|
if !state.running_worker.load(Ordering::Relaxed) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
fn perform_all_tasks(&mut self) -> Result<(), DbError> {
|
|
// Produktions-/Verkaufs-/Transportlogik für alle Direktoren
|
|
self.perform_task()?;
|
|
self.pay_salary()?;
|
|
self.calculate_satisfaction()?;
|
|
Ok(())
|
|
}
|
|
|
|
fn perform_task(&mut self) -> Result<(), DbError> {
|
|
self.base
|
|
.set_current_step("Get director actions from DB");
|
|
|
|
let mut conn = self
|
|
.base
|
|
.pool
|
|
.get()
|
|
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
|
|
|
conn.prepare("get_directors", QUERY_GET_DIRECTORS)?;
|
|
let directors_rows = conn.execute("get_directors", &[])?;
|
|
|
|
let directors: Vec<Director> = directors_rows
|
|
.into_iter()
|
|
.filter_map(Self::map_row_to_director)
|
|
.collect();
|
|
|
|
if directors.is_empty() {
|
|
eprintln!("[DirectorWorker] Keine Direktoren für Aktionen gefunden (Zeitfenster oder DB-Daten).");
|
|
}
|
|
|
|
for director in directors {
|
|
if director.may_produce {
|
|
eprintln!(
|
|
"[DirectorWorker] Starte Produktionsprüfung für Director {} (branch_id={})",
|
|
director.id, director.branch_id
|
|
);
|
|
self.start_productions(&director)?;
|
|
}
|
|
if director.may_start_transport {
|
|
eprintln!(
|
|
"[DirectorWorker] Starte Transportprüfung für Director {} (branch_id={})",
|
|
director.id, director.branch_id
|
|
);
|
|
self.start_transports_stub(&director);
|
|
}
|
|
if director.may_sell {
|
|
eprintln!(
|
|
"[DirectorWorker] Starte Verkaufsprüfung für Director {} (branch_id={})",
|
|
director.id, director.branch_id
|
|
);
|
|
self.start_sellings(&director)?;
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn map_row_to_director(row: Row) -> Option<Director> {
|
|
Some(Director {
|
|
id: row.get("id")?.parse().ok()?,
|
|
branch_id: row.get("branch_id")?.parse().ok()?,
|
|
may_produce: row.get("may_produce").map(|v| v == "t" || v == "true").unwrap_or(false),
|
|
may_sell: row.get("may_sell").map(|v| v == "t" || v == "true").unwrap_or(false),
|
|
may_start_transport: row
|
|
.get("may_start_transport")
|
|
.map(|v| v == "t" || v == "true")
|
|
.unwrap_or(false),
|
|
})
|
|
}
|
|
|
|
fn start_productions(&mut self, director: &Director) -> Result<(), DbError> {
|
|
self.base
|
|
.set_current_step("DirectorWorker: start_productions");
|
|
|
|
let mut conn = self
|
|
.base
|
|
.pool
|
|
.get()
|
|
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
|
|
|
conn.prepare("get_to_produce", QUERY_GET_BEST_PRODUCTION)?;
|
|
let rows = conn.execute("get_to_produce", &[&director.id, &director.branch_id])?;
|
|
if rows.is_empty() {
|
|
eprintln!(
|
|
"[DirectorWorker] Keine Produktionskandidaten für Director {} gefunden.",
|
|
director.id
|
|
);
|
|
return Ok(());
|
|
}
|
|
|
|
let plan = match Self::map_row_to_production_plan(&rows[0]) {
|
|
Some(p) => p,
|
|
None => {
|
|
eprintln!(
|
|
"[DirectorWorker] Produktionsplan für Director {} konnte nicht gemappt werden.",
|
|
director.id
|
|
);
|
|
return Ok(());
|
|
}
|
|
};
|
|
|
|
eprintln!(
|
|
"[DirectorWorker] Produktionsplan: director_user_id={}, branch_id={}, product_id={}, money={}, certificate={}, stock_size={}, used_in_stock={}, running_productions={}, running_qty={}",
|
|
plan.falukant_user_id,
|
|
plan.branch_id,
|
|
plan.product_id,
|
|
plan.money,
|
|
plan.certificate,
|
|
plan.stock_size,
|
|
plan.used_in_stock,
|
|
plan.running_productions,
|
|
plan.running_productions_quantity
|
|
);
|
|
|
|
self.create_production_batches(&mut conn, &plan)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn map_row_to_production_plan(row: &Row) -> Option<ProductionPlan> {
|
|
// Pflichtfelder: ohne diese können wir keinen sinnvollen Plan erstellen.
|
|
let falukant_user_id: i32 = row.get("falukant_user_id")?.parse().ok()?;
|
|
let certificate: i32 = row.get("certificate")?.parse().ok()?;
|
|
let branch_id: i32 = row.get("branch_id")?.parse().ok()?;
|
|
let product_id: i32 = row.get("product_id")?.parse().ok()?;
|
|
|
|
// Optionale/abgeleitete Felder: hier sind wir tolerant und verwenden
|
|
// Default-Werte, falls NULL oder nicht parsbar.
|
|
let money: f64 = row
|
|
.get("money")
|
|
.and_then(|v| v.parse::<f64>().ok())
|
|
.unwrap_or(0.0);
|
|
let stock_size: i32 = row
|
|
.get("stock_size")
|
|
.and_then(|v| v.parse::<i32>().ok())
|
|
.unwrap_or(0);
|
|
let used_in_stock: i32 = row
|
|
.get("used_in_stock")
|
|
.and_then(|v| v.parse::<i32>().ok())
|
|
.unwrap_or(0);
|
|
let running_productions: i32 = row
|
|
.get("running_productions")
|
|
.and_then(|v| v.parse::<i32>().ok())
|
|
.unwrap_or(0);
|
|
let running_productions_quantity: i32 = row
|
|
.get("running_productions_quantity")
|
|
.and_then(|v| v.parse::<i32>().ok())
|
|
.unwrap_or(0);
|
|
|
|
Some(ProductionPlan {
|
|
falukant_user_id,
|
|
money,
|
|
certificate,
|
|
branch_id,
|
|
product_id,
|
|
stock_size,
|
|
used_in_stock,
|
|
running_productions,
|
|
running_productions_quantity,
|
|
})
|
|
}
|
|
|
|
fn create_production_batches(
|
|
&mut self,
|
|
conn: &mut DbConnection,
|
|
plan: &ProductionPlan,
|
|
) -> Result<(), DbError> {
|
|
let running = plan.running_productions;
|
|
// Maximal zwei parallele Produktionen pro Branch zulassen.
|
|
if running >= 2 {
|
|
eprintln!(
|
|
"[DirectorWorker] Bereits zu viele laufende Produktionen ({}), keine neue Produktion.",
|
|
running
|
|
);
|
|
return Ok(());
|
|
}
|
|
|
|
// Freie Lagerkapazität: Gesamtbestand minus bereits belegter Bestand
|
|
// (Inventar) minus bereits eingeplante Produktionsmengen.
|
|
let free_capacity =
|
|
plan.stock_size - plan.used_in_stock - plan.running_productions_quantity;
|
|
|
|
// Stückkosten monetär berechnen. Da money ein f64 ist, arbeiten wir hier ebenfalls
|
|
// mit Gleitkomma und runden erst am Ende auf eine ganze Stückzahl ab.
|
|
let one_piece_cost = (plan.certificate * 6) as f64;
|
|
let mut max_money_production: i32 = 0;
|
|
if one_piece_cost > 0.0 {
|
|
if plan.money > 0.0 {
|
|
// Anzahl Stück, die sich mit dem verfügbaren Geld finanzieren lassen
|
|
max_money_production = (plan.money / one_piece_cost).floor() as i32;
|
|
} else {
|
|
// Falls das Geld aus der DB unerwartet als 0 eingelesen wurde, aber
|
|
// eigentlich ausreichend Guthaben vorhanden ist (bekannter Migrationsfall),
|
|
// lassen wir die Geldbegrenzung vorläufig fallen und begrenzen nur über
|
|
// Lagerkapazität und Hard-Limit 300.
|
|
eprintln!(
|
|
"[DirectorWorker] Warnung: money=0 für falukant_user_id={}, \
|
|
verwende nur Lagerkapazität als Limit.",
|
|
plan.falukant_user_id
|
|
);
|
|
max_money_production = i32::MAX;
|
|
}
|
|
}
|
|
|
|
// Maximale Produktionsmenge begrenzen:
|
|
// - nie mehr als der freie Lagerplatz (`free_capacity`)
|
|
// - nie mehr als durch das verfügbare Geld finanzierbar
|
|
// - absolut maximal 100 Einheiten pro Start
|
|
let to_produce = free_capacity
|
|
.min(max_money_production)
|
|
.min(100)
|
|
.max(0);
|
|
|
|
eprintln!(
|
|
"[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}",
|
|
free_capacity,
|
|
one_piece_cost,
|
|
max_money_production,
|
|
to_produce
|
|
);
|
|
|
|
if to_produce < 1 {
|
|
eprintln!(
|
|
"[DirectorWorker] Keine Produktion gestartet: free_capacity={}, max_money_production={}, running_productions={}, running_qty={}",
|
|
free_capacity,
|
|
max_money_production,
|
|
plan.running_productions,
|
|
plan.running_productions_quantity
|
|
);
|
|
return Ok(());
|
|
}
|
|
|
|
let production_cost = to_produce as f64 * one_piece_cost;
|
|
|
|
if let Err(err) = self.base.change_falukant_user_money(
|
|
plan.falukant_user_id,
|
|
-production_cost,
|
|
"director starts production",
|
|
) {
|
|
eprintln!(
|
|
"[DirectorWorker] Fehler bei change_falukant_user_money: {err}"
|
|
);
|
|
}
|
|
|
|
conn.prepare("insert_production", QUERY_INSERT_PRODUCTION)?;
|
|
|
|
// Anzahl neuer Produktions-Jobs so begrenzen, dass zusammen mit den
|
|
// bereits laufenden maximal 2 parallele Produktionen pro Branch aktiv
|
|
// sind. Jeder Job wird in Batches von max. 100 Stück aufgeteilt.
|
|
let mut remaining_qty = to_produce;
|
|
let mut inserted_total = 0;
|
|
let mut started_jobs = 0;
|
|
let max_new_jobs = (2 - running).max(0);
|
|
|
|
while remaining_qty > 0 && started_jobs < max_new_jobs {
|
|
let batch = remaining_qty.min(100);
|
|
conn.execute(
|
|
"insert_production",
|
|
&[&plan.branch_id, &plan.product_id, &batch],
|
|
)?;
|
|
remaining_qty -= batch;
|
|
inserted_total += batch;
|
|
started_jobs += 1;
|
|
}
|
|
|
|
eprintln!(
|
|
"[DirectorWorker] Produktion angelegt: branch_id={}, product_id={}, quantity={}",
|
|
plan.branch_id, plan.product_id, inserted_total
|
|
);
|
|
|
|
let message = format!(
|
|
r#"{{"event":"production_started","branch_id":{}}}"#,
|
|
plan.branch_id
|
|
);
|
|
self.base.broker.publish(message);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn start_transports_stub(&self, _director: &Director) {
|
|
// TODO: Transportlogik bei Bedarf aus dem C++-Code nachziehen.
|
|
}
|
|
|
|
fn start_sellings(&mut self, director: &Director) -> Result<(), DbError> {
|
|
self.base
|
|
.set_current_step("DirectorWorker: start_sellings");
|
|
|
|
let mut conn = self
|
|
.base
|
|
.pool
|
|
.get()
|
|
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
|
|
|
conn.prepare("get_to_sell", QUERY_GET_INVENTORY)?;
|
|
let rows = conn.execute("get_to_sell", &[&director.id, &director.branch_id])?;
|
|
|
|
let mut items: Vec<InventoryItem> =
|
|
rows.into_iter().filter_map(Self::map_row_to_inventory_item).collect();
|
|
|
|
conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?;
|
|
conn.prepare("add_sell_log", QUERY_ADD_SELL_LOG)?;
|
|
|
|
// Falls es nichts zu verkaufen gibt, können wir sofort zurückkehren.
|
|
if items.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
// Für alle Items dieses Directors sollten die user_id-Felder identisch
|
|
// sein (Arbeitgeber des Directors).
|
|
let falukant_user_id = items[0].user_id;
|
|
|
|
// Vor dem eigentlichen Verkauf versucht der Director, lohnende
|
|
// Transporte zu planen. Dabei werden:
|
|
// - ggf. Transport-Einträge erzeugt
|
|
// - Inventar-Mengen reduziert
|
|
// Die zurückgegebenen Mengen werden dann lokal verkauft.
|
|
for item in items.iter_mut() {
|
|
let shipped = self.plan_transports_for_item(
|
|
&mut conn,
|
|
falukant_user_id,
|
|
item,
|
|
)?;
|
|
|
|
if shipped > 0 {
|
|
if shipped >= item.quantity {
|
|
// Alles wurde in Transporte umgewandelt, lokal nichts mehr zu verkaufen.
|
|
item.quantity = 0;
|
|
} else {
|
|
// Inventar-Menge in der DB reduzieren und im Item anpassen.
|
|
let remaining = item.quantity - shipped;
|
|
Self::update_inventory_quantity(&mut conn, item.id, remaining)?;
|
|
item.quantity = remaining;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Anschließend lokale Verkäufe für die verbleibenden Mengen durchführen.
|
|
for item in items.drain(..) {
|
|
if item.quantity > 0 {
|
|
self.sell_single_inventory_item(&mut conn, &item)?;
|
|
} else {
|
|
// Falls die Menge auf 0 gesetzt wurde, das Inventar ggf. aufräumen.
|
|
conn.execute("remove_inventory", &[&item.id])?;
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn map_row_to_inventory_item(row: Row) -> Option<InventoryItem> {
|
|
Some(InventoryItem {
|
|
id: row.get("id")?.parse().ok()?,
|
|
product_id: row.get("product_id")?.parse().ok()?,
|
|
quantity: row.get("quantity")?.parse().ok()?,
|
|
quality: row.get("quality")?.parse().ok()?,
|
|
sell_cost: row.get("sell_cost")?.parse().ok()?,
|
|
user_id: row.get("user_id")?.parse().ok()?,
|
|
region_id: row.get("region_id")?.parse().ok()?,
|
|
branch_id: row.get("branch_id")?.parse().ok()?,
|
|
})
|
|
}
|
|
|
|
fn sell_single_inventory_item(
|
|
&mut self,
|
|
conn: &mut DbConnection,
|
|
item: &InventoryItem,
|
|
) -> Result<(), DbError> {
|
|
if item.quantity <= 0 {
|
|
conn.execute("remove_inventory", &[&item.id])?;
|
|
return Ok(());
|
|
}
|
|
|
|
let min_price = item.sell_cost * 0.6;
|
|
let piece_sell_price =
|
|
min_price + (item.sell_cost - min_price) * (item.quality as f64 / 100.0);
|
|
let sell_price = piece_sell_price * item.quantity as f64;
|
|
|
|
if let Err(err) = self.base.change_falukant_user_money(
|
|
item.user_id,
|
|
sell_price,
|
|
"sell products",
|
|
) {
|
|
eprintln!(
|
|
"[DirectorWorker] Fehler bei change_falukant_user_money (sell products): {err}"
|
|
);
|
|
}
|
|
|
|
conn.execute(
|
|
"add_sell_log",
|
|
&[
|
|
&item.region_id,
|
|
&item.product_id,
|
|
&item.quantity,
|
|
&item.user_id,
|
|
],
|
|
)?;
|
|
|
|
conn.execute("remove_inventory", &[&item.id])?;
|
|
|
|
let message = format!(
|
|
r#"{{"event":"selled_items","branch_id":{}}}"#,
|
|
item.branch_id
|
|
);
|
|
self.base.broker.publish(message);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Plant ggf. Transporte für ein einzelnes Inventar-Item und gibt die
|
|
/// Menge zurück, die tatsächlich in Transporte umgewandelt wurde.
|
|
///
|
|
/// Logik:
|
|
/// - Ermittle regionale "worth_percent"-Werte für das Produkt in allen
|
|
/// Branch-Regionen des Users.
|
|
/// - Berechne lokalen Stückpreis (inkl. Qualität) und für jede andere
|
|
/// Region einen potentiellen Stückpreis.
|
|
/// - Prüfe für jede Zielregion:
|
|
/// * Gibt es verfügbare Transportmittel für diese Route?
|
|
/// * Ist der Mehrerlös (deltaPrice * Menge) größer als die
|
|
/// Transportkosten (max(1, totalValue * 0.01))?
|
|
/// - Wähle die Zielregion mit dem größten positiven Nettogewinn und
|
|
/// erzeuge entsprechende Transporte (begrenzt durch Fahrzeugkapazität).
|
|
fn plan_transports_for_item(
|
|
&mut self,
|
|
conn: &mut DbConnection,
|
|
falukant_user_id: i32,
|
|
item: &mut InventoryItem,
|
|
) -> Result<i32, DbError> {
|
|
// Sicherheitscheck
|
|
if item.quantity <= 0 {
|
|
return Ok(0);
|
|
}
|
|
|
|
// Regionale worth_percent-Werte für dieses Produkt laden
|
|
conn.prepare(
|
|
"get_region_worth_for_product",
|
|
QUERY_GET_REGION_WORTH_FOR_PRODUCT,
|
|
)?;
|
|
let rows = conn.execute(
|
|
"get_region_worth_for_product",
|
|
&[&falukant_user_id, &item.product_id],
|
|
)?;
|
|
|
|
if rows.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let mut worth_by_region: HashMap<i32, f64> = HashMap::new();
|
|
for row in rows {
|
|
let region_id = row
|
|
.get("region_id")
|
|
.and_then(|v| v.parse::<i32>().ok())
|
|
.unwrap_or(-1);
|
|
let percent = row
|
|
.get("worth_percent")
|
|
.and_then(|v| v.parse::<f64>().ok())
|
|
.unwrap_or(100.0);
|
|
|
|
if region_id >= 0 {
|
|
worth_by_region.insert(region_id, percent);
|
|
}
|
|
}
|
|
|
|
if worth_by_region.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
// Lokalen Stückpreis berechnen
|
|
let local_percent = worth_by_region
|
|
.get(&item.region_id)
|
|
.copied()
|
|
.unwrap_or(100.0);
|
|
|
|
let local_sell_cost = item.sell_cost * local_percent / 100.0;
|
|
let local_min_price = local_sell_cost * 0.6;
|
|
let quality_factor = item.quality as f64 / 100.0;
|
|
let local_piece_price =
|
|
local_min_price + (local_sell_cost - local_min_price) * quality_factor;
|
|
|
|
let mut best_target_region: Option<i32> = None;
|
|
let mut best_quantity: i32 = 0;
|
|
let mut best_remote_piece_price: f64 = 0.0;
|
|
let mut best_gain: f64 = 0.0;
|
|
|
|
// Für jede andere Region prüfen, ob sich ein Transport lohnt.
|
|
for (®ion_id, &remote_percent) in &worth_by_region {
|
|
if region_id == item.region_id {
|
|
continue;
|
|
}
|
|
|
|
// Remote-Stückpreis
|
|
let remote_sell_cost = item.sell_cost * remote_percent / 100.0;
|
|
let remote_min_price = remote_sell_cost * 0.6;
|
|
let remote_piece_price =
|
|
remote_min_price + (remote_sell_cost - remote_min_price) * quality_factor;
|
|
|
|
let delta_per_unit = remote_piece_price - local_piece_price;
|
|
if delta_per_unit <= 0.0 {
|
|
continue;
|
|
}
|
|
|
|
// Verfügbare Transportmittel für diese Route abfragen
|
|
let vehicles = Self::get_transport_vehicles_for_route(
|
|
conn,
|
|
falukant_user_id,
|
|
item.region_id,
|
|
region_id,
|
|
)?;
|
|
|
|
if vehicles.is_empty() {
|
|
continue;
|
|
}
|
|
|
|
// Maximale transportierbare Menge anhand der Kapazität ermitteln
|
|
let mut max_capacity: i32 = 0;
|
|
for v in &vehicles {
|
|
max_capacity = max_capacity.saturating_add(v.capacity);
|
|
}
|
|
|
|
if max_capacity <= 0 {
|
|
continue;
|
|
}
|
|
|
|
let qty = std::cmp::min(item.quantity, max_capacity);
|
|
if qty <= 0 {
|
|
continue;
|
|
}
|
|
|
|
let extra_revenue = delta_per_unit * qty as f64;
|
|
let total_value = remote_piece_price * qty as f64;
|
|
let transport_cost = total_value * 0.01_f64;
|
|
// Kostenformel: max(0.01, totalValue * 0.01)
|
|
let transport_cost = if transport_cost < 0.01 {
|
|
0.01
|
|
} else {
|
|
transport_cost
|
|
};
|
|
|
|
let net_gain = extra_revenue - transport_cost;
|
|
if net_gain <= 0.0 {
|
|
continue;
|
|
}
|
|
|
|
if net_gain > best_gain {
|
|
best_gain = net_gain;
|
|
best_target_region = Some(region_id);
|
|
best_quantity = qty;
|
|
best_remote_piece_price = remote_piece_price;
|
|
}
|
|
}
|
|
|
|
// Kein lohnender Transport gefunden
|
|
let target_region = match best_target_region {
|
|
Some(r) => r,
|
|
None => return Ok(0),
|
|
};
|
|
|
|
if best_quantity <= 0 {
|
|
return Ok(0);
|
|
}
|
|
|
|
// Nochmals verfügbare Transportmittel für die gewählte Route laden
|
|
let vehicles = Self::get_transport_vehicles_for_route(
|
|
conn,
|
|
falukant_user_id,
|
|
item.region_id,
|
|
target_region,
|
|
)?;
|
|
|
|
if vehicles.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
// Transporte anlegen, begrenzt durch best_quantity und Kapazitäten
|
|
conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;
|
|
|
|
let mut remaining = best_quantity;
|
|
for v in &vehicles {
|
|
if remaining <= 0 {
|
|
break;
|
|
}
|
|
let size = std::cmp::min(remaining, v.capacity);
|
|
if size <= 0 {
|
|
continue;
|
|
}
|
|
|
|
conn.execute(
|
|
"insert_transport",
|
|
&[
|
|
&item.region_id,
|
|
&target_region,
|
|
&item.product_id,
|
|
&size,
|
|
&v.id,
|
|
],
|
|
)?;
|
|
|
|
remaining -= size;
|
|
}
|
|
|
|
let shipped = best_quantity - remaining.max(0);
|
|
|
|
// Optional: Logging zur Nachvollziehbarkeit
|
|
if shipped > 0 {
|
|
eprintln!(
|
|
"[DirectorWorker] Transport geplant: {} Einheiten von Produkt {} von Region {} nach Region {} (Stückpreis lokal {:.2}, remote {:.2})",
|
|
shipped, item.product_id, item.region_id, target_region, local_piece_price, best_remote_piece_price
|
|
);
|
|
}
|
|
|
|
Ok(shipped)
|
|
}
|
|
|
|
fn get_transport_vehicles_for_route(
|
|
conn: &mut DbConnection,
|
|
falukant_user_id: i32,
|
|
source_region: i32,
|
|
target_region: i32,
|
|
) -> Result<Vec<TransportVehicle>, DbError> {
|
|
conn.prepare(
|
|
"get_transport_vehicles_for_route",
|
|
QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE,
|
|
)?;
|
|
let rows = conn.execute(
|
|
"get_transport_vehicles_for_route",
|
|
&[&falukant_user_id, &source_region, &target_region],
|
|
)?;
|
|
|
|
let mut result = Vec::with_capacity(rows.len());
|
|
for row in rows {
|
|
let id = row
|
|
.get("vehicle_id")
|
|
.and_then(|v| v.parse::<i32>().ok())
|
|
.unwrap_or(-1);
|
|
let capacity = row
|
|
.get("capacity")
|
|
.and_then(|v| v.parse::<i32>().ok())
|
|
.unwrap_or(0);
|
|
|
|
if id >= 0 && capacity > 0 {
|
|
result.push(TransportVehicle { id, capacity });
|
|
}
|
|
}
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
fn update_inventory_quantity(
|
|
conn: &mut DbConnection,
|
|
inventory_id: i32,
|
|
new_quantity: i32,
|
|
) -> Result<(), DbError> {
|
|
const QUERY_UPDATE_INVENTORY_QTY: &str = r#"
|
|
UPDATE falukant_data.inventory
|
|
SET quantity = $2
|
|
WHERE id = $1;
|
|
"#;
|
|
|
|
conn.prepare("update_inventory_qty", QUERY_UPDATE_INVENTORY_QTY)?;
|
|
conn.execute("update_inventory_qty", &[&inventory_id, &new_quantity])?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn pay_salary(&mut self) -> Result<(), DbError> {
|
|
self.base.set_current_step("DirectorWorker: pay_salary");
|
|
|
|
let mut conn = self
|
|
.base
|
|
.pool
|
|
.get()
|
|
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
|
|
|
conn.prepare("get_salary_to_pay", QUERY_GET_SALARY_TO_PAY)?;
|
|
conn.prepare("set_salary_payed", QUERY_SET_SALARY_PAYED)?;
|
|
|
|
let rows = conn.execute("get_salary_to_pay", &[])?;
|
|
let salaries: Vec<SalaryItem> =
|
|
rows.into_iter().filter_map(Self::map_row_to_salary_item).collect();
|
|
|
|
for item in salaries {
|
|
if let Err(err) = self.base.change_falukant_user_money(
|
|
item.employer_user_id,
|
|
-(item.income as f64),
|
|
"director payed out",
|
|
) {
|
|
eprintln!(
|
|
"[DirectorWorker] Fehler bei change_falukant_user_money (director payed out): {err}"
|
|
);
|
|
}
|
|
|
|
conn.execute("set_salary_payed", &[&item.id])?;
|
|
|
|
let message =
|
|
format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, item.employer_user_id);
|
|
self.base.broker.publish(message);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn map_row_to_salary_item(row: Row) -> Option<SalaryItem> {
|
|
Some(SalaryItem {
|
|
id: row.get("id")?.parse().ok()?,
|
|
employer_user_id: row.get("employer_user_id")?.parse().ok()?,
|
|
income: row.get("income")?.parse().ok()?,
|
|
})
|
|
}
|
|
|
|
fn calculate_satisfaction(&mut self) -> Result<(), DbError> {
|
|
self.base
|
|
.set_current_step("DirectorWorker: calculate_satisfaction");
|
|
|
|
let mut conn = self
|
|
.base
|
|
.pool
|
|
.get()
|
|
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
|
|
|
conn.prepare("update_satisfaction", QUERY_UPDATE_SATISFACTION)?;
|
|
let rows = conn.execute("update_satisfaction", &[])?;
|
|
|
|
for row in rows {
|
|
if let Some(employer_id) = row
|
|
.get("employer_user_id")
|
|
.and_then(|v| v.parse::<i32>().ok())
|
|
{
|
|
let message = format!(
|
|
r#"{{"event":"directorchanged","user_id":{}}}"#,
|
|
employer_id
|
|
);
|
|
self.base.broker.publish(message);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl Worker for DirectorWorker {
|
|
fn start_worker_thread(&mut self) {
|
|
let pool = self.base.pool.clone();
|
|
let broker = self.base.broker.clone();
|
|
|
|
self.base
|
|
.start_worker_with_loop(move |state: Arc<WorkerState>| {
|
|
let mut worker = DirectorWorker::new(pool.clone(), broker.clone());
|
|
while state.running_worker.load(Ordering::Relaxed) {
|
|
worker.run_iteration(&state);
|
|
}
|
|
});
|
|
}
|
|
|
|
fn stop_worker_thread(&mut self) {
|
|
self.base.stop_worker();
|
|
}
|
|
|
|
fn enable_watchdog(&mut self) {
|
|
self.base.start_watchdog();
|
|
}
|
|
}
|
|
|
|
|