Add branch capacity query and production limits in DirectorWorker: Introduced a new SQL query to fetch current stock and production values for a branch, enabling dynamic production management. Implemented logic to limit concurrent productions to a maximum of two and updated production creation to handle single production requests efficiently. Enhanced logging for better traceability of production activities.

This commit is contained in:
Torsten Schulz (local)
2025-12-01 16:23:40 +01:00
parent 513862a157
commit 9ee8c970c7

View File

@@ -60,6 +60,9 @@ pub struct DirectorWorker {
last_run: Option<Instant>, last_run: Option<Instant>,
} }
// Maximale Anzahl paralleler Produktionen pro Branch
const MAX_PARALLEL_PRODUCTIONS: i32 = 2;
// SQL-Queries (1:1 aus director_worker.h) // SQL-Queries (1:1 aus director_worker.h)
const QUERY_GET_DIRECTORS: &str = r#" const QUERY_GET_DIRECTORS: &str = r#"
SELECT SELECT
@@ -146,6 +149,34 @@ const QUERY_INSERT_PRODUCTION: &str = r#"
VALUES ($1, $2, $3); VALUES ($1, $2, $3);
"#; "#;
// Query zum Abfragen der aktuellen Lager- und Produktionswerte für einen Branch
// (ohne den kompletten Produktionsplan neu zu berechnen)
const QUERY_GET_BRANCH_CAPACITY: &str = r#"
SELECT
(
SELECT SUM(quantity)
FROM falukant_data.stock fds
WHERE fds.branch_id = $1
) AS stock_size,
COALESCE((
SELECT SUM(COALESCE(fdi.quantity, 0))
FROM falukant_data.stock fds
JOIN falukant_data.inventory fdi
ON fdi.stock_id = fds.id
WHERE fds.branch_id = $1
), 0) AS used_in_stock,
(
SELECT COUNT(id)
FROM falukant_data.production
WHERE branch_id = $1
) AS running_productions,
COALESCE((
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
FROM falukant_data.production fdp
WHERE fdp.branch_id = $1
), 0) AS running_productions_quantity;
"#;
const QUERY_GET_INVENTORY: &str = r#" const QUERY_GET_INVENTORY: &str = r#"
SELECT SELECT
i.id, i.id,
@@ -373,6 +404,7 @@ impl DirectorWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Initial: Bestes Produkt für diesen Branch ermitteln
conn.prepare("get_to_produce", QUERY_GET_BEST_PRODUCTION)?; conn.prepare("get_to_produce", QUERY_GET_BEST_PRODUCTION)?;
let rows = conn.execute("get_to_produce", &[&director.id, &director.branch_id])?; let rows = conn.execute("get_to_produce", &[&director.id, &director.branch_id])?;
if rows.is_empty() { if rows.is_empty() {
@@ -383,7 +415,7 @@ impl DirectorWorker {
return Ok(()); return Ok(());
} }
let plan = match Self::map_row_to_production_plan(&rows[0]) { let mut base_plan = match Self::map_row_to_production_plan(&rows[0]) {
Some(p) => p, Some(p) => p,
None => { None => {
eprintln!( eprintln!(
@@ -395,19 +427,82 @@ impl DirectorWorker {
}; };
eprintln!( eprintln!(
"[DirectorWorker] Produktionsplan: director_user_id={}, branch_id={}, product_id={}, money={}, certificate={}, stock_size={}, used_in_stock={}, running_productions={}, running_qty={}", "[DirectorWorker] Produktionsplan: director_user_id={}, branch_id={}, product_id={}, money={}, certificate={}",
plan.falukant_user_id, base_plan.falukant_user_id,
plan.branch_id, base_plan.branch_id,
plan.product_id, base_plan.product_id,
plan.money, base_plan.money,
plan.certificate, base_plan.certificate
plan.stock_size,
plan.used_in_stock,
plan.running_productions,
plan.running_productions_quantity
); );
self.create_production_batches(&mut conn, &plan)?; // Query zum Abfragen der aktuellen Kapazitätswerte vorbereiten
conn.prepare("get_branch_capacity", QUERY_GET_BRANCH_CAPACITY)?;
// Schleife: Starte Produktionen, bis entweder die maximale Anzahl erreicht ist
// oder kein freier Lagerplatz mehr vorhanden ist
loop {
// Aktuelle Kapazitätswerte abfragen
let capacity_rows = conn.execute("get_branch_capacity", &[&director.branch_id])?;
if capacity_rows.is_empty() {
break;
}
let row = &capacity_rows[0];
let stock_size: i32 = row
.get("stock_size")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let used_in_stock: i32 = row
.get("used_in_stock")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let running_productions: i32 = row
.get("running_productions")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
let running_productions_quantity: i32 = row
.get("running_productions_quantity")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0);
// Prüfen, ob noch Produktionen gestartet werden können
if running_productions >= MAX_PARALLEL_PRODUCTIONS {
eprintln!(
"[DirectorWorker] Maximale Anzahl an Produktionen ({}) erreicht für Branch {}.",
MAX_PARALLEL_PRODUCTIONS,
director.branch_id
);
break;
}
// Freie Kapazität berechnen
let free_capacity = stock_size - used_in_stock - running_productions_quantity;
if free_capacity <= 0 {
eprintln!(
"[DirectorWorker] Kein freier Lagerplatz mehr für Branch {} (stock_size={}, used={}, running_qty={}).",
director.branch_id,
stock_size,
used_in_stock,
running_productions_quantity
);
break;
}
// Plan mit aktuellen Werten aktualisieren
base_plan.stock_size = stock_size;
base_plan.used_in_stock = used_in_stock;
base_plan.running_productions = running_productions;
base_plan.running_productions_quantity = running_productions_quantity;
// Eine neue Produktion starten (max. 100 Stück)
if let Err(err) = self.create_single_production(&mut conn, &base_plan) {
eprintln!(
"[DirectorWorker] Fehler beim Starten einer Produktion: {err}"
);
break;
}
}
Ok(()) Ok(())
} }
@@ -454,21 +549,15 @@ impl DirectorWorker {
}) })
} }
fn create_production_batches( /// Startet eine einzelne Produktion (max. 100 Stück) basierend auf dem aktuellen Plan.
/// Die äußere Schleife in `start_productions` sorgt dafür, dass mehrere Produktionen
/// gestartet werden können, bis entweder die maximale Anzahl erreicht ist oder
/// kein freier Lagerplatz mehr vorhanden ist.
fn create_single_production(
&mut self, &mut self,
conn: &mut DbConnection, conn: &mut DbConnection,
plan: &ProductionPlan, plan: &ProductionPlan,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
let running = plan.running_productions;
// Maximal zwei parallele Produktionen pro Branch zulassen.
if running >= 2 {
eprintln!(
"[DirectorWorker] Bereits zu viele laufende Produktionen ({}), keine neue Produktion.",
running
);
return Ok(());
}
// Freie Lagerkapazität: Gesamtbestand minus bereits belegter Bestand // Freie Lagerkapazität: Gesamtbestand minus bereits belegter Bestand
// (Inventar) minus bereits eingeplante Produktionsmengen. // (Inventar) minus bereits eingeplante Produktionsmengen.
let free_capacity = let free_capacity =
@@ -486,7 +575,7 @@ impl DirectorWorker {
// Falls das Geld aus der DB unerwartet als 0 eingelesen wurde, aber // Falls das Geld aus der DB unerwartet als 0 eingelesen wurde, aber
// eigentlich ausreichend Guthaben vorhanden ist (bekannter Migrationsfall), // eigentlich ausreichend Guthaben vorhanden ist (bekannter Migrationsfall),
// lassen wir die Geldbegrenzung vorläufig fallen und begrenzen nur über // lassen wir die Geldbegrenzung vorläufig fallen und begrenzen nur über
// Lagerkapazität und Hard-Limit 300. // Lagerkapazität und Hard-Limit 100.
eprintln!( eprintln!(
"[DirectorWorker] Warnung: money=0 für falukant_user_id={}, \ "[DirectorWorker] Warnung: money=0 für falukant_user_id={}, \
verwende nur Lagerkapazität als Limit.", verwende nur Lagerkapazität als Limit.",
@@ -499,18 +588,19 @@ impl DirectorWorker {
// Maximale Produktionsmenge begrenzen: // Maximale Produktionsmenge begrenzen:
// - nie mehr als der freie Lagerplatz (`free_capacity`) // - nie mehr als der freie Lagerplatz (`free_capacity`)
// - nie mehr als durch das verfügbare Geld finanzierbar // - nie mehr als durch das verfügbare Geld finanzierbar
// - absolut maximal 100 Einheiten pro Start // - absolut maximal 100 Einheiten pro Produktion
let to_produce = free_capacity let to_produce = free_capacity
.min(max_money_production) .min(max_money_production)
.min(100) .min(100)
.max(0); .max(0);
eprintln!( eprintln!(
"[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}", "[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}",
free_capacity, free_capacity,
one_piece_cost, one_piece_cost,
max_money_production, max_money_production,
to_produce to_produce,
plan.running_productions
); );
if to_produce < 1 { if to_produce < 1 {
@@ -538,28 +628,15 @@ impl DirectorWorker {
conn.prepare("insert_production", QUERY_INSERT_PRODUCTION)?; conn.prepare("insert_production", QUERY_INSERT_PRODUCTION)?;
// Anzahl neuer Produktions-Jobs so begrenzen, dass zusammen mit den // Eine einzelne Produktion mit max. 100 Stück anlegen
// bereits laufenden maximal 2 parallele Produktionen pro Branch aktiv
// sind. Jeder Job wird in Batches von max. 100 Stück aufgeteilt.
let mut remaining_qty = to_produce;
let mut inserted_total = 0;
let mut started_jobs = 0;
let max_new_jobs = (2 - running).max(0);
while remaining_qty > 0 && started_jobs < max_new_jobs {
let batch = remaining_qty.min(100);
conn.execute( conn.execute(
"insert_production", "insert_production",
&[&plan.branch_id, &plan.product_id, &batch], &[&plan.branch_id, &plan.product_id, &to_produce],
)?; )?;
remaining_qty -= batch;
inserted_total += batch;
started_jobs += 1;
}
eprintln!( eprintln!(
"[DirectorWorker] Produktion angelegt: branch_id={}, product_id={}, quantity={}", "[DirectorWorker] Produktion angelegt: branch_id={}, product_id={}, quantity={}",
plan.branch_id, plan.product_id, inserted_total plan.branch_id, plan.product_id, to_produce
); );
let message = format!( let message = format!(