Refactor SQL queries into centralized module

- Moved various SQL query strings from individual worker files into a new `sql.rs` module for better organization and reusability.
- Updated `events.rs`, `underground.rs`, and `weather.rs` to use the centralized SQL queries.
- Removed redundant query definitions from `events.rs`, `underground.rs`, and `weather.rs`.
This commit is contained in:
Torsten Schulz (local)
2025-12-12 15:18:30 +01:00
parent eab46f5cdc
commit a9d490ce38
7 changed files with 401 additions and 667 deletions

View File

@@ -1,4 +1,5 @@
use crate::db::{ConnectionPool, DbError};
use crate::worker::sql::{QUERY_UPDATE_MONEY, QUERY_GET_MONEY};
use crate::message_broker::MessageBroker;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
@@ -147,11 +148,7 @@ impl BaseWorker {
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection
const QUERY_UPDATE_MONEY: &str = r#"
SELECT falukant_data.update_money($1, $2, $3);
"#;
conn.prepare("update_money", QUERY_UPDATE_MONEY)?;
conn.prepare("update_money", QUERY_UPDATE_MONEY)?;
// Validate float to avoid passing NaN/inf which the postgres client
// may fail to serialize with an unclear error message.
@@ -165,9 +162,6 @@ impl BaseWorker {
// We must ensure the resulting money fits in numeric(10,2).
// numeric(10,2) max absolute value is < 10^8 (100_000_000) before rounding.
// Fetch current money for the user and clamp the delta if needed.
const QUERY_GET_MONEY: &str = r#"
SELECT money FROM falukant_data.falukant_user WHERE id = $1;
"#;
conn.prepare("get_money_for_clamp", QUERY_GET_MONEY)?;
let rows = conn.execute("get_money_for_clamp", &[&falukant_user_id])?;
@@ -178,13 +172,11 @@ impl BaseWorker {
.unwrap_or(0.0);
// compute tentative result
let tentative = current_money + money_change;
let tentative = current_money + money_change;
// numeric(10,2) allows values with absolute < 10^8 (100_000_000)
const MAX_ABS: f64 = 100_000_000.0 - 0.01; // leave room for scale
let _allowed = MAX_ABS - current_money;
let adjusted_money_change = if tentative >= MAX_ABS {
let clipped = MAX_ABS - current_money;
eprintln!(
@@ -203,19 +195,12 @@ impl BaseWorker {
money_change
};
// Keep only important clamp logging: when clipping occurs we log it above.
// Send exact types matching the DB function signature:
let uid_i32: i32 = falukant_user_id;
let money_str = format!("{:.2}", adjusted_money_change);
// Note: we intentionally avoid parameterized call due to serialization
// issues in this environment and instead execute a literal SQL below.
// Execute update; avoid noisy logging here.
// Use a literal SQL call because parameterized execution keeps failing
// with "error serializing parameter 1" in this environment.
// Note: we intentionally avoid parameterized call due to serialization
// issues in this environment and instead execute a literal SQL below.
fn escape_sql_literal(s: &str) -> String {
s.replace('\'', "''")
}

View File

@@ -7,6 +7,31 @@ use std::time::{Duration, Instant};
use crate::db::ConnectionPool;
use super::base::{BaseWorker, Worker, WorkerState, DEFAULT_TAX_PERCENT, DEFAULT_TREASURY_USER_ID};
use crate::worker::sql::{
QUERY_GET_DIRECTORS,
QUERY_GET_BEST_PRODUCTION,
QUERY_INSERT_PRODUCTION,
QUERY_GET_BRANCH_CAPACITY,
QUERY_GET_INVENTORY,
QUERY_REMOVE_INVENTORY,
QUERY_ADD_SELL_LOG,
QUERY_GET_REGION_WORTH_FOR_PRODUCT,
QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE,
QUERY_INSERT_TRANSPORT,
QUERY_INSERT_EMPTY_TRANSPORT,
QUERY_GET_USER_BRANCHES,
QUERY_GET_FREE_VEHICLES_IN_REGION,
QUERY_GET_SALARY_TO_PAY,
QUERY_SET_SALARY_PAYED,
QUERY_UPDATE_SATISFACTION,
QUERY_GET_DIRECTOR_USER,
QUERY_COUNT_VEHICLES_IN_BRANCH_REGION,
QUERY_COUNT_VEHICLES_IN_REGION,
QUERY_CHECK_ROUTE,
QUERY_GET_BRANCH_REGION,
QUERY_GET_AVERAGE_WORTH,
QUERY_UPDATE_INVENTORY_QTY,
};
#[derive(Debug, Clone)]
struct Director {
@@ -65,279 +90,10 @@ pub struct DirectorWorker {
// Maximale Anzahl paralleler Produktionen pro Branch
const MAX_PARALLEL_PRODUCTIONS: i32 = 2;
// SQL-Queries (1:1 aus director_worker.h)
const QUERY_GET_DIRECTORS: &str = r#"
SELECT
d.may_produce,
d.may_sell,
d.may_start_transport,
b.id AS branch_id,
fu.id AS falukantUserId,
d.id
FROM falukant_data.director d
JOIN falukant_data.falukant_user fu
ON fu.id = d.employer_user_id
JOIN falukant_data.character c
ON c.id = d.director_character_id
JOIN falukant_data.branch b
ON b.region_id = c.region_id
AND b.falukant_user_id = fu.id
WHERE current_time BETWEEN '08:00:00' AND '17:00:00';
"#;
const QUERY_GET_BEST_PRODUCTION: &str = r#"
SELECT
fdu.id falukant_user_id,
-- Geld explizit als Text casten, damit das Mapping im Rust-Code
-- zuverlässig funktioniert (unabhängig vom nativen DB-Typ wie `money`).
CAST(fdu.money AS text) AS money,
fdu.certificate,
ftp.id product_id,
ftp.label_tr,
fdb.region_id,
(
SELECT SUM(quantity)
FROM falukant_data.stock fds
WHERE fds.branch_id = fdb.id
) AS stock_size,
COALESCE((
SELECT SUM(COALESCE(fdi.quantity, 0))
FROM falukant_data.stock fds
JOIN falukant_data.inventory fdi
ON fdi.stock_id = fds.id
WHERE fds.branch_id = fdb.id
), 0) AS used_in_stock,
(ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category)
/ (300.0 * ftp.production_time) AS worth,
fdb.id AS branch_id,
(
SELECT COUNT(id)
FROM falukant_data.production
WHERE branch_id = fdb.id
) AS running_productions,
COALESCE((
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
FROM falukant_data.production fdp
WHERE fdp.branch_id = fdb.id
), 0) AS running_productions_quantity
FROM falukant_data.director fdd
JOIN falukant_data.character fdc
ON fdc.id = fdd.director_character_id
JOIN falukant_data.falukant_user fdu
ON fdd.employer_user_id = fdu.id
JOIN falukant_data.character user_character
ON user_character.user_id = fdu.id
JOIN falukant_data.branch fdb
ON fdb.falukant_user_id = fdu.id
AND fdb.region_id = fdc.region_id
JOIN falukant_data.town_product_worth fdtpw
ON fdtpw.region_id = fdb.region_id
JOIN falukant_data.knowledge fdk_character
ON fdk_character.product_id = fdtpw.product_id
AND fdk_character.character_id = user_character.id
JOIN falukant_data.knowledge fdk_director
ON fdk_director.product_id = fdtpw.product_id
AND fdk_director.character_id = fdd.director_character_id
JOIN falukant_type.product ftp
ON ftp.id = fdtpw.product_id
AND ftp.category <= fdu.certificate
WHERE fdd.id = $1
AND fdb.id = $2
ORDER BY worth DESC
LIMIT 1;
"#;
const QUERY_INSERT_PRODUCTION: &str = r#"
INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id)
VALUES ($1, $2, $3, (
SELECT weather_type_id
FROM falukant_data.weather
WHERE region_id = $4
));
"#;
// Query zum Abfragen der aktuellen Lager- und Produktionswerte für einen Branch
// (ohne den kompletten Produktionsplan neu zu berechnen)
const QUERY_GET_BRANCH_CAPACITY: &str = r#"
SELECT
(
SELECT SUM(quantity)
FROM falukant_data.stock fds
WHERE fds.branch_id = $1
) AS stock_size,
COALESCE((
SELECT SUM(COALESCE(fdi.quantity, 0))
FROM falukant_data.stock fds
JOIN falukant_data.inventory fdi
ON fdi.stock_id = fds.id
WHERE fds.branch_id = $1
), 0) AS used_in_stock,
(
SELECT COUNT(id)
FROM falukant_data.production
WHERE branch_id = $1
) AS running_productions,
COALESCE((
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
FROM falukant_data.production fdp
WHERE fdp.branch_id = $1
), 0) AS running_productions_quantity;
"#;
const QUERY_GET_INVENTORY: &str = r#"
SELECT
i.id,
i.product_id,
i.quantity,
i.quality,
p.sell_cost,
fu.id AS user_id,
b.region_id,
b.id AS branch_id,
COALESCE(tpw.worth_percent, 100.0) AS worth_percent
FROM falukant_data.inventory i
JOIN falukant_data.stock s
ON s.id = i.stock_id
JOIN falukant_data.branch b
ON b.id = s.branch_id
JOIN falukant_data.falukant_user fu
ON fu.id = b.falukant_user_id
JOIN falukant_data.director d
ON d.employer_user_id = fu.id
JOIN falukant_type.product p
ON p.id = i.product_id
LEFT JOIN falukant_data.town_product_worth tpw
ON tpw.region_id = b.region_id
AND tpw.product_id = i.product_id
WHERE d.id = $1
AND b.id = $2;
"#;
const QUERY_REMOVE_INVENTORY: &str = r#"
DELETE FROM falukant_data.inventory
WHERE id = $1;
"#;
const QUERY_ADD_SELL_LOG: &str = r#"
INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (region_id, product_id, seller_id)
DO UPDATE
SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity;
"#;
// Regionale Verkaufswürdigkeit pro Produkt/Region für alle Branches eines Users
const QUERY_GET_REGION_WORTH_FOR_PRODUCT: &str = r#"
SELECT
tpw.region_id,
tpw.product_id,
tpw.worth_percent
FROM falukant_data.town_product_worth tpw
JOIN falukant_data.branch b
ON b.region_id = tpw.region_id
WHERE b.falukant_user_id = $1
AND tpw.product_id = $2;
"#;
// ...existing code...
// Verfügbare Transportmittel für eine Route (source_region -> target_region)
const QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE: &str = r#"
SELECT
v.id AS vehicle_id,
vt.capacity AS capacity
FROM falukant_data.vehicle v
JOIN falukant_type.vehicle vt
ON vt.id = v.vehicle_type_id
JOIN falukant_data.region_distance rd
ON (
(rd.source_region_id = v.region_id AND rd.target_region_id = $3)
OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id)
)
AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
WHERE v.falukant_user_id = $1
AND v.region_id = $2;
"#;
// Transport-Eintrag anlegen
const QUERY_INSERT_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport
(source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, NOW(), NOW());
"#;
// Leere Transporte (product_id = NULL, size = 0) zum Zurückholen von Fahrzeugen
const QUERY_INSERT_EMPTY_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport
(source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at)
VALUES ($1, $2, NULL, 0, $3, NOW(), NOW());
"#;
// Alle Branches des Users mit ihren Regionen
const QUERY_GET_USER_BRANCHES: &str = r#"
SELECT DISTINCT
b.region_id,
b.id AS branch_id
FROM falukant_data.branch b
WHERE b.falukant_user_id = $1
AND b.region_id != $2;
"#;
// Freie Transportmittel in einer Region (nicht in aktiven Transporten)
// Ein Transport ist aktiv, wenn er noch in der Tabelle existiert
const QUERY_GET_FREE_VEHICLES_IN_REGION: &str = r#"
SELECT
v.id AS vehicle_id,
vt.capacity AS capacity
FROM falukant_data.vehicle v
JOIN falukant_type.vehicle vt
ON vt.id = v.vehicle_type_id
WHERE v.falukant_user_id = $1
AND v.region_id = $2
AND v.id NOT IN (
SELECT DISTINCT t.vehicle_id
FROM falukant_data.transport t
WHERE t.vehicle_id IS NOT NULL
);
"#;
const QUERY_GET_SALARY_TO_PAY: &str = r#"
SELECT d.id, d.employer_user_id, d.income
FROM falukant_data.director d
WHERE DATE(d.last_salary_payout) < DATE(NOW());
"#;
const QUERY_SET_SALARY_PAYED: &str = r#"
UPDATE falukant_data.director
SET last_salary_payout = NOW()
WHERE id = $1;
"#;
const QUERY_UPDATE_SATISFACTION: &str = r#"
WITH new_sats AS (
SELECT
d.id,
ROUND(
d.income::numeric
/
(
c.title_of_nobility
* POWER(1.231, AVG(k.knowledge) / 1.5)
)
* 100
) AS new_satisfaction
FROM falukant_data.director d
JOIN falukant_data.knowledge k
ON d.director_character_id = k.character_id
JOIN falukant_data.character c
ON c.id = d.director_character_id
GROUP BY d.id, c.title_of_nobility, d.income
)
UPDATE falukant_data.director dir
SET satisfaction = ns.new_satisfaction
FROM new_sats ns
WHERE dir.id = ns.id
AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction
RETURNING dir.employer_user_id;
"#;
// ...existing code...
impl DirectorWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -614,41 +370,11 @@ impl DirectorWorker {
conn: &mut DbConnection,
plan: &ProductionPlan,
) -> Result<(), DbError> {
// Freie Lagerkapazität: Gesamtbestand minus bereits belegter Bestand
// (Inventar) minus bereits eingeplante Produktionsmengen.
let free_capacity =
plan.stock_size - plan.used_in_stock - plan.running_productions_quantity;
let free_capacity = Self::calc_free_capacity(plan);
let one_piece_cost = Self::calc_one_piece_cost(plan);
let max_money_production = Self::calc_max_money_production(plan, one_piece_cost);
// Stückkosten monetär berechnen. Da money ein f64 ist, arbeiten wir hier ebenfalls
// mit Gleitkomma und runden erst am Ende auf eine ganze Stückzahl ab.
let one_piece_cost = (plan.certificate * 6) as f64;
let mut max_money_production: i32 = 0;
if one_piece_cost > 0.0 {
if plan.money > 0.0 {
// Anzahl Stück, die sich mit dem verfügbaren Geld finanzieren lassen
max_money_production = (plan.money / one_piece_cost).floor() as i32;
} else {
// Falls das Geld aus der DB unerwartet als 0 eingelesen wurde, aber
// eigentlich ausreichend Guthaben vorhanden ist (bekannter Migrationsfall),
// lassen wir die Geldbegrenzung vorläufig fallen und begrenzen nur über
// Lagerkapazität und Hard-Limit 100.
eprintln!(
"[DirectorWorker] Warnung: money=0 für falukant_user_id={}, \
verwende nur Lagerkapazität als Limit.",
plan.falukant_user_id
);
max_money_production = i32::MAX;
}
}
// Maximale Produktionsmenge begrenzen:
// - nie mehr als der freie Lagerplatz (`free_capacity`)
// - nie mehr als durch das verfügbare Geld finanzierbar
// - absolut maximal 100 Einheiten pro Produktion
let to_produce = free_capacity
.min(max_money_production)
.min(100)
.max(0);
let to_produce = free_capacity.min(max_money_production).min(100).max(0);
eprintln!(
"[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}",
@@ -711,6 +437,30 @@ impl DirectorWorker {
Ok(())
}
fn calc_free_capacity(plan: &ProductionPlan) -> i32 {
plan.stock_size - plan.used_in_stock - plan.running_productions_quantity
}
fn calc_one_piece_cost(plan: &ProductionPlan) -> f64 {
(plan.certificate * 6) as f64
}
fn calc_max_money_production(plan: &ProductionPlan, one_piece_cost: f64) -> i32 {
if one_piece_cost > 0.0 {
if plan.money > 0.0 {
(plan.money / one_piece_cost).floor() as i32
} else {
eprintln!(
"[DirectorWorker] Warnung: money=0 für falukant_user_id={}, verwende nur Lagerkapazität als Limit.",
plan.falukant_user_id
);
i32::MAX
}
} else {
0
}
}
fn start_transports_stub(&mut self, director: &Director) -> Result<(), DbError> {
self.base
.set_current_step("DirectorWorker: start_transports");
@@ -736,11 +486,6 @@ impl DirectorWorker {
// sein (Arbeitgeber des Directors).
let falukant_user_id = if items.is_empty() {
// Wenn keine Items vorhanden sind, müssen wir die user_id anders ermitteln
const QUERY_GET_DIRECTOR_USER: &str = r#"
SELECT employer_user_id
FROM falukant_data.director
WHERE id = $1;
"#;
conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?;
let user_rows = conn.execute("get_director_user", &[&director.id])?;
user_rows
@@ -754,20 +499,6 @@ impl DirectorWorker {
// Prüfe, ob Transportmittel im aktuellen Branch vorhanden sind
// Ein Transport ist aktiv, wenn er noch in der Tabelle existiert
const QUERY_COUNT_VEHICLES_IN_BRANCH_REGION: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.vehicle v
JOIN falukant_data.branch b
ON b.region_id = v.region_id
WHERE v.falukant_user_id = $1
AND b.id = $2
AND v.id NOT IN (
SELECT DISTINCT t.vehicle_id
FROM falukant_data.transport t
WHERE t.vehicle_id IS NOT NULL
);
"#;
conn.prepare("count_vehicles_in_branch", QUERY_COUNT_VEHICLES_IN_BRANCH_REGION)?;
let vehicle_count_rows = conn.execute(
"count_vehicles_in_branch",
@@ -976,45 +707,45 @@ impl DirectorWorker {
})
}
fn sell_single_inventory_item(
&mut self,
conn: &mut DbConnection,
item: &InventoryItem,
) -> Result<(), DbError> {
if item.quantity <= 0 {
conn.execute("remove_inventory", &[&item.id])?;
return Ok(());
}
// Neue Preisberechnung gemäß Spezifikation:
// 1. Basispreis = product.sellCost * (worthPercent / 100)
// Helper: compute piece sell price from item fields
fn compute_piece_sell_price(item: &InventoryItem) -> f64 {
let base_price = item.sell_cost * (item.worth_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let min_price = base_price * 0.6;
let max_price = base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
// knowledgeFactor ist hier item.quality
let knowledge_factor = item.quality as f64;
let piece_sell_price = min_price + (max_price - min_price) * (knowledge_factor / 100.0);
let sell_price = piece_sell_price * item.quantity as f64;
let min_price = base_price * 0.6;
let max_price = base_price;
let knowledge_factor = item.quality as f64;
min_price + (max_price - min_price) * (knowledge_factor / 100.0)
}
// Steuerberechnung: 1) Region ermitteln, 2) user offices, 3) cumulative tax (mit Befreiungen)
// Helper: get one_piece_cost from DB row fallback logic
fn resolve_one_piece_cost(conn: &mut DbConnection, product_id: i32, fallback: f64) -> Result<f64, DbError> {
conn.prepare("get_product_cost", "SELECT original_sell_cost, sell_cost FROM falukant_type.product WHERE id = $1")?;
let rows = conn.execute("get_product_cost", &[&product_id])?;
if let Some(row) = rows.get(0) {
if let Some(osc) = row.get("original_sell_cost") {
if let Ok(v) = osc.parse::<f64>() { return Ok(v); }
}
if let Some(sc) = row.get("sell_cost") {
if let Ok(v) = sc.parse::<f64>() { return Ok(v); }
}
}
Ok(fallback)
}
// Helper: determine cumulative tax percent for a branch/user
fn get_cumulative_tax_percent(conn: &mut DbConnection, branch_id: i32, user_id: i32) -> Result<f64, DbError> {
// Default
let mut cumulative_tax_percent = DEFAULT_TAX_PERCENT;
conn.prepare("get_branch_region", "SELECT region_id FROM falukant_data.branch WHERE id = $1;")?;
let branch_rows = conn.execute("get_branch_region", &[&item.branch_id])?;
let branch_rows = conn.execute("get_branch_region", &[&branch_id])?;
let branch_region_id: Option<i32> = branch_rows.get(0).and_then(|r| r.get("region_id")).and_then(|v| v.parse().ok());
if let Some(region_id) = branch_region_id {
// user offices
conn.prepare(
"get_user_offices",
"SELECT po.id AS office_id, pot.name AS office_name, po.region_id, rt.label_tr AS region_type FROM falukant_data.political_office po JOIN falukant_type.political_office_type pot ON pot.id = po.office_type_id JOIN falukant_data.region r ON r.id = po.region_id JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE po.holder_id = $1 AND (po.end_date IS NULL OR po.end_date > NOW());",
)?;
let offices = conn.execute("get_user_offices", &[&item.user_id])?;
let offices = conn.execute("get_user_offices", &[&user_id])?;
let mut exempt_types: Vec<String> = Vec::new();
let mut has_chancellor = false;
@@ -1032,52 +763,55 @@ impl DirectorWorker {
}
if has_chancellor {
cumulative_tax_percent = 0.0;
return Ok(0.0);
}
if exempt_types.is_empty() {
conn.prepare(
"cumulative_tax_no_exempt",
"WITH RECURSIVE ancestors AS (SELECT id, parent_id, COALESCE(tax_percent,0.0) AS tax_percent FROM falukant_data.region WHERE id = $1 UNION ALL SELECT r.id, r.parent_id, COALESCE(r.tax_percent,0.0) FROM falukant_data.region r JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;",
)?;
let res = conn.execute("cumulative_tax_no_exempt", &[&region_id])?;
if let Some(row) = res.get(0) {
if let Some(tp) = row.get("total_percent") {
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
}
}
} else {
if exempt_types.is_empty() {
conn.prepare(
"cumulative_tax_no_exempt",
"WITH RECURSIVE ancestors AS (SELECT id, parent_id, COALESCE(tax_percent,0.0) AS tax_percent FROM falukant_data.region WHERE id = $1 UNION ALL SELECT r.id, r.parent_id, COALESCE(r.tax_percent,0.0) FROM falukant_data.region r JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;",
)?;
let res = conn.execute("cumulative_tax_no_exempt", &[&region_id])?;
if let Some(row) = res.get(0) {
if let Some(tp) = row.get("total_percent") {
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
}
}
} else {
conn.prepare(
"cumulative_tax_with_exempt",
"WITH RECURSIVE ancestors AS (SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END AS tax_percent FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE r.id = $1 UNION ALL SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;",
)?;
let exempt_array: Vec<&str> = exempt_types.iter().map(|s| s.as_str()).collect();
let res = conn.execute("cumulative_tax_with_exempt", &[&region_id, &exempt_array])?;
if let Some(row) = res.get(0) {
if let Some(tp) = row.get("total_percent") {
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
}
conn.prepare(
"cumulative_tax_with_exempt",
"WITH RECURSIVE ancestors AS (SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END AS tax_percent FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE r.id = $1 UNION ALL SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;",
)?;
let exempt_array: Vec<&str> = exempt_types.iter().map(|s| s.as_str()).collect();
let res = conn.execute("cumulative_tax_with_exempt", &[&region_id, &exempt_array])?;
if let Some(row) = res.get(0) {
if let Some(tp) = row.get("total_percent") {
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
}
}
}
}
// Produktkosten (original_sell_cost fallback sell_cost)
conn.prepare("get_product_cost", "SELECT original_sell_cost, sell_cost FROM falukant_type.product WHERE id = $1")?;
let cost_rows = conn.execute("get_product_cost", &[&item.product_id])?;
let mut one_piece_cost = item.sell_cost;
if let Some(row) = cost_rows.get(0) {
if let Some(osc) = row.get("original_sell_cost") {
if let Ok(v) = osc.parse::<f64>() {
one_piece_cost = v;
}
} else if let Some(sc) = row.get("sell_cost") {
if let Ok(v) = sc.parse::<f64>() {
one_piece_cost = v;
}
}
Ok(cumulative_tax_percent)
}
fn sell_single_inventory_item(
&mut self,
conn: &mut DbConnection,
item: &InventoryItem,
) -> Result<(), DbError> {
if item.quantity <= 0 {
conn.execute("remove_inventory", &[&item.id])?;
return Ok(());
}
// cents-based arithmetic
// compute piece price and full sell price
let piece_price = Self::compute_piece_sell_price(item);
let sell_price = piece_price * item.quantity as f64;
let one_piece_cost = Self::resolve_one_piece_cost(conn, item.product_id, item.sell_cost)?;
let cumulative_tax_percent = Self::get_cumulative_tax_percent(conn, item.branch_id, item.user_id)?;
let revenue_cents = (sell_price * 100.0).round() as i64;
let cost_cents = (one_piece_cost * item.quantity as f64 * 100.0).round() as i64;
let profit_cents = (revenue_cents - cost_cents).max(0);
@@ -1156,66 +890,22 @@ impl DirectorWorker {
return Ok(0);
}
// Regionale worth_percent-Werte für dieses Produkt laden
conn.prepare(
"get_region_worth_for_product",
QUERY_GET_REGION_WORTH_FOR_PRODUCT,
)?;
let rows = conn.execute(
"get_region_worth_for_product",
&[&falukant_user_id, &item.product_id],
)?;
if rows.is_empty() {
return Ok(0);
}
let mut worth_by_region: HashMap<i32, f64> = HashMap::new();
for row in rows {
let region_id = row
.get("region_id")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1);
let percent = row
.get("worth_percent")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(100.0);
if region_id >= 0 {
worth_by_region.insert(region_id, percent);
}
}
// Load worth_percent by region for this product
let worth_by_region = Self::get_worth_by_region(conn, falukant_user_id, item.product_id)?;
if worth_by_region.is_empty() {
eprintln!(
"[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden",
item.product_id
);
eprintln!("[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden", item.product_id);
return Ok(0);
}
eprintln!(
"[DirectorWorker] Gefundene Regionen für Produkt {}: {} Regionen",
item.product_id, worth_by_region.len()
item.product_id,
worth_by_region.len()
);
// Lokalen Stückpreis berechnen (neue Preisberechnung)
let local_percent = worth_by_region
.get(&item.region_id)
.copied()
.unwrap_or(100.0);
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let local_base_price = item.sell_cost * (local_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let local_min_price = local_base_price * 0.6;
let local_max_price = local_base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
let knowledge_factor = item.quality as f64;
let local_piece_price = local_min_price + (local_max_price - local_min_price) * (knowledge_factor / 100.0);
// Compute local piece price
let local_percent = worth_by_region.get(&item.region_id).copied().unwrap_or(100.0);
let local_piece_price = Self::compute_piece_price_for_percent(item, local_percent);
eprintln!(
"[DirectorWorker] Lokaler Preis für Produkt {}: {:.2} (worth_percent={:.2}, quality={})",
item.product_id, local_piece_price, local_percent, item.quality
@@ -1232,18 +922,7 @@ impl DirectorWorker {
continue;
}
// Remote-Stückpreis berechnen (neue Preisberechnung)
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let remote_base_price = item.sell_cost * (remote_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let remote_min_price = remote_base_price * 0.6;
let remote_max_price = remote_base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
let knowledge_factor = item.quality as f64;
let remote_piece_price = remote_min_price + (remote_max_price - remote_min_price) * (knowledge_factor / 100.0);
let remote_piece_price = Self::compute_piece_price_for_percent(item, remote_percent);
let delta_per_unit = remote_piece_price - local_piece_price;
eprintln!(
"[DirectorWorker] Region {}: Preis {:.2}, Delta {:.2}",
@@ -1280,10 +959,7 @@ impl DirectorWorker {
}
// Maximale transportierbare Menge anhand der Kapazität ermitteln
let mut max_capacity: i32 = 0;
for v in &vehicles {
max_capacity = max_capacity.saturating_add(v.capacity);
}
let max_capacity = Self::calc_max_capacity(&vehicles);
if max_capacity <= 0 {
continue;
@@ -1295,15 +971,7 @@ impl DirectorWorker {
}
let extra_revenue = delta_per_unit * qty as f64;
let total_value = remote_piece_price * qty as f64;
let transport_cost = total_value * 0.01_f64;
// Kostenformel: max(0.01, totalValue * 0.01)
let transport_cost = if transport_cost < 0.01 {
0.01
} else {
transport_cost
};
let transport_cost = Self::calc_transport_cost(remote_piece_price, qty);
let net_gain = extra_revenue - transport_cost;
eprintln!(
"[DirectorWorker] Region {}: extra_revenue={:.2}, transport_cost={:.2}, net_gain={:.2}, qty={}",
@@ -1340,50 +1008,12 @@ impl DirectorWorker {
return Ok(0);
}
// Nochmals verfügbare Transportmittel für die gewählte Route laden
let vehicles = Self::get_transport_vehicles_for_route(
conn,
falukant_user_id,
item.region_id,
target_region,
)?;
if vehicles.is_empty() {
return Ok(0);
}
// Transporte anlegen, begrenzt durch best_quantity und Kapazitäten
conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;
let mut remaining = best_quantity;
for v in &vehicles {
if remaining <= 0 {
break;
}
let size = std::cmp::min(remaining, v.capacity);
if size <= 0 {
continue;
}
conn.execute(
"insert_transport",
&[
&item.region_id,
&target_region,
&item.product_id,
&size,
&v.id,
],
)?;
remaining -= size;
}
let shipped = best_quantity - remaining.max(0);
// Build and insert transports for chosen route
let shipped = Self::insert_transports_for_route(conn, item, target_region, best_quantity)?;
// Inventar sofort reduzieren, nachdem Transporte erfolgreich angelegt wurden
// Dies stellt sicher, dass Inventar und Transporte immer konsistent sind
if shipped > 0 {
if shipped > 0 {
if shipped >= item.quantity {
// Alles wurde in Transporte umgewandelt, Inventar komplett entfernen
conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?;
@@ -1411,15 +1041,8 @@ impl DirectorWorker {
source_region: i32,
target_region: i32,
) -> Result<Vec<TransportVehicle>, DbError> {
// Debug: Prüfe zuerst, ob Fahrzeuge in der Quellregion existieren
const QUERY_COUNT_VEHICLES_IN_REGION: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.vehicle v
WHERE v.falukant_user_id = $1
AND v.region_id = $2;
"#;
conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?;
// Debug: Prüfe zuerst, ob Fahrzeuge in der Quellregion existieren
conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?;
let vehicle_count_rows = conn.execute(
"count_vehicles_in_region",
&[&falukant_user_id, &source_region],
@@ -1436,15 +1059,8 @@ impl DirectorWorker {
source_region, falukant_user_id, vehicle_count
);
// Debug: Prüfe, ob eine Route existiert
const QUERY_CHECK_ROUTE: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.region_distance rd
WHERE (rd.source_region_id = $1 AND rd.target_region_id = $2)
OR (rd.source_region_id = $2 AND rd.target_region_id = $1);
"#;
conn.prepare("check_route", QUERY_CHECK_ROUTE)?;
// Debug: Prüfe, ob eine Route existiert
conn.prepare("check_route", QUERY_CHECK_ROUTE)?;
let route_rows = conn.execute(
"check_route",
&[&source_region, &target_region],
@@ -1494,6 +1110,55 @@ impl DirectorWorker {
Ok(result)
}
// Helper: load worth_percent values for a product across all regions of a user's branches
fn get_worth_by_region(conn: &mut DbConnection, falukant_user_id: i32, product_id: i32) -> Result<HashMap<i32,f64>, DbError> {
conn.prepare("get_region_worth_for_product", QUERY_GET_REGION_WORTH_FOR_PRODUCT)?;
let rows = conn.execute("get_region_worth_for_product", &[&falukant_user_id, &product_id])?;
let mut map = HashMap::new();
for row in rows {
if let Some(rid) = row.get("region_id").and_then(|v| v.parse::<i32>().ok()) {
let percent = row.get("worth_percent").and_then(|v| v.parse::<f64>().ok()).unwrap_or(100.0);
map.insert(rid, percent);
}
}
Ok(map)
}
// Helper: compute piece price for an arbitrary worth_percent
fn compute_piece_price_for_percent(item: &InventoryItem, percent: f64) -> f64 {
let base_price = item.sell_cost * (percent / 100.0);
let min_price = base_price * 0.6;
let max_price = base_price;
let knowledge_factor = item.quality as f64;
min_price + (max_price - min_price) * (knowledge_factor / 100.0)
}
fn calc_max_capacity(vehicles: &[TransportVehicle]) -> i32 {
vehicles.iter().fold(0i32, |acc, v| acc.saturating_add(v.capacity))
}
fn calc_transport_cost(remote_piece_price: f64, qty: i32) -> f64 {
let total_value = remote_piece_price * qty as f64;
let transport_cost = total_value * 0.01_f64;
if transport_cost < 0.01 { 0.01 } else { transport_cost }
}
fn insert_transports_for_route(conn: &mut DbConnection, item: &InventoryItem, target_region: i32, desired: i32) -> Result<i32, DbError> {
let vehicles = Self::get_transport_vehicles_for_route(conn, item.user_id, item.region_id, target_region)?;
if vehicles.is_empty() { return Ok(0); }
conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;
let mut remaining = desired;
for v in &vehicles {
if remaining <= 0 { break; }
let size = std::cmp::min(remaining, v.capacity);
if size <= 0 { continue; }
conn.execute("insert_transport", &[&item.region_id, &target_region, &item.product_id, &size, &v.id])?;
remaining -= size;
}
Ok(desired - remaining.max(0))
}
/// Plant leere Transporte, um Fahrzeuge zurückzuholen, wenn:
/// - Keine Transportmittel im aktuellen Branch vorhanden sind
/// - Aber bessere Verkaufspreise in anderen Branches existieren
@@ -1504,14 +1169,8 @@ impl DirectorWorker {
falukant_user_id: i32,
current_branch_id: i32,
) -> Result<(), DbError> {
// Aktuelle Branch-Region ermitteln
const QUERY_GET_BRANCH_REGION: &str = r#"
SELECT region_id
FROM falukant_data.branch
WHERE id = $1;
"#;
conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
// Aktuelle Branch-Region ermitteln
conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
let branch_rows = conn.execute("get_branch_region", &[&current_branch_id])?;
let current_region_id = match branch_rows.into_iter().next() {
@@ -1585,14 +1244,6 @@ impl DirectorWorker {
// Berechne Preisvorteil (vereinfacht: verwende worth_percent-Differenz)
// Hole worth_percent für beide Regionen (für ein beliebiges Produkt)
let mut price_delta = 0.0;
const QUERY_GET_AVERAGE_WORTH: &str = r#"
SELECT
AVG(CASE WHEN region_id = $1 THEN worth_percent ELSE NULL END) AS current_worth,
AVG(CASE WHEN region_id = $2 THEN worth_percent ELSE NULL END) AS target_worth
FROM falukant_data.town_product_worth
WHERE region_id IN ($1, $2);
"#;
conn.prepare("get_average_worth", QUERY_GET_AVERAGE_WORTH)?;
let worth_rows = conn.execute(
"get_average_worth",
@@ -1667,12 +1318,6 @@ impl DirectorWorker {
inventory_id: i32,
new_quantity: i32,
) -> Result<(), DbError> {
const QUERY_UPDATE_INVENTORY_QTY: &str = r#"
UPDATE falukant_data.inventory
SET quantity = $2
WHERE id = $1;
"#;
conn.prepare("update_inventory_qty", QUERY_UPDATE_INVENTORY_QTY)?;
conn.execute("update_inventory_qty", &[&inventory_id, &new_quantity])?;

View File

@@ -8,6 +8,15 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_RANDOM_USER,
QUERY_GET_RANDOM_INFANT,
QUERY_GET_RANDOM_CITY,
QUERY_GET_AFFECTED_USERS,
QUERY_INSERT_NOTIFICATION,
QUERY_GET_MONEY,
QUERY_UPDATE_MONEY,
};
/// Typisierung von Ereignissen
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -420,15 +429,7 @@ impl EventsWorker {
return Self::trigger_sudden_infant_death(pool, broker, event, rng);
}
// Hole einen zufälligen aktiven Spieler
const QUERY_GET_RANDOM_USER: &str = r#"
SELECT id
FROM falukant_data.falukant_user
ORDER BY RANDOM()
LIMIT 1;
"#;
conn.prepare("get_random_user", QUERY_GET_RANDOM_USER)?;
conn.prepare("get_random_user", QUERY_GET_RANDOM_USER)?;
let rows = conn.execute("get_random_user", &[])?;
let user_id: Option<i32> = rows
@@ -604,20 +605,7 @@ impl EventsWorker {
// Finde ein zufälliges Kind unter 2 Jahren
// Maximalalter: 730 Tage (2 Jahre) - festgelegt in der WHERE-Klausel unten
const QUERY_GET_RANDOM_INFANT: &str = r#"
SELECT
c.id AS character_id,
c.user_id,
CURRENT_DATE - c.birthdate::date AS age_days
FROM falukant_data."character" c
WHERE c.user_id IS NOT NULL
AND c.health > 0
AND CURRENT_DATE - c.birthdate::date <= 730 -- Maximalalter: 2 Jahre (730 Tage)
ORDER BY RANDOM()
LIMIT 1;
"#;
conn.prepare("get_random_infant", QUERY_GET_RANDOM_INFANT)?;
conn.prepare("get_random_infant", QUERY_GET_RANDOM_INFANT)?;
let rows = conn.execute("get_random_infant", &[])?;
let character_id: Option<i32> = rows
@@ -712,16 +700,7 @@ impl EventsWorker {
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Hole eine zufällige Stadt-Region
const QUERY_GET_RANDOM_CITY: &str = r#"
SELECT r.id AS region_id
FROM falukant_data.region r
JOIN falukant_type.region tr ON r.region_type_id = tr.id
WHERE tr.label_tr = 'city'
ORDER BY RANDOM()
LIMIT 1;
"#;
conn.prepare("get_random_city", QUERY_GET_RANDOM_CITY)?;
conn.prepare("get_random_city", QUERY_GET_RANDOM_CITY)?;
let rows = conn.execute("get_random_city", &[])?;
let region_id: Option<i32> = rows
@@ -913,14 +892,7 @@ impl EventsWorker {
}
// Finde alle betroffenen User in dieser Region (User mit Branches)
const QUERY_GET_AFFECTED_USERS: &str = r#"
SELECT DISTINCT b.falukant_user_id AS user_id
FROM falukant_data.branch b
WHERE b.region_id = $1
AND b.falukant_user_id IS NOT NULL;
"#;
conn.prepare("get_affected_users", QUERY_GET_AFFECTED_USERS)?;
conn.prepare("get_affected_users", QUERY_GET_AFFECTED_USERS)?;
let user_rows = conn.execute("get_affected_users", &[&region_id])?;
// Sende Benachrichtigung an jeden betroffenen User einzeln
@@ -976,13 +948,7 @@ impl EventsWorker {
percent_change: f64,
) -> Result<f64, DbError> {
// Hole aktuelles Geld
const QUERY_GET_MONEY: &str = r#"
SELECT money
FROM falukant_data.falukant_user
WHERE id = $1;
"#;
conn.prepare("get_money", QUERY_GET_MONEY)?;
conn.prepare("get_money", QUERY_GET_MONEY)?;
let rows = conn.execute("get_money", &[&user_id])?;
let current_money: Option<f64> = rows
@@ -1002,12 +968,8 @@ impl EventsWorker {
let action = format!("Zufallsereignis: Geldänderung {:.2}%", percent_change);
// Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection
const QUERY_UPDATE_MONEY: &str = r#"
SELECT falukant_data.update_money($1, $2, $3);
"#;
conn.prepare("update_money_event", QUERY_UPDATE_MONEY)?;
let _ = conn.execute("update_money_event", &[&user_id, &change, &action])?;
conn.prepare("update_money_event", QUERY_UPDATE_MONEY)?;
let _ = conn.execute("update_money_event", &[&user_id, &change, &action])?;
// Best-effort money_history insert for UI/history visibility.
let money_str = format!("{:.2}", change);
@@ -1811,17 +1773,7 @@ impl EventsWorker {
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
const QUERY_INSERT_NOTIFICATION: &str = r#"
INSERT INTO falukant_log.notification (
user_id,
tr,
shown,
created_at,
updated_at
) VALUES ($1, $2, FALSE, NOW(), NOW());
"#;
conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?;
conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?;
conn.execute("insert_notification", &[&user_id, &event_type])?;
// falukantUpdateStatus

View File

@@ -11,6 +11,7 @@ mod user_character;
mod transport;
mod weather;
mod events;
mod sql;
pub use base::Worker;
pub use crate::db::ConnectionPool;

179
src/worker/sql.rs Normal file
View File

@@ -0,0 +1,179 @@
// Centralized SQL strings for workers.
pub const QUERY_UPDATE_MONEY: &str = r#"
SELECT falukant_data.update_money($1, $2, $3);
"#;
pub const QUERY_GET_MONEY: &str = r#"
SELECT money FROM falukant_data.falukant_user WHERE id = $1;
"#;
pub const QUERY_GET_RANDOM_USER: &str = r#"
SELECT id FROM falukant_data.falukant_user ORDER BY RANDOM() LIMIT 1;
"#;
pub const QUERY_GET_RANDOM_INFANT: &str = r#"
SELECT c.id AS character_id, c.user_id, CURRENT_DATE - c.birthdate::date AS age_days
FROM falukant_data."character" c
WHERE c.user_id IS NOT NULL AND c.health > 0 AND CURRENT_DATE - c.birthdate::date <= 730
ORDER BY RANDOM() LIMIT 1;
"#;
pub const QUERY_GET_RANDOM_CITY: &str = r#"
SELECT r.id AS region_id FROM falukant_data.region r JOIN falukant_type.region tr ON r.region_type_id = tr.id WHERE tr.label_tr = 'city' ORDER BY RANDOM() LIMIT 1;
"#;
pub const QUERY_GET_AFFECTED_USERS: &str = r#"
SELECT DISTINCT b.falukant_user_id AS user_id FROM falukant_data.branch b WHERE b.region_id = $1 AND b.falukant_user_id IS NOT NULL;
"#;
pub const QUERY_UPDATE_WEATHER: &str = r#"
WITH all_regions AS (
SELECT DISTINCT r.id AS region_id FROM falukant_data.region r JOIN falukant_type.region tr ON r.region_type_id = tr.id WHERE tr.label_tr = 'city'
)
INSERT INTO falukant_data.weather (region_id, weather_type_id)
SELECT ar.region_id, (SELECT wt.id FROM falukant_type.weather wt ORDER BY random() + ar.region_id * 0 LIMIT 1) FROM all_regions ar
ON CONFLICT (region_id) DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id;
"#;
pub const QUERY_INSERT_NOTIFICATION: &str = r#"
INSERT INTO falukant_log.notification (user_id, tr, shown, created_at, updated_at) VALUES ($1, $2, FALSE, NOW(), NOW());
"#;
pub const QUERY_GET_DIRECTORS: &str = r#"
SELECT d.may_produce, d.may_sell, d.may_start_transport, b.id AS branch_id, fu.id AS falukantUserId, d.id
FROM falukant_data.director d
JOIN falukant_data.falukant_user fu ON fu.id = d.employer_user_id
JOIN falukant_data.character c ON c.id = d.director_character_id
JOIN falukant_data.branch b ON b.region_id = c.region_id AND b.falukant_user_id = fu.id
WHERE current_time BETWEEN '08:00:00' AND '17:00:00';
"#;
pub const QUERY_GET_BEST_PRODUCTION: &str = r#"
SELECT fdu.id falukant_user_id, CAST(fdu.money AS text) AS money, fdu.certificate, ftp.id product_id, ftp.label_tr, fdb.region_id,
(SELECT SUM(quantity) FROM falukant_data.stock fds WHERE fds.branch_id = fdb.id) AS stock_size,
COALESCE((SELECT SUM(COALESCE(fdi.quantity, 0)) FROM falukant_data.stock fds JOIN falukant_data.inventory fdi ON fdi.stock_id = fds.id WHERE fds.branch_id = fdb.id), 0) AS used_in_stock,
(ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category) / (300.0 * ftp.production_time) AS worth,
fdb.id AS branch_id, (SELECT COUNT(id) FROM falukant_data.production WHERE branch_id = fdb.id) AS running_productions,
COALESCE((SELECT SUM(COALESCE(fdp.quantity, 0)) quantity FROM falukant_data.production fdp WHERE fdp.branch_id = fdb.id), 0) AS running_productions_quantity
FROM falukant_data.director fdd
JOIN falukant_data.character fdc ON fdc.id = fdd.director_character_id
JOIN falukant_data.falukant_user fdu ON fdd.employer_user_id = fdu.id
JOIN falukant_data.character user_character ON user_character.user_id = fdu.id
JOIN falukant_data.branch fdb ON fdb.falukant_user_id = fdu.id AND fdb.region_id = fdc.region_id
JOIN falukant_data.town_product_worth fdtpw ON fdtpw.region_id = fdb.region_id
JOIN falukant_data.knowledge fdk_character ON fdk_character.product_id = fdtpw.product_id AND fdk_character.character_id = user_character.id
JOIN falukant_data.knowledge fdk_director ON fdk_director.product_id = fdtpw.product_id AND fdk_director.character_id = fdd.director_character_id
JOIN falukant_type.product ftp ON ftp.id = fdtpw.product_id AND ftp.category <= fdu.certificate
WHERE fdd.id = $1 AND fdb.id = $2 ORDER BY worth DESC LIMIT 1;
"#;
pub const QUERY_INSERT_PRODUCTION: &str = r#"
INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id) VALUES ($1, $2, $3, (SELECT weather_type_id FROM falukant_data.weather WHERE region_id = $4));
"#;
pub const QUERY_GET_BRANCH_CAPACITY: &str = r#"
SELECT (SELECT SUM(quantity) FROM falukant_data.stock fds WHERE fds.branch_id = $1) AS stock_size,
COALESCE((SELECT SUM(COALESCE(fdi.quantity, 0)) FROM falukant_data.stock fds JOIN falukant_data.inventory fdi ON fdi.stock_id = fds.id WHERE fds.branch_id = $1), 0) AS used_in_stock,
(SELECT COUNT(id) FROM falukant_data.production WHERE branch_id = $1) AS running_productions,
COALESCE((SELECT SUM(COALESCE(fdp.quantity, 0)) quantity FROM falukant_data.production fdp WHERE fdp.branch_id = $1), 0) AS running_productions_quantity;
"#;
pub const QUERY_GET_INVENTORY: &str = r#"
SELECT i.id, i.product_id, i.quantity, i.quality, p.sell_cost, fu.id AS user_id, b.region_id, b.id AS branch_id, COALESCE(tpw.worth_percent, 100.0) AS worth_percent
FROM falukant_data.inventory i
JOIN falukant_data.stock s ON s.id = i.stock_id
JOIN falukant_data.branch b ON b.id = s.branch_id
JOIN falukant_data.falukant_user fu ON fu.id = b.falukant_user_id
JOIN falukant_data.director d ON d.employer_user_id = fu.id
JOIN falukant_type.product p ON p.id = i.product_id
LEFT JOIN falukant_data.town_product_worth tpw ON tpw.region_id = b.region_id AND tpw.product_id = i.product_id
WHERE d.id = $1 AND b.id = $2;
"#;
pub const QUERY_REMOVE_INVENTORY: &str = r#"
DELETE FROM falukant_data.inventory WHERE id = $1;
"#;
pub const QUERY_ADD_SELL_LOG: &str = r#"
INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id) VALUES ($1, $2, $3, $4)
ON CONFLICT (region_id, product_id, seller_id) DO UPDATE SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity;
"#;
pub const QUERY_GET_REGION_WORTH_FOR_PRODUCT: &str = r#"
SELECT tpw.region_id, tpw.product_id, tpw.worth_percent FROM falukant_data.town_product_worth tpw JOIN falukant_data.branch b ON b.region_id = tpw.region_id WHERE b.falukant_user_id = $1 AND tpw.product_id = $2;
"#;
pub const QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE: &str = r#"
SELECT v.id AS vehicle_id, vt.capacity AS capacity
FROM falukant_data.vehicle v
JOIN falukant_type.vehicle vt ON vt.id = v.vehicle_type_id
JOIN falukant_data.region_distance rd ON ((rd.source_region_id = v.region_id AND rd.target_region_id = $3) OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id)) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
WHERE v.falukant_user_id = $1 AND v.region_id = $2;
"#;
pub const QUERY_INSERT_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport (source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, NOW(), NOW());
"#;
pub const QUERY_INSERT_EMPTY_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport (source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at) VALUES ($1, $2, NULL, 0, $3, NOW(), NOW());
"#;
pub const QUERY_GET_USER_BRANCHES: &str = r#"
SELECT DISTINCT b.region_id, b.id AS branch_id FROM falukant_data.branch b WHERE b.falukant_user_id = $1 AND b.region_id != $2;
"#;
pub const QUERY_GET_FREE_VEHICLES_IN_REGION: &str = r#"
SELECT v.id AS vehicle_id, vt.capacity AS capacity FROM falukant_data.vehicle v JOIN falukant_type.vehicle vt ON vt.id = v.vehicle_type_id WHERE v.falukant_user_id = $1 AND v.region_id = $2 AND v.id NOT IN (SELECT DISTINCT t.vehicle_id FROM falukant_data.transport t WHERE t.vehicle_id IS NOT NULL);
"#;
pub const QUERY_GET_SALARY_TO_PAY: &str = r#"
SELECT d.id, d.employer_user_id, d.income FROM falukant_data.director d WHERE DATE(d.last_salary_payout) < DATE(NOW());
"#;
pub const QUERY_SET_SALARY_PAYED: &str = r#"
UPDATE falukant_data.director SET last_salary_payout = NOW() WHERE id = $1;
"#;
pub const QUERY_UPDATE_SATISFACTION: &str = r#"
WITH new_sats AS (
SELECT d.id, ROUND(d.income::numeric / (c.title_of_nobility * POWER(1.231, AVG(k.knowledge) / 1.5)) * 100) AS new_satisfaction
FROM falukant_data.director d
JOIN falukant_data.knowledge k ON d.director_character_id = k.character_id
JOIN falukant_data.character c ON c.id = d.director_character_id
GROUP BY d.id, c.title_of_nobility, d.income
)
UPDATE falukant_data.director dir SET satisfaction = ns.new_satisfaction FROM new_sats ns WHERE dir.id = ns.id AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction RETURNING dir.employer_user_id;
"#;
pub const QUERY_GET_DIRECTOR_USER: &str = r#"
SELECT fu.id AS falukant_user_id FROM falukant_data.director d JOIN falukant_data.falukant_user fu ON fu.id = d.employer_user_id WHERE d.id = $1 LIMIT 1;
"#;
pub const QUERY_COUNT_VEHICLES_IN_BRANCH_REGION: &str = r#"
SELECT COUNT(v.id) AS cnt FROM falukant_data.vehicle v WHERE v.falukant_user_id = $1 AND v.region_id = $2;
"#;
pub const QUERY_COUNT_VEHICLES_IN_REGION: &str = r#"
SELECT COUNT(v.id) AS cnt FROM falukant_data.vehicle v WHERE v.falukant_user_id = $1 AND v.region_id = $2;
"#;
pub const QUERY_CHECK_ROUTE: &str = r#"
SELECT 1 FROM falukant_data.region_distance rd WHERE (rd.source_region_id = $1 AND rd.target_region_id = $2) OR (rd.source_region_id = $2 AND rd.target_region_id = $1) LIMIT 1;
"#;
pub const QUERY_GET_BRANCH_REGION: &str = r#"
SELECT region_id FROM falukant_data.branch WHERE id = $1 LIMIT 1;
"#;
pub const QUERY_GET_AVERAGE_WORTH: &str = r#"
SELECT AVG(tpw.worth_percent) AS avg_worth FROM falukant_data.town_product_worth tpw WHERE tpw.product_id = $1 AND tpw.region_id IN (SELECT region_id FROM falukant_data.branch WHERE falukant_user_id = $2);
"#;
pub const QUERY_UPDATE_INVENTORY_QTY: &str = r#"
UPDATE falukant_data.inventory SET quantity = $1 WHERE id = $2;
"#;

View File

@@ -11,6 +11,7 @@ use std::sync::Arc;
use std::time::Duration;
use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::QUERY_UPDATE_MONEY;
pub struct UndergroundWorker {
base: BaseWorker,
@@ -118,10 +119,7 @@ const Q_SELECT_FALUKANT_USER: &str = r#"
LIMIT 1;
"#;
// Query für Geldänderungen (lokale Variante von BaseWorker::change_falukant_user_money)
const QUERY_UPDATE_MONEY: &str = r#"
SELECT falukant_data.update_money($1, $2, $3);
"#;
// Use centralized QUERY_UPDATE_MONEY from src/worker/sql.rs
impl UndergroundWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {

View File

@@ -5,39 +5,13 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::QUERY_UPDATE_WEATHER;
pub struct WeatherWorker {
base: BaseWorker,
}
// Query zum Aktualisieren des Wetters für alle Regionen
// Wählt für jede Region ein zufälliges Wetter aus allen verfügbaren Wettertypen aus
// Wichtig: Jede Region bekommt ein individuelles, zufälliges Wetter. Die vorherige
// Variante konnte vom Planner unter Umständen die Zufalls-Subquery nur einmal
// auswerten; mit LATERAL wird die Zufallsauswahl pro Region garantiert ausgeführt.
const QUERY_UPDATE_WEATHER: &str = r#"
WITH all_regions AS (
SELECT DISTINCT r.id AS region_id
FROM falukant_data.region r
JOIN falukant_type.region tr ON r.region_type_id = tr.id
WHERE tr.label_tr = 'city'
),
random_weather AS (
SELECT ar.region_id, wt.id AS weather_type_id
FROM all_regions ar
CROSS JOIN LATERAL (
SELECT wt.id
FROM falukant_type.weather wt
ORDER BY RANDOM()
LIMIT 1
) wt
)
INSERT INTO falukant_data.weather (region_id, weather_type_id)
SELECT rw.region_id, rw.weather_type_id
FROM random_weather rw
ON CONFLICT (region_id)
DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id;
"#;
// Reuse QUERY_UPDATE_WEATHER from centralized SQL module
impl WeatherWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -89,17 +63,17 @@ impl WeatherWorker {
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Run the prepared SQL that uses per-row RANDOM() trick
conn.prepare("update_weather", QUERY_UPDATE_WEATHER)?;
let updated_rows = conn.execute("update_weather", &[])?;
eprintln!(
"[WeatherWorker] Wetter aktualisiert. {} Regionen betroffen.",
"[WeatherWorker] Wetter aktualisiert (per-row random). {} Regionen betroffen.",
updated_rows.len()
);
// Benachrichtige alle Clients über Wetteränderungen
let message = r#"{"event":"weather_updated"}"#;
broker.publish(message.to_string());
broker.publish("{\"event\":\"weather_updated\"}".to_string());
Ok(())
}