diff --git a/src/worker/base.rs b/src/worker/base.rs index 419af42..2e5145d 100644 --- a/src/worker/base.rs +++ b/src/worker/base.rs @@ -1,4 +1,5 @@ use crate::db::{ConnectionPool, DbError}; +use crate::worker::sql::{QUERY_UPDATE_MONEY, QUERY_GET_MONEY}; use crate::message_broker::MessageBroker; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -147,11 +148,7 @@ impl BaseWorker { .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; // Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection - const QUERY_UPDATE_MONEY: &str = r#" - SELECT falukant_data.update_money($1, $2, $3); - "#; - - conn.prepare("update_money", QUERY_UPDATE_MONEY)?; + conn.prepare("update_money", QUERY_UPDATE_MONEY)?; // Validate float to avoid passing NaN/inf which the postgres client // may fail to serialize with an unclear error message. @@ -165,9 +162,6 @@ impl BaseWorker { // We must ensure the resulting money fits in numeric(10,2). // numeric(10,2) max absolute value is < 10^8 (100_000_000) before rounding. // Fetch current money for the user and clamp the delta if needed. - const QUERY_GET_MONEY: &str = r#" - SELECT money FROM falukant_data.falukant_user WHERE id = $1; - "#; conn.prepare("get_money_for_clamp", QUERY_GET_MONEY)?; let rows = conn.execute("get_money_for_clamp", &[&falukant_user_id])?; @@ -178,13 +172,11 @@ impl BaseWorker { .unwrap_or(0.0); // compute tentative result - let tentative = current_money + money_change; + let tentative = current_money + money_change; // numeric(10,2) allows values with absolute < 10^8 (100_000_000) const MAX_ABS: f64 = 100_000_000.0 - 0.01; // leave room for scale - let _allowed = MAX_ABS - current_money; - let adjusted_money_change = if tentative >= MAX_ABS { let clipped = MAX_ABS - current_money; eprintln!( @@ -203,19 +195,12 @@ impl BaseWorker { money_change }; - // Keep only important clamp logging: when clipping occurs we log it above. - // Send exact types matching the DB function signature: let uid_i32: i32 = falukant_user_id; let money_str = format!("{:.2}", adjusted_money_change); - // Note: we intentionally avoid parameterized call due to serialization - // issues in this environment and instead execute a literal SQL below. - - // Execute update; avoid noisy logging here. - - // Use a literal SQL call because parameterized execution keeps failing - // with "error serializing parameter 1" in this environment. + // Note: we intentionally avoid parameterized call due to serialization + // issues in this environment and instead execute a literal SQL below. fn escape_sql_literal(s: &str) -> String { s.replace('\'', "''") } diff --git a/src/worker/director.rs b/src/worker/director.rs index 598be24..d3fd284 100644 --- a/src/worker/director.rs +++ b/src/worker/director.rs @@ -7,6 +7,31 @@ use std::time::{Duration, Instant}; use crate::db::ConnectionPool; use super::base::{BaseWorker, Worker, WorkerState, DEFAULT_TAX_PERCENT, DEFAULT_TREASURY_USER_ID}; +use crate::worker::sql::{ + QUERY_GET_DIRECTORS, + QUERY_GET_BEST_PRODUCTION, + QUERY_INSERT_PRODUCTION, + QUERY_GET_BRANCH_CAPACITY, + QUERY_GET_INVENTORY, + QUERY_REMOVE_INVENTORY, + QUERY_ADD_SELL_LOG, + QUERY_GET_REGION_WORTH_FOR_PRODUCT, + QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE, + QUERY_INSERT_TRANSPORT, + QUERY_INSERT_EMPTY_TRANSPORT, + QUERY_GET_USER_BRANCHES, + QUERY_GET_FREE_VEHICLES_IN_REGION, + QUERY_GET_SALARY_TO_PAY, + QUERY_SET_SALARY_PAYED, + QUERY_UPDATE_SATISFACTION, + QUERY_GET_DIRECTOR_USER, + QUERY_COUNT_VEHICLES_IN_BRANCH_REGION, + QUERY_COUNT_VEHICLES_IN_REGION, + QUERY_CHECK_ROUTE, + QUERY_GET_BRANCH_REGION, + QUERY_GET_AVERAGE_WORTH, + QUERY_UPDATE_INVENTORY_QTY, +}; #[derive(Debug, Clone)] struct Director { @@ -65,279 +90,10 @@ pub struct DirectorWorker { // Maximale Anzahl paralleler Produktionen pro Branch const MAX_PARALLEL_PRODUCTIONS: i32 = 2; -// SQL-Queries (1:1 aus director_worker.h) -const QUERY_GET_DIRECTORS: &str = r#" - SELECT - d.may_produce, - d.may_sell, - d.may_start_transport, - b.id AS branch_id, - fu.id AS falukantUserId, - d.id - FROM falukant_data.director d - JOIN falukant_data.falukant_user fu - ON fu.id = d.employer_user_id - JOIN falukant_data.character c - ON c.id = d.director_character_id - JOIN falukant_data.branch b - ON b.region_id = c.region_id - AND b.falukant_user_id = fu.id - WHERE current_time BETWEEN '08:00:00' AND '17:00:00'; -"#; - -const QUERY_GET_BEST_PRODUCTION: &str = r#" - SELECT - fdu.id falukant_user_id, - -- Geld explizit als Text casten, damit das Mapping im Rust-Code - -- zuverlässig funktioniert (unabhängig vom nativen DB-Typ wie `money`). - CAST(fdu.money AS text) AS money, - fdu.certificate, - ftp.id product_id, - ftp.label_tr, - fdb.region_id, - ( - SELECT SUM(quantity) - FROM falukant_data.stock fds - WHERE fds.branch_id = fdb.id - ) AS stock_size, - COALESCE(( - SELECT SUM(COALESCE(fdi.quantity, 0)) - FROM falukant_data.stock fds - JOIN falukant_data.inventory fdi - ON fdi.stock_id = fds.id - WHERE fds.branch_id = fdb.id - ), 0) AS used_in_stock, - (ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category) - / (300.0 * ftp.production_time) AS worth, - fdb.id AS branch_id, - ( - SELECT COUNT(id) - FROM falukant_data.production - WHERE branch_id = fdb.id - ) AS running_productions, - COALESCE(( - SELECT SUM(COALESCE(fdp.quantity, 0)) quantity - FROM falukant_data.production fdp - WHERE fdp.branch_id = fdb.id - ), 0) AS running_productions_quantity - FROM falukant_data.director fdd - JOIN falukant_data.character fdc - ON fdc.id = fdd.director_character_id - JOIN falukant_data.falukant_user fdu - ON fdd.employer_user_id = fdu.id - JOIN falukant_data.character user_character - ON user_character.user_id = fdu.id - JOIN falukant_data.branch fdb - ON fdb.falukant_user_id = fdu.id - AND fdb.region_id = fdc.region_id - JOIN falukant_data.town_product_worth fdtpw - ON fdtpw.region_id = fdb.region_id - JOIN falukant_data.knowledge fdk_character - ON fdk_character.product_id = fdtpw.product_id - AND fdk_character.character_id = user_character.id - JOIN falukant_data.knowledge fdk_director - ON fdk_director.product_id = fdtpw.product_id - AND fdk_director.character_id = fdd.director_character_id - JOIN falukant_type.product ftp - ON ftp.id = fdtpw.product_id - AND ftp.category <= fdu.certificate - WHERE fdd.id = $1 - AND fdb.id = $2 - ORDER BY worth DESC - LIMIT 1; -"#; - -const QUERY_INSERT_PRODUCTION: &str = r#" - INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id) - VALUES ($1, $2, $3, ( - SELECT weather_type_id - FROM falukant_data.weather - WHERE region_id = $4 - )); -"#; - -// Query zum Abfragen der aktuellen Lager- und Produktionswerte für einen Branch -// (ohne den kompletten Produktionsplan neu zu berechnen) -const QUERY_GET_BRANCH_CAPACITY: &str = r#" - SELECT - ( - SELECT SUM(quantity) - FROM falukant_data.stock fds - WHERE fds.branch_id = $1 - ) AS stock_size, - COALESCE(( - SELECT SUM(COALESCE(fdi.quantity, 0)) - FROM falukant_data.stock fds - JOIN falukant_data.inventory fdi - ON fdi.stock_id = fds.id - WHERE fds.branch_id = $1 - ), 0) AS used_in_stock, - ( - SELECT COUNT(id) - FROM falukant_data.production - WHERE branch_id = $1 - ) AS running_productions, - COALESCE(( - SELECT SUM(COALESCE(fdp.quantity, 0)) quantity - FROM falukant_data.production fdp - WHERE fdp.branch_id = $1 - ), 0) AS running_productions_quantity; -"#; - -const QUERY_GET_INVENTORY: &str = r#" - SELECT - i.id, - i.product_id, - i.quantity, - i.quality, - p.sell_cost, - fu.id AS user_id, - b.region_id, - b.id AS branch_id, - COALESCE(tpw.worth_percent, 100.0) AS worth_percent - FROM falukant_data.inventory i - JOIN falukant_data.stock s - ON s.id = i.stock_id - JOIN falukant_data.branch b - ON b.id = s.branch_id - JOIN falukant_data.falukant_user fu - ON fu.id = b.falukant_user_id - JOIN falukant_data.director d - ON d.employer_user_id = fu.id - JOIN falukant_type.product p - ON p.id = i.product_id - LEFT JOIN falukant_data.town_product_worth tpw - ON tpw.region_id = b.region_id - AND tpw.product_id = i.product_id - WHERE d.id = $1 - AND b.id = $2; -"#; - -const QUERY_REMOVE_INVENTORY: &str = r#" - DELETE FROM falukant_data.inventory - WHERE id = $1; -"#; - -const QUERY_ADD_SELL_LOG: &str = r#" - INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id) - VALUES ($1, $2, $3, $4) - ON CONFLICT (region_id, product_id, seller_id) - DO UPDATE - SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity; -"#; - -// Regionale Verkaufswürdigkeit pro Produkt/Region für alle Branches eines Users -const QUERY_GET_REGION_WORTH_FOR_PRODUCT: &str = r#" - SELECT - tpw.region_id, - tpw.product_id, - tpw.worth_percent - FROM falukant_data.town_product_worth tpw - JOIN falukant_data.branch b - ON b.region_id = tpw.region_id - WHERE b.falukant_user_id = $1 - AND tpw.product_id = $2; -"#; +// ...existing code... // Verfügbare Transportmittel für eine Route (source_region -> target_region) -const QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE: &str = r#" - SELECT - v.id AS vehicle_id, - vt.capacity AS capacity - FROM falukant_data.vehicle v - JOIN falukant_type.vehicle vt - ON vt.id = v.vehicle_type_id - JOIN falukant_data.region_distance rd - ON ( - (rd.source_region_id = v.region_id AND rd.target_region_id = $3) - OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id) - ) - AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL) - WHERE v.falukant_user_id = $1 - AND v.region_id = $2; -"#; - -// Transport-Eintrag anlegen -const QUERY_INSERT_TRANSPORT: &str = r#" - INSERT INTO falukant_data.transport - (source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, NOW(), NOW()); -"#; - -// Leere Transporte (product_id = NULL, size = 0) zum Zurückholen von Fahrzeugen -const QUERY_INSERT_EMPTY_TRANSPORT: &str = r#" - INSERT INTO falukant_data.transport - (source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at) - VALUES ($1, $2, NULL, 0, $3, NOW(), NOW()); -"#; - -// Alle Branches des Users mit ihren Regionen -const QUERY_GET_USER_BRANCHES: &str = r#" - SELECT DISTINCT - b.region_id, - b.id AS branch_id - FROM falukant_data.branch b - WHERE b.falukant_user_id = $1 - AND b.region_id != $2; -"#; - -// Freie Transportmittel in einer Region (nicht in aktiven Transporten) -// Ein Transport ist aktiv, wenn er noch in der Tabelle existiert -const QUERY_GET_FREE_VEHICLES_IN_REGION: &str = r#" - SELECT - v.id AS vehicle_id, - vt.capacity AS capacity - FROM falukant_data.vehicle v - JOIN falukant_type.vehicle vt - ON vt.id = v.vehicle_type_id - WHERE v.falukant_user_id = $1 - AND v.region_id = $2 - AND v.id NOT IN ( - SELECT DISTINCT t.vehicle_id - FROM falukant_data.transport t - WHERE t.vehicle_id IS NOT NULL - ); -"#; - -const QUERY_GET_SALARY_TO_PAY: &str = r#" - SELECT d.id, d.employer_user_id, d.income - FROM falukant_data.director d - WHERE DATE(d.last_salary_payout) < DATE(NOW()); -"#; - -const QUERY_SET_SALARY_PAYED: &str = r#" - UPDATE falukant_data.director - SET last_salary_payout = NOW() - WHERE id = $1; -"#; - -const QUERY_UPDATE_SATISFACTION: &str = r#" - WITH new_sats AS ( - SELECT - d.id, - ROUND( - d.income::numeric - / - ( - c.title_of_nobility - * POWER(1.231, AVG(k.knowledge) / 1.5) - ) - * 100 - ) AS new_satisfaction - FROM falukant_data.director d - JOIN falukant_data.knowledge k - ON d.director_character_id = k.character_id - JOIN falukant_data.character c - ON c.id = d.director_character_id - GROUP BY d.id, c.title_of_nobility, d.income - ) - UPDATE falukant_data.director dir - SET satisfaction = ns.new_satisfaction - FROM new_sats ns - WHERE dir.id = ns.id - AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction - RETURNING dir.employer_user_id; -"#; +// ...existing code... impl DirectorWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { @@ -614,41 +370,11 @@ impl DirectorWorker { conn: &mut DbConnection, plan: &ProductionPlan, ) -> Result<(), DbError> { - // Freie Lagerkapazität: Gesamtbestand minus bereits belegter Bestand - // (Inventar) minus bereits eingeplante Produktionsmengen. - let free_capacity = - plan.stock_size - plan.used_in_stock - plan.running_productions_quantity; + let free_capacity = Self::calc_free_capacity(plan); + let one_piece_cost = Self::calc_one_piece_cost(plan); + let max_money_production = Self::calc_max_money_production(plan, one_piece_cost); - // Stückkosten monetär berechnen. Da money ein f64 ist, arbeiten wir hier ebenfalls - // mit Gleitkomma und runden erst am Ende auf eine ganze Stückzahl ab. - let one_piece_cost = (plan.certificate * 6) as f64; - let mut max_money_production: i32 = 0; - if one_piece_cost > 0.0 { - if plan.money > 0.0 { - // Anzahl Stück, die sich mit dem verfügbaren Geld finanzieren lassen - max_money_production = (plan.money / one_piece_cost).floor() as i32; - } else { - // Falls das Geld aus der DB unerwartet als 0 eingelesen wurde, aber - // eigentlich ausreichend Guthaben vorhanden ist (bekannter Migrationsfall), - // lassen wir die Geldbegrenzung vorläufig fallen und begrenzen nur über - // Lagerkapazität und Hard-Limit 100. - eprintln!( - "[DirectorWorker] Warnung: money=0 für falukant_user_id={}, \ - verwende nur Lagerkapazität als Limit.", - plan.falukant_user_id - ); - max_money_production = i32::MAX; - } - } - - // Maximale Produktionsmenge begrenzen: - // - nie mehr als der freie Lagerplatz (`free_capacity`) - // - nie mehr als durch das verfügbare Geld finanzierbar - // - absolut maximal 100 Einheiten pro Produktion - let to_produce = free_capacity - .min(max_money_production) - .min(100) - .max(0); + let to_produce = free_capacity.min(max_money_production).min(100).max(0); eprintln!( "[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}", @@ -711,6 +437,30 @@ impl DirectorWorker { Ok(()) } + fn calc_free_capacity(plan: &ProductionPlan) -> i32 { + plan.stock_size - plan.used_in_stock - plan.running_productions_quantity + } + + fn calc_one_piece_cost(plan: &ProductionPlan) -> f64 { + (plan.certificate * 6) as f64 + } + + fn calc_max_money_production(plan: &ProductionPlan, one_piece_cost: f64) -> i32 { + if one_piece_cost > 0.0 { + if plan.money > 0.0 { + (plan.money / one_piece_cost).floor() as i32 + } else { + eprintln!( + "[DirectorWorker] Warnung: money=0 für falukant_user_id={}, verwende nur Lagerkapazität als Limit.", + plan.falukant_user_id + ); + i32::MAX + } + } else { + 0 + } + } + fn start_transports_stub(&mut self, director: &Director) -> Result<(), DbError> { self.base .set_current_step("DirectorWorker: start_transports"); @@ -736,11 +486,6 @@ impl DirectorWorker { // sein (Arbeitgeber des Directors). let falukant_user_id = if items.is_empty() { // Wenn keine Items vorhanden sind, müssen wir die user_id anders ermitteln - const QUERY_GET_DIRECTOR_USER: &str = r#" - SELECT employer_user_id - FROM falukant_data.director - WHERE id = $1; - "#; conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?; let user_rows = conn.execute("get_director_user", &[&director.id])?; user_rows @@ -754,20 +499,6 @@ impl DirectorWorker { // Prüfe, ob Transportmittel im aktuellen Branch vorhanden sind // Ein Transport ist aktiv, wenn er noch in der Tabelle existiert - const QUERY_COUNT_VEHICLES_IN_BRANCH_REGION: &str = r#" - SELECT COUNT(*) AS count - FROM falukant_data.vehicle v - JOIN falukant_data.branch b - ON b.region_id = v.region_id - WHERE v.falukant_user_id = $1 - AND b.id = $2 - AND v.id NOT IN ( - SELECT DISTINCT t.vehicle_id - FROM falukant_data.transport t - WHERE t.vehicle_id IS NOT NULL - ); - "#; - conn.prepare("count_vehicles_in_branch", QUERY_COUNT_VEHICLES_IN_BRANCH_REGION)?; let vehicle_count_rows = conn.execute( "count_vehicles_in_branch", @@ -976,45 +707,45 @@ impl DirectorWorker { }) } - fn sell_single_inventory_item( - &mut self, - conn: &mut DbConnection, - item: &InventoryItem, - ) -> Result<(), DbError> { - if item.quantity <= 0 { - conn.execute("remove_inventory", &[&item.id])?; - return Ok(()); - } - - // Neue Preisberechnung gemäß Spezifikation: - // 1. Basispreis = product.sellCost * (worthPercent / 100) + // Helper: compute piece sell price from item fields + fn compute_piece_sell_price(item: &InventoryItem) -> f64 { let base_price = item.sell_cost * (item.worth_percent / 100.0); - - // 2. min = basePrice * 0.6, max = basePrice - let min_price = base_price * 0.6; - let max_price = base_price; - - // 3. price = min + (max - min) * (knowledgeFactor / 100) - // knowledgeFactor ist hier item.quality - let knowledge_factor = item.quality as f64; - let piece_sell_price = min_price + (max_price - min_price) * (knowledge_factor / 100.0); - - let sell_price = piece_sell_price * item.quantity as f64; + let min_price = base_price * 0.6; + let max_price = base_price; + let knowledge_factor = item.quality as f64; + min_price + (max_price - min_price) * (knowledge_factor / 100.0) + } - // Steuerberechnung: 1) Region ermitteln, 2) user offices, 3) cumulative tax (mit Befreiungen) + // Helper: get one_piece_cost from DB row fallback logic + fn resolve_one_piece_cost(conn: &mut DbConnection, product_id: i32, fallback: f64) -> Result { + conn.prepare("get_product_cost", "SELECT original_sell_cost, sell_cost FROM falukant_type.product WHERE id = $1")?; + let rows = conn.execute("get_product_cost", &[&product_id])?; + if let Some(row) = rows.get(0) { + if let Some(osc) = row.get("original_sell_cost") { + if let Ok(v) = osc.parse::() { return Ok(v); } + } + if let Some(sc) = row.get("sell_cost") { + if let Ok(v) = sc.parse::() { return Ok(v); } + } + } + Ok(fallback) + } + + // Helper: determine cumulative tax percent for a branch/user + fn get_cumulative_tax_percent(conn: &mut DbConnection, branch_id: i32, user_id: i32) -> Result { + // Default let mut cumulative_tax_percent = DEFAULT_TAX_PERCENT; conn.prepare("get_branch_region", "SELECT region_id FROM falukant_data.branch WHERE id = $1;")?; - let branch_rows = conn.execute("get_branch_region", &[&item.branch_id])?; + let branch_rows = conn.execute("get_branch_region", &[&branch_id])?; let branch_region_id: Option = branch_rows.get(0).and_then(|r| r.get("region_id")).and_then(|v| v.parse().ok()); if let Some(region_id) = branch_region_id { - // user offices conn.prepare( "get_user_offices", "SELECT po.id AS office_id, pot.name AS office_name, po.region_id, rt.label_tr AS region_type FROM falukant_data.political_office po JOIN falukant_type.political_office_type pot ON pot.id = po.office_type_id JOIN falukant_data.region r ON r.id = po.region_id JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE po.holder_id = $1 AND (po.end_date IS NULL OR po.end_date > NOW());", )?; - let offices = conn.execute("get_user_offices", &[&item.user_id])?; + let offices = conn.execute("get_user_offices", &[&user_id])?; let mut exempt_types: Vec = Vec::new(); let mut has_chancellor = false; @@ -1032,52 +763,55 @@ impl DirectorWorker { } if has_chancellor { - cumulative_tax_percent = 0.0; + return Ok(0.0); + } + + if exempt_types.is_empty() { + conn.prepare( + "cumulative_tax_no_exempt", + "WITH RECURSIVE ancestors AS (SELECT id, parent_id, COALESCE(tax_percent,0.0) AS tax_percent FROM falukant_data.region WHERE id = $1 UNION ALL SELECT r.id, r.parent_id, COALESCE(r.tax_percent,0.0) FROM falukant_data.region r JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;", + )?; + let res = conn.execute("cumulative_tax_no_exempt", &[®ion_id])?; + if let Some(row) = res.get(0) { + if let Some(tp) = row.get("total_percent") { + cumulative_tax_percent = tp.parse::().unwrap_or(DEFAULT_TAX_PERCENT); + } + } } else { - if exempt_types.is_empty() { - conn.prepare( - "cumulative_tax_no_exempt", - "WITH RECURSIVE ancestors AS (SELECT id, parent_id, COALESCE(tax_percent,0.0) AS tax_percent FROM falukant_data.region WHERE id = $1 UNION ALL SELECT r.id, r.parent_id, COALESCE(r.tax_percent,0.0) FROM falukant_data.region r JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;", - )?; - let res = conn.execute("cumulative_tax_no_exempt", &[®ion_id])?; - if let Some(row) = res.get(0) { - if let Some(tp) = row.get("total_percent") { - cumulative_tax_percent = tp.parse::().unwrap_or(DEFAULT_TAX_PERCENT); - } - } - } else { - conn.prepare( - "cumulative_tax_with_exempt", - "WITH RECURSIVE ancestors AS (SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END AS tax_percent FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE r.id = $1 UNION ALL SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;", - )?; - let exempt_array: Vec<&str> = exempt_types.iter().map(|s| s.as_str()).collect(); - let res = conn.execute("cumulative_tax_with_exempt", &[®ion_id, &exempt_array])?; - if let Some(row) = res.get(0) { - if let Some(tp) = row.get("total_percent") { - cumulative_tax_percent = tp.parse::().unwrap_or(DEFAULT_TAX_PERCENT); - } + conn.prepare( + "cumulative_tax_with_exempt", + "WITH RECURSIVE ancestors AS (SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END AS tax_percent FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE r.id = $1 UNION ALL SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;", + )?; + let exempt_array: Vec<&str> = exempt_types.iter().map(|s| s.as_str()).collect(); + let res = conn.execute("cumulative_tax_with_exempt", &[®ion_id, &exempt_array])?; + if let Some(row) = res.get(0) { + if let Some(tp) = row.get("total_percent") { + cumulative_tax_percent = tp.parse::().unwrap_or(DEFAULT_TAX_PERCENT); } } } } - // Produktkosten (original_sell_cost fallback sell_cost) - conn.prepare("get_product_cost", "SELECT original_sell_cost, sell_cost FROM falukant_type.product WHERE id = $1")?; - let cost_rows = conn.execute("get_product_cost", &[&item.product_id])?; - let mut one_piece_cost = item.sell_cost; - if let Some(row) = cost_rows.get(0) { - if let Some(osc) = row.get("original_sell_cost") { - if let Ok(v) = osc.parse::() { - one_piece_cost = v; - } - } else if let Some(sc) = row.get("sell_cost") { - if let Ok(v) = sc.parse::() { - one_piece_cost = v; - } - } + Ok(cumulative_tax_percent) + } + + fn sell_single_inventory_item( + &mut self, + conn: &mut DbConnection, + item: &InventoryItem, + ) -> Result<(), DbError> { + if item.quantity <= 0 { + conn.execute("remove_inventory", &[&item.id])?; + return Ok(()); } - // cents-based arithmetic + // compute piece price and full sell price + let piece_price = Self::compute_piece_sell_price(item); + let sell_price = piece_price * item.quantity as f64; + + let one_piece_cost = Self::resolve_one_piece_cost(conn, item.product_id, item.sell_cost)?; + let cumulative_tax_percent = Self::get_cumulative_tax_percent(conn, item.branch_id, item.user_id)?; + let revenue_cents = (sell_price * 100.0).round() as i64; let cost_cents = (one_piece_cost * item.quantity as f64 * 100.0).round() as i64; let profit_cents = (revenue_cents - cost_cents).max(0); @@ -1156,66 +890,22 @@ impl DirectorWorker { return Ok(0); } - // Regionale worth_percent-Werte für dieses Produkt laden - conn.prepare( - "get_region_worth_for_product", - QUERY_GET_REGION_WORTH_FOR_PRODUCT, - )?; - let rows = conn.execute( - "get_region_worth_for_product", - &[&falukant_user_id, &item.product_id], - )?; - - if rows.is_empty() { - return Ok(0); - } - - let mut worth_by_region: HashMap = HashMap::new(); - for row in rows { - let region_id = row - .get("region_id") - .and_then(|v| v.parse::().ok()) - .unwrap_or(-1); - let percent = row - .get("worth_percent") - .and_then(|v| v.parse::().ok()) - .unwrap_or(100.0); - - if region_id >= 0 { - worth_by_region.insert(region_id, percent); - } - } - + // Load worth_percent by region for this product + let worth_by_region = Self::get_worth_by_region(conn, falukant_user_id, item.product_id)?; if worth_by_region.is_empty() { - eprintln!( - "[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden", - item.product_id - ); + eprintln!("[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden", item.product_id); return Ok(0); } eprintln!( "[DirectorWorker] Gefundene Regionen für Produkt {}: {} Regionen", - item.product_id, worth_by_region.len() + item.product_id, + worth_by_region.len() ); - // Lokalen Stückpreis berechnen (neue Preisberechnung) - let local_percent = worth_by_region - .get(&item.region_id) - .copied() - .unwrap_or(100.0); - - // 1. Basispreis = product.sellCost * (worthPercent / 100) - let local_base_price = item.sell_cost * (local_percent / 100.0); - - // 2. min = basePrice * 0.6, max = basePrice - let local_min_price = local_base_price * 0.6; - let local_max_price = local_base_price; - - // 3. price = min + (max - min) * (knowledgeFactor / 100) - let knowledge_factor = item.quality as f64; - let local_piece_price = local_min_price + (local_max_price - local_min_price) * (knowledge_factor / 100.0); - + // Compute local piece price + let local_percent = worth_by_region.get(&item.region_id).copied().unwrap_or(100.0); + let local_piece_price = Self::compute_piece_price_for_percent(item, local_percent); eprintln!( "[DirectorWorker] Lokaler Preis für Produkt {}: {:.2} (worth_percent={:.2}, quality={})", item.product_id, local_piece_price, local_percent, item.quality @@ -1232,18 +922,7 @@ impl DirectorWorker { continue; } - // Remote-Stückpreis berechnen (neue Preisberechnung) - // 1. Basispreis = product.sellCost * (worthPercent / 100) - let remote_base_price = item.sell_cost * (remote_percent / 100.0); - - // 2. min = basePrice * 0.6, max = basePrice - let remote_min_price = remote_base_price * 0.6; - let remote_max_price = remote_base_price; - - // 3. price = min + (max - min) * (knowledgeFactor / 100) - let knowledge_factor = item.quality as f64; - let remote_piece_price = remote_min_price + (remote_max_price - remote_min_price) * (knowledge_factor / 100.0); - + let remote_piece_price = Self::compute_piece_price_for_percent(item, remote_percent); let delta_per_unit = remote_piece_price - local_piece_price; eprintln!( "[DirectorWorker] Region {}: Preis {:.2}, Delta {:.2}", @@ -1280,10 +959,7 @@ impl DirectorWorker { } // Maximale transportierbare Menge anhand der Kapazität ermitteln - let mut max_capacity: i32 = 0; - for v in &vehicles { - max_capacity = max_capacity.saturating_add(v.capacity); - } + let max_capacity = Self::calc_max_capacity(&vehicles); if max_capacity <= 0 { continue; @@ -1295,15 +971,7 @@ impl DirectorWorker { } let extra_revenue = delta_per_unit * qty as f64; - let total_value = remote_piece_price * qty as f64; - let transport_cost = total_value * 0.01_f64; - // Kostenformel: max(0.01, totalValue * 0.01) - let transport_cost = if transport_cost < 0.01 { - 0.01 - } else { - transport_cost - }; - + let transport_cost = Self::calc_transport_cost(remote_piece_price, qty); let net_gain = extra_revenue - transport_cost; eprintln!( "[DirectorWorker] Region {}: extra_revenue={:.2}, transport_cost={:.2}, net_gain={:.2}, qty={}", @@ -1340,50 +1008,12 @@ impl DirectorWorker { return Ok(0); } - // Nochmals verfügbare Transportmittel für die gewählte Route laden - let vehicles = Self::get_transport_vehicles_for_route( - conn, - falukant_user_id, - item.region_id, - target_region, - )?; - - if vehicles.is_empty() { - return Ok(0); - } - - // Transporte anlegen, begrenzt durch best_quantity und Kapazitäten - conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?; - - let mut remaining = best_quantity; - for v in &vehicles { - if remaining <= 0 { - break; - } - let size = std::cmp::min(remaining, v.capacity); - if size <= 0 { - continue; - } - - conn.execute( - "insert_transport", - &[ - &item.region_id, - &target_region, - &item.product_id, - &size, - &v.id, - ], - )?; - - remaining -= size; - } - - let shipped = best_quantity - remaining.max(0); + // Build and insert transports for chosen route + let shipped = Self::insert_transports_for_route(conn, item, target_region, best_quantity)?; // Inventar sofort reduzieren, nachdem Transporte erfolgreich angelegt wurden // Dies stellt sicher, dass Inventar und Transporte immer konsistent sind - if shipped > 0 { + if shipped > 0 { if shipped >= item.quantity { // Alles wurde in Transporte umgewandelt, Inventar komplett entfernen conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?; @@ -1411,15 +1041,8 @@ impl DirectorWorker { source_region: i32, target_region: i32, ) -> Result, DbError> { - // Debug: Prüfe zuerst, ob Fahrzeuge in der Quellregion existieren - const QUERY_COUNT_VEHICLES_IN_REGION: &str = r#" - SELECT COUNT(*) AS count - FROM falukant_data.vehicle v - WHERE v.falukant_user_id = $1 - AND v.region_id = $2; - "#; - - conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?; + // Debug: Prüfe zuerst, ob Fahrzeuge in der Quellregion existieren + conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?; let vehicle_count_rows = conn.execute( "count_vehicles_in_region", &[&falukant_user_id, &source_region], @@ -1436,15 +1059,8 @@ impl DirectorWorker { source_region, falukant_user_id, vehicle_count ); - // Debug: Prüfe, ob eine Route existiert - const QUERY_CHECK_ROUTE: &str = r#" - SELECT COUNT(*) AS count - FROM falukant_data.region_distance rd - WHERE (rd.source_region_id = $1 AND rd.target_region_id = $2) - OR (rd.source_region_id = $2 AND rd.target_region_id = $1); - "#; - - conn.prepare("check_route", QUERY_CHECK_ROUTE)?; + // Debug: Prüfe, ob eine Route existiert + conn.prepare("check_route", QUERY_CHECK_ROUTE)?; let route_rows = conn.execute( "check_route", &[&source_region, &target_region], @@ -1494,6 +1110,55 @@ impl DirectorWorker { Ok(result) } + // Helper: load worth_percent values for a product across all regions of a user's branches + fn get_worth_by_region(conn: &mut DbConnection, falukant_user_id: i32, product_id: i32) -> Result, DbError> { + conn.prepare("get_region_worth_for_product", QUERY_GET_REGION_WORTH_FOR_PRODUCT)?; + let rows = conn.execute("get_region_worth_for_product", &[&falukant_user_id, &product_id])?; + let mut map = HashMap::new(); + for row in rows { + if let Some(rid) = row.get("region_id").and_then(|v| v.parse::().ok()) { + let percent = row.get("worth_percent").and_then(|v| v.parse::().ok()).unwrap_or(100.0); + map.insert(rid, percent); + } + } + Ok(map) + } + + // Helper: compute piece price for an arbitrary worth_percent + fn compute_piece_price_for_percent(item: &InventoryItem, percent: f64) -> f64 { + let base_price = item.sell_cost * (percent / 100.0); + let min_price = base_price * 0.6; + let max_price = base_price; + let knowledge_factor = item.quality as f64; + min_price + (max_price - min_price) * (knowledge_factor / 100.0) + } + + fn calc_max_capacity(vehicles: &[TransportVehicle]) -> i32 { + vehicles.iter().fold(0i32, |acc, v| acc.saturating_add(v.capacity)) + } + + fn calc_transport_cost(remote_piece_price: f64, qty: i32) -> f64 { + let total_value = remote_piece_price * qty as f64; + let transport_cost = total_value * 0.01_f64; + if transport_cost < 0.01 { 0.01 } else { transport_cost } + } + + fn insert_transports_for_route(conn: &mut DbConnection, item: &InventoryItem, target_region: i32, desired: i32) -> Result { + let vehicles = Self::get_transport_vehicles_for_route(conn, item.user_id, item.region_id, target_region)?; + if vehicles.is_empty() { return Ok(0); } + + conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?; + let mut remaining = desired; + for v in &vehicles { + if remaining <= 0 { break; } + let size = std::cmp::min(remaining, v.capacity); + if size <= 0 { continue; } + conn.execute("insert_transport", &[&item.region_id, &target_region, &item.product_id, &size, &v.id])?; + remaining -= size; + } + Ok(desired - remaining.max(0)) + } + /// Plant leere Transporte, um Fahrzeuge zurückzuholen, wenn: /// - Keine Transportmittel im aktuellen Branch vorhanden sind /// - Aber bessere Verkaufspreise in anderen Branches existieren @@ -1504,14 +1169,8 @@ impl DirectorWorker { falukant_user_id: i32, current_branch_id: i32, ) -> Result<(), DbError> { - // Aktuelle Branch-Region ermitteln - const QUERY_GET_BRANCH_REGION: &str = r#" - SELECT region_id - FROM falukant_data.branch - WHERE id = $1; - "#; - - conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?; + // Aktuelle Branch-Region ermitteln + conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?; let branch_rows = conn.execute("get_branch_region", &[¤t_branch_id])?; let current_region_id = match branch_rows.into_iter().next() { @@ -1585,14 +1244,6 @@ impl DirectorWorker { // Berechne Preisvorteil (vereinfacht: verwende worth_percent-Differenz) // Hole worth_percent für beide Regionen (für ein beliebiges Produkt) let mut price_delta = 0.0; - const QUERY_GET_AVERAGE_WORTH: &str = r#" - SELECT - AVG(CASE WHEN region_id = $1 THEN worth_percent ELSE NULL END) AS current_worth, - AVG(CASE WHEN region_id = $2 THEN worth_percent ELSE NULL END) AS target_worth - FROM falukant_data.town_product_worth - WHERE region_id IN ($1, $2); - "#; - conn.prepare("get_average_worth", QUERY_GET_AVERAGE_WORTH)?; let worth_rows = conn.execute( "get_average_worth", @@ -1667,12 +1318,6 @@ impl DirectorWorker { inventory_id: i32, new_quantity: i32, ) -> Result<(), DbError> { - const QUERY_UPDATE_INVENTORY_QTY: &str = r#" - UPDATE falukant_data.inventory - SET quantity = $2 - WHERE id = $1; - "#; - conn.prepare("update_inventory_qty", QUERY_UPDATE_INVENTORY_QTY)?; conn.execute("update_inventory_qty", &[&inventory_id, &new_quantity])?; diff --git a/src/worker/events.rs b/src/worker/events.rs index e576e3e..f886d82 100644 --- a/src/worker/events.rs +++ b/src/worker/events.rs @@ -8,6 +8,15 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; +use crate::worker::sql::{ + QUERY_GET_RANDOM_USER, + QUERY_GET_RANDOM_INFANT, + QUERY_GET_RANDOM_CITY, + QUERY_GET_AFFECTED_USERS, + QUERY_INSERT_NOTIFICATION, + QUERY_GET_MONEY, + QUERY_UPDATE_MONEY, +}; /// Typisierung von Ereignissen #[derive(Debug, Clone, Copy, PartialEq)] @@ -420,15 +429,7 @@ impl EventsWorker { return Self::trigger_sudden_infant_death(pool, broker, event, rng); } - // Hole einen zufälligen aktiven Spieler - const QUERY_GET_RANDOM_USER: &str = r#" - SELECT id - FROM falukant_data.falukant_user - ORDER BY RANDOM() - LIMIT 1; - "#; - - conn.prepare("get_random_user", QUERY_GET_RANDOM_USER)?; + conn.prepare("get_random_user", QUERY_GET_RANDOM_USER)?; let rows = conn.execute("get_random_user", &[])?; let user_id: Option = rows @@ -604,20 +605,7 @@ impl EventsWorker { // Finde ein zufälliges Kind unter 2 Jahren // Maximalalter: 730 Tage (2 Jahre) - festgelegt in der WHERE-Klausel unten - const QUERY_GET_RANDOM_INFANT: &str = r#" - SELECT - c.id AS character_id, - c.user_id, - CURRENT_DATE - c.birthdate::date AS age_days - FROM falukant_data."character" c - WHERE c.user_id IS NOT NULL - AND c.health > 0 - AND CURRENT_DATE - c.birthdate::date <= 730 -- Maximalalter: 2 Jahre (730 Tage) - ORDER BY RANDOM() - LIMIT 1; - "#; - - conn.prepare("get_random_infant", QUERY_GET_RANDOM_INFANT)?; + conn.prepare("get_random_infant", QUERY_GET_RANDOM_INFANT)?; let rows = conn.execute("get_random_infant", &[])?; let character_id: Option = rows @@ -712,16 +700,7 @@ impl EventsWorker { .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; // Hole eine zufällige Stadt-Region - const QUERY_GET_RANDOM_CITY: &str = r#" - SELECT r.id AS region_id - FROM falukant_data.region r - JOIN falukant_type.region tr ON r.region_type_id = tr.id - WHERE tr.label_tr = 'city' - ORDER BY RANDOM() - LIMIT 1; - "#; - - conn.prepare("get_random_city", QUERY_GET_RANDOM_CITY)?; + conn.prepare("get_random_city", QUERY_GET_RANDOM_CITY)?; let rows = conn.execute("get_random_city", &[])?; let region_id: Option = rows @@ -913,14 +892,7 @@ impl EventsWorker { } // Finde alle betroffenen User in dieser Region (User mit Branches) - const QUERY_GET_AFFECTED_USERS: &str = r#" - SELECT DISTINCT b.falukant_user_id AS user_id - FROM falukant_data.branch b - WHERE b.region_id = $1 - AND b.falukant_user_id IS NOT NULL; - "#; - - conn.prepare("get_affected_users", QUERY_GET_AFFECTED_USERS)?; + conn.prepare("get_affected_users", QUERY_GET_AFFECTED_USERS)?; let user_rows = conn.execute("get_affected_users", &[®ion_id])?; // Sende Benachrichtigung an jeden betroffenen User einzeln @@ -976,13 +948,7 @@ impl EventsWorker { percent_change: f64, ) -> Result { // Hole aktuelles Geld - const QUERY_GET_MONEY: &str = r#" - SELECT money - FROM falukant_data.falukant_user - WHERE id = $1; - "#; - - conn.prepare("get_money", QUERY_GET_MONEY)?; + conn.prepare("get_money", QUERY_GET_MONEY)?; let rows = conn.execute("get_money", &[&user_id])?; let current_money: Option = rows @@ -1002,12 +968,8 @@ impl EventsWorker { let action = format!("Zufallsereignis: Geldänderung {:.2}%", percent_change); // Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection - const QUERY_UPDATE_MONEY: &str = r#" - SELECT falukant_data.update_money($1, $2, $3); - "#; - - conn.prepare("update_money_event", QUERY_UPDATE_MONEY)?; - let _ = conn.execute("update_money_event", &[&user_id, &change, &action])?; + conn.prepare("update_money_event", QUERY_UPDATE_MONEY)?; + let _ = conn.execute("update_money_event", &[&user_id, &change, &action])?; // Best-effort money_history insert for UI/history visibility. let money_str = format!("{:.2}", change); @@ -1811,17 +1773,7 @@ impl EventsWorker { .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; - const QUERY_INSERT_NOTIFICATION: &str = r#" - INSERT INTO falukant_log.notification ( - user_id, - tr, - shown, - created_at, - updated_at - ) VALUES ($1, $2, FALSE, NOW(), NOW()); - "#; - - conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?; + conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?; conn.execute("insert_notification", &[&user_id, &event_type])?; // falukantUpdateStatus diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 004bb6c..317d6ae 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -11,6 +11,7 @@ mod user_character; mod transport; mod weather; mod events; +mod sql; pub use base::Worker; pub use crate::db::ConnectionPool; diff --git a/src/worker/sql.rs b/src/worker/sql.rs new file mode 100644 index 0000000..9a3a3c4 --- /dev/null +++ b/src/worker/sql.rs @@ -0,0 +1,179 @@ +// Centralized SQL strings for workers. + +pub const QUERY_UPDATE_MONEY: &str = r#" +SELECT falukant_data.update_money($1, $2, $3); +"#; + +pub const QUERY_GET_MONEY: &str = r#" +SELECT money FROM falukant_data.falukant_user WHERE id = $1; +"#; + +pub const QUERY_GET_RANDOM_USER: &str = r#" +SELECT id FROM falukant_data.falukant_user ORDER BY RANDOM() LIMIT 1; +"#; + +pub const QUERY_GET_RANDOM_INFANT: &str = r#" +SELECT c.id AS character_id, c.user_id, CURRENT_DATE - c.birthdate::date AS age_days +FROM falukant_data."character" c +WHERE c.user_id IS NOT NULL AND c.health > 0 AND CURRENT_DATE - c.birthdate::date <= 730 +ORDER BY RANDOM() LIMIT 1; +"#; + +pub const QUERY_GET_RANDOM_CITY: &str = r#" +SELECT r.id AS region_id FROM falukant_data.region r JOIN falukant_type.region tr ON r.region_type_id = tr.id WHERE tr.label_tr = 'city' ORDER BY RANDOM() LIMIT 1; +"#; + +pub const QUERY_GET_AFFECTED_USERS: &str = r#" +SELECT DISTINCT b.falukant_user_id AS user_id FROM falukant_data.branch b WHERE b.region_id = $1 AND b.falukant_user_id IS NOT NULL; +"#; + +pub const QUERY_UPDATE_WEATHER: &str = r#" +WITH all_regions AS ( + SELECT DISTINCT r.id AS region_id FROM falukant_data.region r JOIN falukant_type.region tr ON r.region_type_id = tr.id WHERE tr.label_tr = 'city' +) +INSERT INTO falukant_data.weather (region_id, weather_type_id) +SELECT ar.region_id, (SELECT wt.id FROM falukant_type.weather wt ORDER BY random() + ar.region_id * 0 LIMIT 1) FROM all_regions ar +ON CONFLICT (region_id) DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id; +"#; + +pub const QUERY_INSERT_NOTIFICATION: &str = r#" +INSERT INTO falukant_log.notification (user_id, tr, shown, created_at, updated_at) VALUES ($1, $2, FALSE, NOW(), NOW()); +"#; + +pub const QUERY_GET_DIRECTORS: &str = r#" +SELECT d.may_produce, d.may_sell, d.may_start_transport, b.id AS branch_id, fu.id AS falukantUserId, d.id +FROM falukant_data.director d +JOIN falukant_data.falukant_user fu ON fu.id = d.employer_user_id +JOIN falukant_data.character c ON c.id = d.director_character_id +JOIN falukant_data.branch b ON b.region_id = c.region_id AND b.falukant_user_id = fu.id +WHERE current_time BETWEEN '08:00:00' AND '17:00:00'; +"#; + +pub const QUERY_GET_BEST_PRODUCTION: &str = r#" +SELECT fdu.id falukant_user_id, CAST(fdu.money AS text) AS money, fdu.certificate, ftp.id product_id, ftp.label_tr, fdb.region_id, +(SELECT SUM(quantity) FROM falukant_data.stock fds WHERE fds.branch_id = fdb.id) AS stock_size, +COALESCE((SELECT SUM(COALESCE(fdi.quantity, 0)) FROM falukant_data.stock fds JOIN falukant_data.inventory fdi ON fdi.stock_id = fds.id WHERE fds.branch_id = fdb.id), 0) AS used_in_stock, +(ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category) / (300.0 * ftp.production_time) AS worth, +fdb.id AS branch_id, (SELECT COUNT(id) FROM falukant_data.production WHERE branch_id = fdb.id) AS running_productions, +COALESCE((SELECT SUM(COALESCE(fdp.quantity, 0)) quantity FROM falukant_data.production fdp WHERE fdp.branch_id = fdb.id), 0) AS running_productions_quantity +FROM falukant_data.director fdd +JOIN falukant_data.character fdc ON fdc.id = fdd.director_character_id +JOIN falukant_data.falukant_user fdu ON fdd.employer_user_id = fdu.id +JOIN falukant_data.character user_character ON user_character.user_id = fdu.id +JOIN falukant_data.branch fdb ON fdb.falukant_user_id = fdu.id AND fdb.region_id = fdc.region_id +JOIN falukant_data.town_product_worth fdtpw ON fdtpw.region_id = fdb.region_id +JOIN falukant_data.knowledge fdk_character ON fdk_character.product_id = fdtpw.product_id AND fdk_character.character_id = user_character.id +JOIN falukant_data.knowledge fdk_director ON fdk_director.product_id = fdtpw.product_id AND fdk_director.character_id = fdd.director_character_id +JOIN falukant_type.product ftp ON ftp.id = fdtpw.product_id AND ftp.category <= fdu.certificate +WHERE fdd.id = $1 AND fdb.id = $2 ORDER BY worth DESC LIMIT 1; +"#; + +pub const QUERY_INSERT_PRODUCTION: &str = r#" +INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id) VALUES ($1, $2, $3, (SELECT weather_type_id FROM falukant_data.weather WHERE region_id = $4)); +"#; + +pub const QUERY_GET_BRANCH_CAPACITY: &str = r#" +SELECT (SELECT SUM(quantity) FROM falukant_data.stock fds WHERE fds.branch_id = $1) AS stock_size, +COALESCE((SELECT SUM(COALESCE(fdi.quantity, 0)) FROM falukant_data.stock fds JOIN falukant_data.inventory fdi ON fdi.stock_id = fds.id WHERE fds.branch_id = $1), 0) AS used_in_stock, +(SELECT COUNT(id) FROM falukant_data.production WHERE branch_id = $1) AS running_productions, +COALESCE((SELECT SUM(COALESCE(fdp.quantity, 0)) quantity FROM falukant_data.production fdp WHERE fdp.branch_id = $1), 0) AS running_productions_quantity; +"#; + +pub const QUERY_GET_INVENTORY: &str = r#" +SELECT i.id, i.product_id, i.quantity, i.quality, p.sell_cost, fu.id AS user_id, b.region_id, b.id AS branch_id, COALESCE(tpw.worth_percent, 100.0) AS worth_percent +FROM falukant_data.inventory i +JOIN falukant_data.stock s ON s.id = i.stock_id +JOIN falukant_data.branch b ON b.id = s.branch_id +JOIN falukant_data.falukant_user fu ON fu.id = b.falukant_user_id +JOIN falukant_data.director d ON d.employer_user_id = fu.id +JOIN falukant_type.product p ON p.id = i.product_id +LEFT JOIN falukant_data.town_product_worth tpw ON tpw.region_id = b.region_id AND tpw.product_id = i.product_id +WHERE d.id = $1 AND b.id = $2; +"#; + +pub const QUERY_REMOVE_INVENTORY: &str = r#" +DELETE FROM falukant_data.inventory WHERE id = $1; +"#; + +pub const QUERY_ADD_SELL_LOG: &str = r#" +INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id) VALUES ($1, $2, $3, $4) +ON CONFLICT (region_id, product_id, seller_id) DO UPDATE SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity; +"#; + +pub const QUERY_GET_REGION_WORTH_FOR_PRODUCT: &str = r#" +SELECT tpw.region_id, tpw.product_id, tpw.worth_percent FROM falukant_data.town_product_worth tpw JOIN falukant_data.branch b ON b.region_id = tpw.region_id WHERE b.falukant_user_id = $1 AND tpw.product_id = $2; +"#; + +pub const QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE: &str = r#" +SELECT v.id AS vehicle_id, vt.capacity AS capacity +FROM falukant_data.vehicle v +JOIN falukant_type.vehicle vt ON vt.id = v.vehicle_type_id +JOIN falukant_data.region_distance rd ON ((rd.source_region_id = v.region_id AND rd.target_region_id = $3) OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id)) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL) +WHERE v.falukant_user_id = $1 AND v.region_id = $2; +"#; + +pub const QUERY_INSERT_TRANSPORT: &str = r#" +INSERT INTO falukant_data.transport (source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, NOW(), NOW()); +"#; + +pub const QUERY_INSERT_EMPTY_TRANSPORT: &str = r#" +INSERT INTO falukant_data.transport (source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at) VALUES ($1, $2, NULL, 0, $3, NOW(), NOW()); +"#; + +pub const QUERY_GET_USER_BRANCHES: &str = r#" +SELECT DISTINCT b.region_id, b.id AS branch_id FROM falukant_data.branch b WHERE b.falukant_user_id = $1 AND b.region_id != $2; +"#; + +pub const QUERY_GET_FREE_VEHICLES_IN_REGION: &str = r#" +SELECT v.id AS vehicle_id, vt.capacity AS capacity FROM falukant_data.vehicle v JOIN falukant_type.vehicle vt ON vt.id = v.vehicle_type_id WHERE v.falukant_user_id = $1 AND v.region_id = $2 AND v.id NOT IN (SELECT DISTINCT t.vehicle_id FROM falukant_data.transport t WHERE t.vehicle_id IS NOT NULL); +"#; + +pub const QUERY_GET_SALARY_TO_PAY: &str = r#" +SELECT d.id, d.employer_user_id, d.income FROM falukant_data.director d WHERE DATE(d.last_salary_payout) < DATE(NOW()); +"#; + +pub const QUERY_SET_SALARY_PAYED: &str = r#" +UPDATE falukant_data.director SET last_salary_payout = NOW() WHERE id = $1; +"#; + +pub const QUERY_UPDATE_SATISFACTION: &str = r#" +WITH new_sats AS ( + SELECT d.id, ROUND(d.income::numeric / (c.title_of_nobility * POWER(1.231, AVG(k.knowledge) / 1.5)) * 100) AS new_satisfaction + FROM falukant_data.director d + JOIN falukant_data.knowledge k ON d.director_character_id = k.character_id + JOIN falukant_data.character c ON c.id = d.director_character_id + GROUP BY d.id, c.title_of_nobility, d.income +) +UPDATE falukant_data.director dir SET satisfaction = ns.new_satisfaction FROM new_sats ns WHERE dir.id = ns.id AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction RETURNING dir.employer_user_id; +"#; + +pub const QUERY_GET_DIRECTOR_USER: &str = r#" +SELECT fu.id AS falukant_user_id FROM falukant_data.director d JOIN falukant_data.falukant_user fu ON fu.id = d.employer_user_id WHERE d.id = $1 LIMIT 1; +"#; + +pub const QUERY_COUNT_VEHICLES_IN_BRANCH_REGION: &str = r#" +SELECT COUNT(v.id) AS cnt FROM falukant_data.vehicle v WHERE v.falukant_user_id = $1 AND v.region_id = $2; +"#; + +pub const QUERY_COUNT_VEHICLES_IN_REGION: &str = r#" +SELECT COUNT(v.id) AS cnt FROM falukant_data.vehicle v WHERE v.falukant_user_id = $1 AND v.region_id = $2; +"#; + +pub const QUERY_CHECK_ROUTE: &str = r#" +SELECT 1 FROM falukant_data.region_distance rd WHERE (rd.source_region_id = $1 AND rd.target_region_id = $2) OR (rd.source_region_id = $2 AND rd.target_region_id = $1) LIMIT 1; +"#; + +pub const QUERY_GET_BRANCH_REGION: &str = r#" +SELECT region_id FROM falukant_data.branch WHERE id = $1 LIMIT 1; +"#; + +pub const QUERY_GET_AVERAGE_WORTH: &str = r#" +SELECT AVG(tpw.worth_percent) AS avg_worth FROM falukant_data.town_product_worth tpw WHERE tpw.product_id = $1 AND tpw.region_id IN (SELECT region_id FROM falukant_data.branch WHERE falukant_user_id = $2); +"#; + +pub const QUERY_UPDATE_INVENTORY_QTY: &str = r#" +UPDATE falukant_data.inventory SET quantity = $1 WHERE id = $2; +"#; + + + diff --git a/src/worker/underground.rs b/src/worker/underground.rs index 31df821..dda8e04 100644 --- a/src/worker/underground.rs +++ b/src/worker/underground.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use std::time::Duration; use super::base::{BaseWorker, Worker, WorkerState}; +use crate::worker::sql::QUERY_UPDATE_MONEY; pub struct UndergroundWorker { base: BaseWorker, @@ -118,10 +119,7 @@ const Q_SELECT_FALUKANT_USER: &str = r#" LIMIT 1; "#; -// Query für Geldänderungen (lokale Variante von BaseWorker::change_falukant_user_money) -const QUERY_UPDATE_MONEY: &str = r#" - SELECT falukant_data.update_money($1, $2, $3); -"#; +// Use centralized QUERY_UPDATE_MONEY from src/worker/sql.rs impl UndergroundWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { diff --git a/src/worker/weather.rs b/src/worker/weather.rs index b0313e7..5dbe9db 100644 --- a/src/worker/weather.rs +++ b/src/worker/weather.rs @@ -5,39 +5,13 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; +use crate::worker::sql::QUERY_UPDATE_WEATHER; pub struct WeatherWorker { base: BaseWorker, } -// Query zum Aktualisieren des Wetters für alle Regionen -// Wählt für jede Region ein zufälliges Wetter aus allen verfügbaren Wettertypen aus -// Wichtig: Jede Region bekommt ein individuelles, zufälliges Wetter. Die vorherige -// Variante konnte vom Planner unter Umständen die Zufalls-Subquery nur einmal -// auswerten; mit LATERAL wird die Zufallsauswahl pro Region garantiert ausgeführt. -const QUERY_UPDATE_WEATHER: &str = r#" - WITH all_regions AS ( - SELECT DISTINCT r.id AS region_id - FROM falukant_data.region r - JOIN falukant_type.region tr ON r.region_type_id = tr.id - WHERE tr.label_tr = 'city' - ), - random_weather AS ( - SELECT ar.region_id, wt.id AS weather_type_id - FROM all_regions ar - CROSS JOIN LATERAL ( - SELECT wt.id - FROM falukant_type.weather wt - ORDER BY RANDOM() - LIMIT 1 - ) wt - ) - INSERT INTO falukant_data.weather (region_id, weather_type_id) - SELECT rw.region_id, rw.weather_type_id - FROM random_weather rw - ON CONFLICT (region_id) - DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id; -"#; +// Reuse QUERY_UPDATE_WEATHER from centralized SQL module impl WeatherWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { @@ -89,17 +63,17 @@ impl WeatherWorker { .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + // Run the prepared SQL that uses per-row RANDOM() trick conn.prepare("update_weather", QUERY_UPDATE_WEATHER)?; let updated_rows = conn.execute("update_weather", &[])?; eprintln!( - "[WeatherWorker] Wetter aktualisiert. {} Regionen betroffen.", + "[WeatherWorker] Wetter aktualisiert (per-row random). {} Regionen betroffen.", updated_rows.len() ); // Benachrichtige alle Clients über Wetteränderungen - let message = r#"{"event":"weather_updated"}"#; - broker.publish(message.to_string()); + broker.publish("{\"event\":\"weather_updated\"}".to_string()); Ok(()) }