18 Commits

Author SHA1 Message Date
Torsten Schulz (local)
a82d554494 Enhance SQL query in Worker to include character reputation: Add COALESCE for reputation in the vote count query and adjust the GROUP BY and ORDER BY clauses to incorporate reputation, improving data accuracy in election results. 2025-12-20 23:06:51 +01:00
Torsten Schulz (local)
833202344b Refactor SQL query in Worker to improve distance calculations: Replace the existing LEFT JOIN with a LATERAL join to enhance the selection of region distances based on transport mode, ensuring more accurate results. Update polling frequency in TransportWorker from once per second to once per minute for better resource management. 2025-12-20 22:20:29 +01:00
Torsten Schulz (local)
cd1b33a474 Update SQL query in Worker to exclude vehicles involved in transport: Modify the vehicle selection query to filter out vehicles that have been used in transport, enhancing data accuracy and integrity. 2025-12-20 21:20:05 +01:00
Torsten Schulz (local)
1719af2344 Refactor revenue and tax calculations in DirectorWorker: Adjust the pricing logic to account for cumulative tax inflation, ensuring accurate revenue and payout calculations. Update SQL query for sell logs to change conflict resolution order for better data integrity. 2025-12-20 15:20:03 +01:00
Torsten Schulz (local)
398e0ba677 Enhance SQL insert query for sell logs and improve error logging in TransportWorker: Add sell_timestamp to the insert statement and refine error messages to include transport details for better debugging. 2025-12-20 14:20:09 +01:00
Torsten Schulz (local)
4fca4b4d75 Enhance SQL queries and logging in TransportWorker and WeatherWorker: Update weather assignment logic to ensure unique weather types per region, improve distance calculations in transport queries, and refine logging for transport processing. Additionally, adjust notification insert queries to include character_id. 2025-12-20 11:17:32 +01:00
80012fec64 Merge pull request 'Configure Renovate' (#1) from renovate/configure into main
Reviewed-on: #1
2025-12-19 16:08:08 +01:00
8e92a63895 Add renovate.json 2025-12-19 16:00:14 +01:00
Torsten Schulz (local)
f7710b64c9 Refactor WeatherWorker SQL query: Replace the existing weather update logic with a new query that assigns random weather types to each region, ensuring individual updates. Clean up logging messages for clarity. 2025-12-18 15:34:16 +01:00
Torsten Schulz (local)
6d12c70d84 Entferne Info-Logs aus den Worker-Klassen: Reduziere die Protokollierung in ProduceWorker, DirectorWorker und TransportWorker, um die Ausgabe zu optimieren und nur Fehler zu protokollieren. 2025-12-16 12:58:37 +01:00
Torsten Schulz (local)
d2e253b79a Füge erweiterte SQL-Abfragen und Logging in ProduceWorker hinzu: Implementiere Update- und Insert-Logik für das Inventar und verbessere die Fehlerausgaben. 2025-12-16 10:52:06 +01:00
Torsten Schulz (local)
74fee2d4c9 Verbessere Fehlerbehandlung in SQL-Abfragen: Füge detaillierte Fehlermeldungen für Vorbereitungs- und Ausführungsfehler in den Director- und Politics-Workern hinzu. 2025-12-16 08:52:08 +01:00
Torsten Schulz (local)
b45990c1b6 Aktualisiere Benachrichtigungsabfragen: Ersetze character_id durch user_id in den Benachrichtigungs-INSERT-Abfragen und optimiere die Abfragen zur Auswahl des Benutzers. 2025-12-16 08:24:46 +01:00
Torsten Schulz (local)
ae90166adb Aktualisiere Benachrichtigungsabfragen: Ersetze user_id durch character_id und optimiere die Abfragen zur Auswahl des Charakters. 2025-12-15 21:20:30 +01:00
Torsten Schulz (local)
2aa4e7c666 Füge VSCode-Build-Tasks hinzu: Erstelle Aufgaben für den Build-Prozess und Clippy-Überprüfungen. 2025-12-13 23:40:20 +01:00
Torsten Schulz (local)
ce06b1a4f0 Verbessere SQL-Abfragen im Produktionsworker: Optimiere die Abfrage für abgeschlossene Produktionen und verbessere die Lesbarkeit der Preisberechnung. 2025-12-13 12:17:00 +01:00
Torsten Schulz (local)
10bc1e5a52 Refactor SQL queries into a dedicated module
- Moved SQL queries from multiple worker files into `src/worker/sql.rs` for better organization and maintainability.
- Updated references in `stockage_manager.rs`, `transport.rs`, `underground.rs`, `user_character.rs`, and `value_recalculation.rs` to use the new centralized SQL queries.
- Improved code readability by replacing `.get(0)` with `.first()` for better clarity when retrieving the first row from query results.
- Cleaned up unnecessary comments and consolidated related SQL queries.
2025-12-13 11:57:28 +01:00
Torsten Schulz (local)
a9d490ce38 Refactor SQL queries into centralized module
- Moved various SQL query strings from individual worker files into a new `sql.rs` module for better organization and reusability.
- Updated `events.rs`, `underground.rs`, and `weather.rs` to use the centralized SQL queries.
- Removed redundant query definitions from `events.rs`, `underground.rs`, and `weather.rs`.
2025-12-12 15:18:30 +01:00
19 changed files with 2508 additions and 2938 deletions

27
.vscode/tasks.json vendored Normal file
View File

@@ -0,0 +1,27 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "Build and clippy",
"type": "shell",
"command": "cargo build && cargo clippy",
"group": "build"
},
{
"label": "cargo build",
"type": "shell",
"command": "cargo build",
"group": "build"
},
{
"label": "Build YpDaemon",
"type": "shell",
"command": "cargo build",
"isBackground": false,
"problemMatcher": [
"$rustc"
],
"group": "build"
}
]
}

6
renovate.json Normal file
View File

@@ -0,0 +1,6 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:recommended"
]
}

View File

@@ -92,8 +92,32 @@ impl Database {
.get(name) .get(name)
.ok_or_else(|| DbError::new(format!("Unbekanntes Statement: {name}")))?; .ok_or_else(|| DbError::new(format!("Unbekanntes Statement: {name}")))?;
let rows = self.client.query(sql.as_str(), params)?; match self.client.query(sql.as_str(), params) {
Ok(rows.into_iter().map(Self::row_to_map).collect()) Ok(rows) => Ok(rows.into_iter().map(Self::row_to_map).collect()),
Err(err) => {
if let Some(db_err) = err.as_db_error() {
let code = db_err.code().code().to_string();
let message = db_err.message();
let detail = db_err.detail().unwrap_or_default();
let hint = db_err.hint().unwrap_or_default();
// SQL ggf. kürzen, um Log-Flut zu vermeiden
let mut sql_preview = sql.clone();
const MAX_SQL_PREVIEW: usize = 800;
if sql_preview.len() > MAX_SQL_PREVIEW {
sql_preview.truncate(MAX_SQL_PREVIEW);
sql_preview.push_str("");
}
Err(DbError::new(format!(
"Postgres-Fehler bei Statement '{name}': {} (SQLSTATE: {}, Detail: {}, Hint: {}) | SQL: {}",
message, code, detail, hint, sql_preview
)))
} else {
Err(DbError::new(format!(
"Postgres-Fehler (Client) bei Statement '{name}': {err}"
)))
}
}
}
} }
fn row_to_map(row: postgres::Row) -> Row { fn row_to_map(row: postgres::Row) -> Row {

View File

@@ -86,15 +86,7 @@ async fn append_ws_log(
.map(|s| s.to_string()); .map(|s| s.to_string());
let target_user = json let target_user = json
.get("user_id") .get("user_id")
.and_then(|v| { .and_then(|v| v.as_str().map(|s| s.to_string()).or_else(|| v.as_i64().map(|n| n.to_string())));
if let Some(s) = v.as_str() {
Some(s.to_string())
} else if let Some(n) = v.as_i64() {
Some(n.to_string())
} else {
None
}
});
(event, target_user) (event, target_user)
} else { } else {
(None, None) (None, None)
@@ -510,10 +502,8 @@ async fn handle_connection<S>(
.and_then(|v| { .and_then(|v| {
if let Some(s) = v.as_str() { if let Some(s) = v.as_str() {
s.parse::<i64>().ok() s.parse::<i64>().ok()
} else if let Some(n) = v.as_i64() {
Some(n)
} else { } else {
None v.as_i64()
} }
}) })
.map(|v| v == numeric_uid) .map(|v| v == numeric_uid)

View File

@@ -1,4 +1,5 @@
use crate::db::{ConnectionPool, DbError}; use crate::db::{ConnectionPool, DbError};
use crate::worker::sql::{QUERY_UPDATE_MONEY, QUERY_GET_MONEY};
use crate::message_broker::MessageBroker; use crate::message_broker::MessageBroker;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -96,7 +97,6 @@ impl BaseWorker {
} }
let state = Arc::clone(&self.state); let state = Arc::clone(&self.state);
let name = self.name.clone();
self.watchdog_thread = Some(thread::spawn(move || { self.watchdog_thread = Some(thread::spawn(move || {
while state.running_watchdog.load(Ordering::Relaxed) { while state.running_watchdog.load(Ordering::Relaxed) {
@@ -116,7 +116,7 @@ impl BaseWorker {
// beim Debuggen selten. Deshalb nur loggen, wenn der Worker // beim Debuggen selten. Deshalb nur loggen, wenn der Worker
// sich nicht im Idle-Zustand befindet. // sich nicht im Idle-Zustand befindet.
if !step.ends_with(" idle") { if !step.ends_with(" idle") {
eprintln!("[{name}] Watchdog: current step = {step}"); // keine Info-Logs im Watchdog
} }
} }
})); }));
@@ -147,11 +147,7 @@ impl BaseWorker {
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection // Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection
const QUERY_UPDATE_MONEY: &str = r#" conn.prepare("update_money", QUERY_UPDATE_MONEY)?;
SELECT falukant_data.update_money($1, $2, $3);
"#;
conn.prepare("update_money", QUERY_UPDATE_MONEY)?;
// Validate float to avoid passing NaN/inf which the postgres client // Validate float to avoid passing NaN/inf which the postgres client
// may fail to serialize with an unclear error message. // may fail to serialize with an unclear error message.
@@ -165,26 +161,21 @@ impl BaseWorker {
// We must ensure the resulting money fits in numeric(10,2). // We must ensure the resulting money fits in numeric(10,2).
// numeric(10,2) max absolute value is < 10^8 (100_000_000) before rounding. // numeric(10,2) max absolute value is < 10^8 (100_000_000) before rounding.
// Fetch current money for the user and clamp the delta if needed. // Fetch current money for the user and clamp the delta if needed.
const QUERY_GET_MONEY: &str = r#"
SELECT money FROM falukant_data.falukant_user WHERE id = $1;
"#;
conn.prepare("get_money_for_clamp", QUERY_GET_MONEY)?; conn.prepare("get_money_for_clamp", QUERY_GET_MONEY)?;
let rows = conn.execute("get_money_for_clamp", &[&falukant_user_id])?; let rows = conn.execute("get_money_for_clamp", &[&falukant_user_id])?;
let current_money: f64 = rows let current_money: f64 = rows
.get(0) .first()
.and_then(|r| r.get("money")) .and_then(|r| r.get("money"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0); .unwrap_or(0.0);
// compute tentative result // compute tentative result
let tentative = current_money + money_change; let tentative = current_money + money_change;
// numeric(10,2) allows values with absolute < 10^8 (100_000_000) // numeric(10,2) allows values with absolute < 10^8 (100_000_000)
const MAX_ABS: f64 = 100_000_000.0 - 0.01; // leave room for scale const MAX_ABS: f64 = 100_000_000.0 - 0.01; // leave room for scale
let _allowed = MAX_ABS - current_money;
let adjusted_money_change = if tentative >= MAX_ABS { let adjusted_money_change = if tentative >= MAX_ABS {
let clipped = MAX_ABS - current_money; let clipped = MAX_ABS - current_money;
eprintln!( eprintln!(
@@ -203,19 +194,12 @@ impl BaseWorker {
money_change money_change
}; };
// Keep only important clamp logging: when clipping occurs we log it above.
// Send exact types matching the DB function signature: // Send exact types matching the DB function signature:
let uid_i32: i32 = falukant_user_id; let uid_i32: i32 = falukant_user_id;
let money_str = format!("{:.2}", adjusted_money_change); let money_str = format!("{:.2}", adjusted_money_change);
// Note: we intentionally avoid parameterized call due to serialization // Note: we intentionally avoid parameterized call due to serialization
// issues in this environment and instead execute a literal SQL below. // issues in this environment and instead execute a literal SQL below.
// Execute update; avoid noisy logging here.
// Use a literal SQL call because parameterized execution keeps failing
// with "error serializing parameter 1" in this environment.
fn escape_sql_literal(s: &str) -> String { fn escape_sql_literal(s: &str) -> String {
s.replace('\'', "''") s.replace('\'', "''")
} }

View File

@@ -10,6 +10,19 @@ use std::thread;
use std::time::Duration; use std::time::Duration;
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED,
QUERY_GET_TOWN_REGION_IDS,
QUERY_LOAD_FIRST_NAMES,
QUERY_LOAD_LAST_NAMES,
QUERY_INSERT_CHARACTER,
QUERY_GET_ELIGIBLE_NPC_FOR_DEATH,
QUERY_DELETE_DIRECTOR,
QUERY_DELETE_RELATIONSHIP,
QUERY_DELETE_CHILD_RELATION,
QUERY_INSERT_NOTIFICATION,
QUERY_MARK_CHARACTER_DECEASED,
};
pub struct CharacterCreationWorker { pub struct CharacterCreationWorker {
pub(crate) base: BaseWorker, pub(crate) base: BaseWorker,
@@ -21,145 +34,7 @@ pub struct CharacterCreationWorker {
death_thread: Option<thread::JoinHandle<()>>, death_thread: Option<thread::JoinHandle<()>>,
} }
// SQL-Queries analog zur C++-Implementierung
const QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED: &str = r#"
SELECT created_at
FROM falukant_data."character"
WHERE user_id IS NULL
AND created_at::date = CURRENT_DATE
ORDER BY created_at DESC
LIMIT 1;
"#;
const QUERY_GET_TOWN_REGION_IDS: &str = r#"
SELECT fdr.id
FROM falukant_data.region fdr
JOIN falukant_type.region ftr ON fdr.region_type_id = ftr.id
WHERE ftr.label_tr = 'city';
"#;
const QUERY_LOAD_FIRST_NAMES: &str = r#"
SELECT id, gender
FROM falukant_predefine.firstname;
"#;
const QUERY_LOAD_LAST_NAMES: &str = r#"
SELECT id
FROM falukant_predefine.lastname;
"#;
const QUERY_INSERT_CHARACTER: &str = r#"
INSERT INTO falukant_data.character(
user_id,
region_id,
first_name,
last_name,
birthdate,
gender,
created_at,
updated_at,
title_of_nobility
) VALUES (
NULL,
$1,
$2,
$3,
NOW(),
$4,
NOW(),
NOW(),
$5
);
"#;
const QUERY_GET_ELIGIBLE_NPC_FOR_DEATH: &str = r#"
WITH aged AS (
SELECT
c.id,
(current_date - c.birthdate::date) AS age,
c.user_id
FROM
falukant_data.character c
WHERE
c.user_id IS NULL
AND (current_date - c.birthdate::date) > 60
),
always_sel AS (
SELECT *
FROM aged
WHERE age > 85
),
random_sel AS (
SELECT *
FROM aged
WHERE age <= 85
ORDER BY random()
LIMIT 10
)
SELECT *
FROM always_sel
UNION ALL
SELECT *
FROM random_sel;
"#;
const QUERY_DELETE_DIRECTOR: &str = r#"
DELETE FROM falukant_data.director
WHERE director_character_id = $1
RETURNING employer_user_id;
"#;
const QUERY_DELETE_RELATIONSHIP: &str = r#"
WITH deleted AS (
DELETE FROM falukant_data.relationship
WHERE character1_id = $1
OR character2_id = $1
RETURNING
CASE
WHEN character1_id = $1 THEN character2_id
ELSE character1_id
END AS related_character_id,
relationship_type_id
)
SELECT
c.user_id AS related_user_id
FROM deleted d
JOIN falukant_data.character c
ON c.id = d.related_character_id;
"#;
const QUERY_DELETE_CHILD_RELATION: &str = r#"
WITH deleted AS (
DELETE FROM falukant_data.child_relation
WHERE child_character_id = $1
RETURNING
father_character_id,
mother_character_id
)
SELECT
cf.user_id AS father_user_id,
cm.user_id AS mother_user_id
FROM deleted d
JOIN falukant_data.character cf
ON cf.id = d.father_character_id
JOIN falukant_data.character cm
ON cm.id = d.mother_character_id;
"#;
const QUERY_INSERT_NOTIFICATION: &str = r#"
INSERT INTO falukant_log.notification (
user_id,
tr,
shown,
created_at,
updated_at
) VALUES ($1, 'director_death', FALSE, NOW(), NOW());
"#;
const QUERY_MARK_CHARACTER_DECEASED: &str = r#"
DELETE FROM falukant_data.character
WHERE id = $1;
"#;
impl CharacterCreationWorker { impl CharacterCreationWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -329,10 +204,10 @@ impl CharacterCreationWorker {
} }
fn load_names(&mut self) { fn load_names(&mut self) {
if self.first_name_cache.is_empty() || self.last_name_cache.is_empty() { if (self.first_name_cache.is_empty() || self.last_name_cache.is_empty())
if let Err(err) = self.load_first_and_last_names() { && let Err(err) = self.load_first_and_last_names()
eprintln!("[CharacterCreationWorker] Fehler in loadNames: {err}"); {
} eprintln!("[CharacterCreationWorker] Fehler in loadNames: {err}");
} }
} }
@@ -491,12 +366,12 @@ impl CharacterCreationWorker {
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0); .unwrap_or(0);
if character_id > 0 && Self::calculate_death_probability(age) { if character_id > 0 && Self::calculate_death_probability(age)
if let Err(err) = Self::handle_character_death(pool, broker, character_id) { && let Err(err) = Self::handle_character_death(pool, broker, character_id)
eprintln!( {
"[CharacterCreationWorker] Fehler beim Bearbeiten des NPC-Todes (id={character_id}): {err}" eprintln!(
); "[CharacterCreationWorker] Fehler beim Bearbeiten des NPC-Todes (id={character_id}): {err}"
} );
} }
} }
@@ -531,13 +406,12 @@ impl CharacterCreationWorker {
// 1) Director löschen und User benachrichtigen // 1) Director löschen und User benachrichtigen
conn.prepare("delete_director", QUERY_DELETE_DIRECTOR)?; conn.prepare("delete_director", QUERY_DELETE_DIRECTOR)?;
let dir_result = conn.execute("delete_director", &[&character_id])?; let dir_result = conn.execute("delete_director", &[&character_id])?;
if let Some(row) = dir_result.get(0) { if let Some(row) = dir_result.first()
if let Some(user_id) = row && let Some(user_id) = row
.get("employer_user_id") .get("employer_user_id")
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
{ {
Self::notify_user(pool, broker, user_id, "director_death")?; Self::notify_user(pool, broker, user_id, "director_death")?;
}
} }
// 2) Relationships löschen und betroffene User benachrichtigen // 2) Relationships löschen und betroffene User benachrichtigen

View File

@@ -7,6 +7,35 @@ use std::time::{Duration, Instant};
use crate::db::ConnectionPool; use crate::db::ConnectionPool;
use super::base::{BaseWorker, Worker, WorkerState, DEFAULT_TAX_PERCENT, DEFAULT_TREASURY_USER_ID}; use super::base::{BaseWorker, Worker, WorkerState, DEFAULT_TAX_PERCENT, DEFAULT_TREASURY_USER_ID};
use crate::worker::sql::{
QUERY_GET_DIRECTORS,
QUERY_GET_BEST_PRODUCTION,
QUERY_INSERT_PRODUCTION,
QUERY_GET_BRANCH_CAPACITY,
QUERY_GET_INVENTORY,
QUERY_REMOVE_INVENTORY,
QUERY_ADD_SELL_LOG,
QUERY_GET_REGION_WORTH_FOR_PRODUCT,
QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE,
QUERY_INSERT_TRANSPORT,
QUERY_INSERT_EMPTY_TRANSPORT,
QUERY_GET_USER_BRANCHES,
QUERY_GET_FREE_VEHICLES_IN_REGION,
QUERY_GET_SALARY_TO_PAY,
QUERY_SET_SALARY_PAYED,
QUERY_UPDATE_SATISFACTION,
QUERY_GET_DIRECTOR_USER,
QUERY_COUNT_VEHICLES_IN_BRANCH_REGION,
QUERY_COUNT_VEHICLES_IN_REGION,
QUERY_CHECK_ROUTE,
QUERY_GET_BRANCH_REGION,
QUERY_GET_AVERAGE_WORTH,
QUERY_UPDATE_INVENTORY_QTY,
QUERY_GET_PRODUCT_COST,
QUERY_GET_USER_OFFICES,
QUERY_CUMULATIVE_TAX_NO_EXEMPT,
QUERY_CUMULATIVE_TAX_WITH_EXEMPT,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Director { struct Director {
@@ -65,279 +94,10 @@ pub struct DirectorWorker {
// Maximale Anzahl paralleler Produktionen pro Branch // Maximale Anzahl paralleler Produktionen pro Branch
const MAX_PARALLEL_PRODUCTIONS: i32 = 2; const MAX_PARALLEL_PRODUCTIONS: i32 = 2;
// SQL-Queries (1:1 aus director_worker.h) // ...existing code...
const QUERY_GET_DIRECTORS: &str = r#"
SELECT
d.may_produce,
d.may_sell,
d.may_start_transport,
b.id AS branch_id,
fu.id AS falukantUserId,
d.id
FROM falukant_data.director d
JOIN falukant_data.falukant_user fu
ON fu.id = d.employer_user_id
JOIN falukant_data.character c
ON c.id = d.director_character_id
JOIN falukant_data.branch b
ON b.region_id = c.region_id
AND b.falukant_user_id = fu.id
WHERE current_time BETWEEN '08:00:00' AND '17:00:00';
"#;
const QUERY_GET_BEST_PRODUCTION: &str = r#"
SELECT
fdu.id falukant_user_id,
-- Geld explizit als Text casten, damit das Mapping im Rust-Code
-- zuverlässig funktioniert (unabhängig vom nativen DB-Typ wie `money`).
CAST(fdu.money AS text) AS money,
fdu.certificate,
ftp.id product_id,
ftp.label_tr,
fdb.region_id,
(
SELECT SUM(quantity)
FROM falukant_data.stock fds
WHERE fds.branch_id = fdb.id
) AS stock_size,
COALESCE((
SELECT SUM(COALESCE(fdi.quantity, 0))
FROM falukant_data.stock fds
JOIN falukant_data.inventory fdi
ON fdi.stock_id = fds.id
WHERE fds.branch_id = fdb.id
), 0) AS used_in_stock,
(ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category)
/ (300.0 * ftp.production_time) AS worth,
fdb.id AS branch_id,
(
SELECT COUNT(id)
FROM falukant_data.production
WHERE branch_id = fdb.id
) AS running_productions,
COALESCE((
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
FROM falukant_data.production fdp
WHERE fdp.branch_id = fdb.id
), 0) AS running_productions_quantity
FROM falukant_data.director fdd
JOIN falukant_data.character fdc
ON fdc.id = fdd.director_character_id
JOIN falukant_data.falukant_user fdu
ON fdd.employer_user_id = fdu.id
JOIN falukant_data.character user_character
ON user_character.user_id = fdu.id
JOIN falukant_data.branch fdb
ON fdb.falukant_user_id = fdu.id
AND fdb.region_id = fdc.region_id
JOIN falukant_data.town_product_worth fdtpw
ON fdtpw.region_id = fdb.region_id
JOIN falukant_data.knowledge fdk_character
ON fdk_character.product_id = fdtpw.product_id
AND fdk_character.character_id = user_character.id
JOIN falukant_data.knowledge fdk_director
ON fdk_director.product_id = fdtpw.product_id
AND fdk_director.character_id = fdd.director_character_id
JOIN falukant_type.product ftp
ON ftp.id = fdtpw.product_id
AND ftp.category <= fdu.certificate
WHERE fdd.id = $1
AND fdb.id = $2
ORDER BY worth DESC
LIMIT 1;
"#;
const QUERY_INSERT_PRODUCTION: &str = r#"
INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id)
VALUES ($1, $2, $3, (
SELECT weather_type_id
FROM falukant_data.weather
WHERE region_id = $4
));
"#;
// Query zum Abfragen der aktuellen Lager- und Produktionswerte für einen Branch
// (ohne den kompletten Produktionsplan neu zu berechnen)
const QUERY_GET_BRANCH_CAPACITY: &str = r#"
SELECT
(
SELECT SUM(quantity)
FROM falukant_data.stock fds
WHERE fds.branch_id = $1
) AS stock_size,
COALESCE((
SELECT SUM(COALESCE(fdi.quantity, 0))
FROM falukant_data.stock fds
JOIN falukant_data.inventory fdi
ON fdi.stock_id = fds.id
WHERE fds.branch_id = $1
), 0) AS used_in_stock,
(
SELECT COUNT(id)
FROM falukant_data.production
WHERE branch_id = $1
) AS running_productions,
COALESCE((
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
FROM falukant_data.production fdp
WHERE fdp.branch_id = $1
), 0) AS running_productions_quantity;
"#;
const QUERY_GET_INVENTORY: &str = r#"
SELECT
i.id,
i.product_id,
i.quantity,
i.quality,
p.sell_cost,
fu.id AS user_id,
b.region_id,
b.id AS branch_id,
COALESCE(tpw.worth_percent, 100.0) AS worth_percent
FROM falukant_data.inventory i
JOIN falukant_data.stock s
ON s.id = i.stock_id
JOIN falukant_data.branch b
ON b.id = s.branch_id
JOIN falukant_data.falukant_user fu
ON fu.id = b.falukant_user_id
JOIN falukant_data.director d
ON d.employer_user_id = fu.id
JOIN falukant_type.product p
ON p.id = i.product_id
LEFT JOIN falukant_data.town_product_worth tpw
ON tpw.region_id = b.region_id
AND tpw.product_id = i.product_id
WHERE d.id = $1
AND b.id = $2;
"#;
const QUERY_REMOVE_INVENTORY: &str = r#"
DELETE FROM falukant_data.inventory
WHERE id = $1;
"#;
const QUERY_ADD_SELL_LOG: &str = r#"
INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (region_id, product_id, seller_id)
DO UPDATE
SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity;
"#;
// Regionale Verkaufswürdigkeit pro Produkt/Region für alle Branches eines Users
const QUERY_GET_REGION_WORTH_FOR_PRODUCT: &str = r#"
SELECT
tpw.region_id,
tpw.product_id,
tpw.worth_percent
FROM falukant_data.town_product_worth tpw
JOIN falukant_data.branch b
ON b.region_id = tpw.region_id
WHERE b.falukant_user_id = $1
AND tpw.product_id = $2;
"#;
// Verfügbare Transportmittel für eine Route (source_region -> target_region) // Verfügbare Transportmittel für eine Route (source_region -> target_region)
const QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE: &str = r#" // ...existing code...
SELECT
v.id AS vehicle_id,
vt.capacity AS capacity
FROM falukant_data.vehicle v
JOIN falukant_type.vehicle vt
ON vt.id = v.vehicle_type_id
JOIN falukant_data.region_distance rd
ON (
(rd.source_region_id = v.region_id AND rd.target_region_id = $3)
OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id)
)
AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
WHERE v.falukant_user_id = $1
AND v.region_id = $2;
"#;
// Transport-Eintrag anlegen
const QUERY_INSERT_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport
(source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, NOW(), NOW());
"#;
// Leere Transporte (product_id = NULL, size = 0) zum Zurückholen von Fahrzeugen
const QUERY_INSERT_EMPTY_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport
(source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at)
VALUES ($1, $2, NULL, 0, $3, NOW(), NOW());
"#;
// Alle Branches des Users mit ihren Regionen
const QUERY_GET_USER_BRANCHES: &str = r#"
SELECT DISTINCT
b.region_id,
b.id AS branch_id
FROM falukant_data.branch b
WHERE b.falukant_user_id = $1
AND b.region_id != $2;
"#;
// Freie Transportmittel in einer Region (nicht in aktiven Transporten)
// Ein Transport ist aktiv, wenn er noch in der Tabelle existiert
const QUERY_GET_FREE_VEHICLES_IN_REGION: &str = r#"
SELECT
v.id AS vehicle_id,
vt.capacity AS capacity
FROM falukant_data.vehicle v
JOIN falukant_type.vehicle vt
ON vt.id = v.vehicle_type_id
WHERE v.falukant_user_id = $1
AND v.region_id = $2
AND v.id NOT IN (
SELECT DISTINCT t.vehicle_id
FROM falukant_data.transport t
WHERE t.vehicle_id IS NOT NULL
);
"#;
const QUERY_GET_SALARY_TO_PAY: &str = r#"
SELECT d.id, d.employer_user_id, d.income
FROM falukant_data.director d
WHERE DATE(d.last_salary_payout) < DATE(NOW());
"#;
const QUERY_SET_SALARY_PAYED: &str = r#"
UPDATE falukant_data.director
SET last_salary_payout = NOW()
WHERE id = $1;
"#;
const QUERY_UPDATE_SATISFACTION: &str = r#"
WITH new_sats AS (
SELECT
d.id,
ROUND(
d.income::numeric
/
(
c.title_of_nobility
* POWER(1.231, AVG(k.knowledge) / 1.5)
)
* 100
) AS new_satisfaction
FROM falukant_data.director d
JOIN falukant_data.knowledge k
ON d.director_character_id = k.character_id
JOIN falukant_data.character c
ON c.id = d.director_character_id
GROUP BY d.id, c.title_of_nobility, d.income
)
UPDATE falukant_data.director dir
SET satisfaction = ns.new_satisfaction
FROM new_sats ns
WHERE dir.id = ns.id
AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction
RETURNING dir.employer_user_id;
"#;
impl DirectorWorker { impl DirectorWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -347,7 +107,7 @@ impl DirectorWorker {
} }
} }
fn run_iteration(&mut self, state: &WorkerState) { fn run_iteration(&mut self, _state: &WorkerState) {
self.base.set_current_step("DirectorWorker iteration"); self.base.set_current_step("DirectorWorker iteration");
let now = Instant::now(); let now = Instant::now();
@@ -363,11 +123,7 @@ impl DirectorWorker {
self.last_run = Some(now); self.last_run = Some(now);
} }
std::thread::sleep(Duration::from_secs(1)); std::thread::sleep(Duration::from_secs(1));
if !state.running_worker.load(Ordering::Relaxed) {
return;
}
} }
fn perform_all_tasks(&mut self) -> Result<(), DbError> { fn perform_all_tasks(&mut self) -> Result<(), DbError> {
@@ -397,7 +153,7 @@ impl DirectorWorker {
.collect(); .collect();
if directors.is_empty() { if directors.is_empty() {
eprintln!("[DirectorWorker] Keine Direktoren für Aktionen gefunden (Zeitfenster oder DB-Daten)."); // keine Info-Logs
} }
for director in directors { for director in directors {
@@ -466,7 +222,7 @@ impl DirectorWorker {
return Ok(()); return Ok(());
} }
let mut base_plan = match Self::map_row_to_production_plan(&rows[0]) { let mut base_plan = match rows.first().and_then(Self::map_row_to_production_plan) {
Some(p) => p, Some(p) => p,
None => { None => {
eprintln!( eprintln!(
@@ -614,41 +370,11 @@ impl DirectorWorker {
conn: &mut DbConnection, conn: &mut DbConnection,
plan: &ProductionPlan, plan: &ProductionPlan,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
// Freie Lagerkapazität: Gesamtbestand minus bereits belegter Bestand let free_capacity = Self::calc_free_capacity(plan);
// (Inventar) minus bereits eingeplante Produktionsmengen. let one_piece_cost = Self::calc_one_piece_cost(plan);
let free_capacity = let max_money_production = Self::calc_max_money_production(plan, one_piece_cost);
plan.stock_size - plan.used_in_stock - plan.running_productions_quantity;
// Stückkosten monetär berechnen. Da money ein f64 ist, arbeiten wir hier ebenfalls let to_produce = (free_capacity.min(max_money_production)).clamp(0, 100);
// mit Gleitkomma und runden erst am Ende auf eine ganze Stückzahl ab.
let one_piece_cost = (plan.certificate * 6) as f64;
let mut max_money_production: i32 = 0;
if one_piece_cost > 0.0 {
if plan.money > 0.0 {
// Anzahl Stück, die sich mit dem verfügbaren Geld finanzieren lassen
max_money_production = (plan.money / one_piece_cost).floor() as i32;
} else {
// Falls das Geld aus der DB unerwartet als 0 eingelesen wurde, aber
// eigentlich ausreichend Guthaben vorhanden ist (bekannter Migrationsfall),
// lassen wir die Geldbegrenzung vorläufig fallen und begrenzen nur über
// Lagerkapazität und Hard-Limit 100.
eprintln!(
"[DirectorWorker] Warnung: money=0 für falukant_user_id={}, \
verwende nur Lagerkapazität als Limit.",
plan.falukant_user_id
);
max_money_production = i32::MAX;
}
}
// Maximale Produktionsmenge begrenzen:
// - nie mehr als der freie Lagerplatz (`free_capacity`)
// - nie mehr als durch das verfügbare Geld finanzierbar
// - absolut maximal 100 Einheiten pro Produktion
let to_produce = free_capacity
.min(max_money_production)
.min(100)
.max(0);
eprintln!( eprintln!(
"[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}", "[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}",
@@ -711,6 +437,30 @@ impl DirectorWorker {
Ok(()) Ok(())
} }
fn calc_free_capacity(plan: &ProductionPlan) -> i32 {
plan.stock_size - plan.used_in_stock - plan.running_productions_quantity
}
fn calc_one_piece_cost(plan: &ProductionPlan) -> f64 {
(plan.certificate * 6) as f64
}
fn calc_max_money_production(plan: &ProductionPlan, one_piece_cost: f64) -> i32 {
if one_piece_cost > 0.0 {
if plan.money > 0.0 {
(plan.money / one_piece_cost).floor() as i32
} else {
eprintln!(
"[DirectorWorker] Warnung: money=0 für falukant_user_id={}, verwende nur Lagerkapazität als Limit.",
plan.falukant_user_id
);
i32::MAX
}
} else {
0
}
}
fn start_transports_stub(&mut self, director: &Director) -> Result<(), DbError> { fn start_transports_stub(&mut self, director: &Director) -> Result<(), DbError> {
self.base self.base
.set_current_step("DirectorWorker: start_transports"); .set_current_step("DirectorWorker: start_transports");
@@ -736,11 +486,6 @@ impl DirectorWorker {
// sein (Arbeitgeber des Directors). // sein (Arbeitgeber des Directors).
let falukant_user_id = if items.is_empty() { let falukant_user_id = if items.is_empty() {
// Wenn keine Items vorhanden sind, müssen wir die user_id anders ermitteln // Wenn keine Items vorhanden sind, müssen wir die user_id anders ermitteln
const QUERY_GET_DIRECTOR_USER: &str = r#"
SELECT employer_user_id
FROM falukant_data.director
WHERE id = $1;
"#;
conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?; conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?;
let user_rows = conn.execute("get_director_user", &[&director.id])?; let user_rows = conn.execute("get_director_user", &[&director.id])?;
user_rows user_rows
@@ -754,20 +499,6 @@ impl DirectorWorker {
// Prüfe, ob Transportmittel im aktuellen Branch vorhanden sind // Prüfe, ob Transportmittel im aktuellen Branch vorhanden sind
// Ein Transport ist aktiv, wenn er noch in der Tabelle existiert // Ein Transport ist aktiv, wenn er noch in der Tabelle existiert
const QUERY_COUNT_VEHICLES_IN_BRANCH_REGION: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.vehicle v
JOIN falukant_data.branch b
ON b.region_id = v.region_id
WHERE v.falukant_user_id = $1
AND b.id = $2
AND v.id NOT IN (
SELECT DISTINCT t.vehicle_id
FROM falukant_data.transport t
WHERE t.vehicle_id IS NOT NULL
);
"#;
conn.prepare("count_vehicles_in_branch", QUERY_COUNT_VEHICLES_IN_BRANCH_REGION)?; conn.prepare("count_vehicles_in_branch", QUERY_COUNT_VEHICLES_IN_BRANCH_REGION)?;
let vehicle_count_rows = conn.execute( let vehicle_count_rows = conn.execute(
"count_vehicles_in_branch", "count_vehicles_in_branch",
@@ -976,45 +707,50 @@ impl DirectorWorker {
}) })
} }
fn sell_single_inventory_item( // Helper: compute piece sell price from item fields
&mut self, fn compute_piece_sell_price(item: &InventoryItem) -> f64 {
conn: &mut DbConnection,
item: &InventoryItem,
) -> Result<(), DbError> {
if item.quantity <= 0 {
conn.execute("remove_inventory", &[&item.id])?;
return Ok(());
}
// Neue Preisberechnung gemäß Spezifikation:
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let base_price = item.sell_cost * (item.worth_percent / 100.0); let base_price = item.sell_cost * (item.worth_percent / 100.0);
let min_price = base_price * 0.6;
let max_price = base_price;
let knowledge_factor = item.quality as f64;
min_price + (max_price - min_price) * (knowledge_factor / 100.0)
}
// 2. min = basePrice * 0.6, max = basePrice // Helper: get one_piece_cost from DB row fallback logic
let min_price = base_price * 0.6; fn resolve_one_piece_cost(conn: &mut DbConnection, product_id: i32, fallback: f64) -> Result<f64, DbError> {
let max_price = base_price; conn.prepare("get_product_cost", QUERY_GET_PRODUCT_COST)?;
let rows = conn.execute("get_product_cost", &[&product_id])?;
if let Some(row) = rows.first()
&& let Some(osc) = row.get("original_sell_cost")
&& let Ok(v) = osc.parse::<f64>()
{
return Ok(v);
}
if let Some(row) = rows.first()
&& let Some(sc) = row.get("sell_cost")
&& let Ok(v) = sc.parse::<f64>()
{
return Ok(v);
}
Ok(fallback)
}
// 3. price = min + (max - min) * (knowledgeFactor / 100) // Helper: determine cumulative tax percent for a branch/user
// knowledgeFactor ist hier item.quality fn get_cumulative_tax_percent(conn: &mut DbConnection, branch_id: i32, user_id: i32) -> Result<f64, DbError> {
let knowledge_factor = item.quality as f64; // Default
let piece_sell_price = min_price + (max_price - min_price) * (knowledge_factor / 100.0);
let sell_price = piece_sell_price * item.quantity as f64;
// Steuerberechnung: 1) Region ermitteln, 2) user offices, 3) cumulative tax (mit Befreiungen)
let mut cumulative_tax_percent = DEFAULT_TAX_PERCENT; let mut cumulative_tax_percent = DEFAULT_TAX_PERCENT;
conn.prepare("get_branch_region", "SELECT region_id FROM falukant_data.branch WHERE id = $1;")?; conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)
let branch_rows = conn.execute("get_branch_region", &[&item.branch_id])?; .map_err(|e| DbError::new(format!("[DirectorWorker] prepare get_branch_region: {e}")))?;
let branch_region_id: Option<i32> = branch_rows.get(0).and_then(|r| r.get("region_id")).and_then(|v| v.parse().ok()); let branch_rows = conn.execute("get_branch_region", &[&branch_id])
.map_err(|e| DbError::new(format!("[DirectorWorker] exec get_branch_region branch_id={}: {e}", branch_id)))?;
let branch_region_id: Option<i32> = branch_rows.first().and_then(|r| r.get("region_id")).and_then(|v| v.parse().ok());
if let Some(region_id) = branch_region_id { if let Some(region_id) = branch_region_id {
// user offices conn.prepare("get_user_offices", QUERY_GET_USER_OFFICES)
conn.prepare( .map_err(|e| DbError::new(format!("[DirectorWorker] prepare get_user_offices: {e}")))?;
"get_user_offices", let offices = conn.execute("get_user_offices", &[&user_id])
"SELECT po.id AS office_id, pot.name AS office_name, po.region_id, rt.label_tr AS region_type FROM falukant_data.political_office po JOIN falukant_type.political_office_type pot ON pot.id = po.office_type_id JOIN falukant_data.region r ON r.id = po.region_id JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE po.holder_id = $1 AND (po.end_date IS NULL OR po.end_date > NOW());", .map_err(|e| DbError::new(format!("[DirectorWorker] exec get_user_offices user_id={}: {e}", user_id)))?;
)?;
let offices = conn.execute("get_user_offices", &[&item.user_id])?;
let mut exempt_types: Vec<String> = Vec::new(); let mut exempt_types: Vec<String> = Vec::new();
let mut has_chancellor = false; let mut has_chancellor = false;
@@ -1032,59 +768,69 @@ impl DirectorWorker {
} }
if has_chancellor { if has_chancellor {
cumulative_tax_percent = 0.0; return Ok(0.0);
}
if exempt_types.is_empty() {
conn.prepare("cumulative_tax_no_exempt", QUERY_CUMULATIVE_TAX_NO_EXEMPT)
.map_err(|e| DbError::new(format!("[DirectorWorker] prepare cumulative_tax_no_exempt: {e}")))?;
let res = conn.execute("cumulative_tax_no_exempt", &[&region_id])
.map_err(|e| DbError::new(format!("[DirectorWorker] exec cumulative_tax_no_exempt region_id={}: {e}", region_id)))?;
if let Some(row) = res.first()
&& let Some(tp) = row.get("total_percent")
{
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
}
} else { } else {
if exempt_types.is_empty() { conn.prepare("cumulative_tax_with_exempt", QUERY_CUMULATIVE_TAX_WITH_EXEMPT)
conn.prepare( .map_err(|e| DbError::new(format!("[DirectorWorker] prepare cumulative_tax_with_exempt: {e}")))?;
"cumulative_tax_no_exempt", let exempt_array: Vec<&str> = exempt_types.iter().map(|s| s.as_str()).collect();
"WITH RECURSIVE ancestors AS (SELECT id, parent_id, COALESCE(tax_percent,0.0) AS tax_percent FROM falukant_data.region WHERE id = $1 UNION ALL SELECT r.id, r.parent_id, COALESCE(r.tax_percent,0.0) FROM falukant_data.region r JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;", let res = conn.execute("cumulative_tax_with_exempt", &[&region_id, &exempt_array])
)?; .map_err(|e| DbError::new(format!("[DirectorWorker] exec cumulative_tax_with_exempt region_id={} exempt={:?}: {}", region_id, exempt_array, e)))?;
let res = conn.execute("cumulative_tax_no_exempt", &[&region_id])?; if let Some(row) = res.first() && let Some(tp) = row.get("total_percent") {
if let Some(row) = res.get(0) { cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
if let Some(tp) = row.get("total_percent") {
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
}
}
} else {
conn.prepare(
"cumulative_tax_with_exempt",
"WITH RECURSIVE ancestors AS (SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END AS tax_percent FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE r.id = $1 UNION ALL SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;",
)?;
let exempt_array: Vec<&str> = exempt_types.iter().map(|s| s.as_str()).collect();
let res = conn.execute("cumulative_tax_with_exempt", &[&region_id, &exempt_array])?;
if let Some(row) = res.get(0) {
if let Some(tp) = row.get("total_percent") {
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
}
}
} }
} }
} }
// Produktkosten (original_sell_cost fallback sell_cost) Ok(cumulative_tax_percent)
conn.prepare("get_product_cost", "SELECT original_sell_cost, sell_cost FROM falukant_type.product WHERE id = $1")?; }
let cost_rows = conn.execute("get_product_cost", &[&item.product_id])?;
let mut one_piece_cost = item.sell_cost; fn sell_single_inventory_item(
if let Some(row) = cost_rows.get(0) { &mut self,
if let Some(osc) = row.get("original_sell_cost") { conn: &mut DbConnection,
if let Ok(v) = osc.parse::<f64>() { item: &InventoryItem,
one_piece_cost = v; ) -> Result<(), DbError> {
} if item.quantity <= 0 {
} else if let Some(sc) = row.get("sell_cost") { conn.execute("remove_inventory", &[&item.id])?;
if let Ok(v) = sc.parse::<f64>() { return Ok(());
one_piece_cost = v;
}
}
} }
// cents-based arithmetic // compute piece price and full sell price
let revenue_cents = (sell_price * 100.0).round() as i64; let piece_price = Self::compute_piece_sell_price(item);
let cost_cents = (one_piece_cost * item.quantity as f64 * 100.0).round() as i64;
let profit_cents = (revenue_cents - cost_cents).max(0); let one_piece_cost = Self::resolve_one_piece_cost(conn, item.product_id, item.sell_cost)?;
let tax_cents = ((profit_cents as f64) * cumulative_tax_percent / 100.0).round() as i64; let cumulative_tax_percent = Self::get_cumulative_tax_percent(conn, item.branch_id, item.user_id)?;
// Preis-Inflation: Der Preis wird basierend auf der Steuer inflatiert,
// damit der Netto-Betrag für den Verkäufer gleich bleibt (wie im Frontend)
let inflation_factor = if cumulative_tax_percent >= 100.0 {
1.0
} else {
1.0 / (1.0 - cumulative_tax_percent / 100.0)
};
let adjusted_price_per_unit = (piece_price * inflation_factor * 100.0).round() / 100.0;
let revenue = adjusted_price_per_unit * item.quantity as f64;
// Steuer wird auf Revenue berechnet (nicht auf Profit), wie im Frontend
let revenue_cents = (revenue * 100.0).round() as i64;
let tax_cents = ((revenue * cumulative_tax_percent / 100.0) * 100.0).round() as i64;
let payout_cents = revenue_cents - tax_cents; let payout_cents = revenue_cents - tax_cents;
eprintln!("[DirectorWorker] sell: revenue={:.2}, cost={:.2}, profit_cents={}, tax%={:.2}, tax_cents={}, payout_cents={}", sell_price, one_piece_cost * item.quantity as f64, profit_cents, cumulative_tax_percent, tax_cents, payout_cents); let cost = one_piece_cost * item.quantity as f64;
let profit_cents = revenue_cents - (cost * 100.0).round() as i64;
eprintln!("[DirectorWorker] sell: revenue={:.2}, cost={:.2}, profit_cents={}, tax%={:.2}, tax_cents={}, payout_cents={}", revenue, cost, profit_cents, cumulative_tax_percent, tax_cents, payout_cents);
if tax_cents > 0 { if tax_cents > 0 {
let tax_amount = (tax_cents as f64) / 100.0; let tax_amount = (tax_cents as f64) / 100.0;
@@ -1094,17 +840,15 @@ impl DirectorWorker {
} }
let payout_amount = (payout_cents as f64) / 100.0; let payout_amount = (payout_cents as f64) / 100.0;
if payout_cents != 0 { if payout_cents != 0 && let Err(err) = self.base.change_falukant_user_money(item.user_id, payout_amount, "sell products") {
if let Err(err) = self.base.change_falukant_user_money(item.user_id, payout_amount, "sell products") { eprintln!("[DirectorWorker] Fehler bei change_falukant_user_money (sell products): {err}");
eprintln!("[DirectorWorker] Fehler bei change_falukant_user_money (sell products): {err}");
}
} }
// Debug: Log vor dem DB-Aufruf // Debug: Log vor dem DB-Aufruf
eprintln!( eprintln!(
"[DirectorWorker] sell: user_id={}, revenue={:.2}, tax={:.2}, payout={:.2}, product_id={}", "[DirectorWorker] sell: user_id={}, revenue={:.2}, tax={:.2}, payout={:.2}, product_id={}",
item.user_id, item.user_id,
sell_price, revenue,
(tax_cents as f64) / 100.0, (tax_cents as f64) / 100.0,
payout_amount, payout_amount,
item.product_id item.product_id
@@ -1156,66 +900,22 @@ impl DirectorWorker {
return Ok(0); return Ok(0);
} }
// Regionale worth_percent-Werte für dieses Produkt laden // Load worth_percent by region for this product
conn.prepare( let worth_by_region = Self::get_worth_by_region(conn, falukant_user_id, item.product_id)?;
"get_region_worth_for_product",
QUERY_GET_REGION_WORTH_FOR_PRODUCT,
)?;
let rows = conn.execute(
"get_region_worth_for_product",
&[&falukant_user_id, &item.product_id],
)?;
if rows.is_empty() {
return Ok(0);
}
let mut worth_by_region: HashMap<i32, f64> = HashMap::new();
for row in rows {
let region_id = row
.get("region_id")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1);
let percent = row
.get("worth_percent")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(100.0);
if region_id >= 0 {
worth_by_region.insert(region_id, percent);
}
}
if worth_by_region.is_empty() { if worth_by_region.is_empty() {
eprintln!( eprintln!("[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden", item.product_id);
"[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden",
item.product_id
);
return Ok(0); return Ok(0);
} }
eprintln!( eprintln!(
"[DirectorWorker] Gefundene Regionen für Produkt {}: {} Regionen", "[DirectorWorker] Gefundene Regionen für Produkt {}: {} Regionen",
item.product_id, worth_by_region.len() item.product_id,
worth_by_region.len()
); );
// Lokalen Stückpreis berechnen (neue Preisberechnung) // Compute local piece price
let local_percent = worth_by_region let local_percent = worth_by_region.get(&item.region_id).copied().unwrap_or(100.0);
.get(&item.region_id) let local_piece_price = Self::compute_piece_price_for_percent(item, local_percent);
.copied()
.unwrap_or(100.0);
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let local_base_price = item.sell_cost * (local_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let local_min_price = local_base_price * 0.6;
let local_max_price = local_base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
let knowledge_factor = item.quality as f64;
let local_piece_price = local_min_price + (local_max_price - local_min_price) * (knowledge_factor / 100.0);
eprintln!( eprintln!(
"[DirectorWorker] Lokaler Preis für Produkt {}: {:.2} (worth_percent={:.2}, quality={})", "[DirectorWorker] Lokaler Preis für Produkt {}: {:.2} (worth_percent={:.2}, quality={})",
item.product_id, local_piece_price, local_percent, item.quality item.product_id, local_piece_price, local_percent, item.quality
@@ -1232,18 +932,7 @@ impl DirectorWorker {
continue; continue;
} }
// Remote-Stückpreis berechnen (neue Preisberechnung) let remote_piece_price = Self::compute_piece_price_for_percent(item, remote_percent);
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let remote_base_price = item.sell_cost * (remote_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let remote_min_price = remote_base_price * 0.6;
let remote_max_price = remote_base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
let knowledge_factor = item.quality as f64;
let remote_piece_price = remote_min_price + (remote_max_price - remote_min_price) * (knowledge_factor / 100.0);
let delta_per_unit = remote_piece_price - local_piece_price; let delta_per_unit = remote_piece_price - local_piece_price;
eprintln!( eprintln!(
"[DirectorWorker] Region {}: Preis {:.2}, Delta {:.2}", "[DirectorWorker] Region {}: Preis {:.2}, Delta {:.2}",
@@ -1280,10 +969,7 @@ impl DirectorWorker {
} }
// Maximale transportierbare Menge anhand der Kapazität ermitteln // Maximale transportierbare Menge anhand der Kapazität ermitteln
let mut max_capacity: i32 = 0; let max_capacity = Self::calc_max_capacity(&vehicles);
for v in &vehicles {
max_capacity = max_capacity.saturating_add(v.capacity);
}
if max_capacity <= 0 { if max_capacity <= 0 {
continue; continue;
@@ -1295,15 +981,7 @@ impl DirectorWorker {
} }
let extra_revenue = delta_per_unit * qty as f64; let extra_revenue = delta_per_unit * qty as f64;
let total_value = remote_piece_price * qty as f64; let transport_cost = Self::calc_transport_cost(remote_piece_price, qty);
let transport_cost = total_value * 0.01_f64;
// Kostenformel: max(0.01, totalValue * 0.01)
let transport_cost = if transport_cost < 0.01 {
0.01
} else {
transport_cost
};
let net_gain = extra_revenue - transport_cost; let net_gain = extra_revenue - transport_cost;
eprintln!( eprintln!(
"[DirectorWorker] Region {}: extra_revenue={:.2}, transport_cost={:.2}, net_gain={:.2}, qty={}", "[DirectorWorker] Region {}: extra_revenue={:.2}, transport_cost={:.2}, net_gain={:.2}, qty={}",
@@ -1340,50 +1018,12 @@ impl DirectorWorker {
return Ok(0); return Ok(0);
} }
// Nochmals verfügbare Transportmittel für die gewählte Route laden // Build and insert transports for chosen route
let vehicles = Self::get_transport_vehicles_for_route( let shipped = Self::insert_transports_for_route(conn, item, target_region, best_quantity)?;
conn,
falukant_user_id,
item.region_id,
target_region,
)?;
if vehicles.is_empty() {
return Ok(0);
}
// Transporte anlegen, begrenzt durch best_quantity und Kapazitäten
conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;
let mut remaining = best_quantity;
for v in &vehicles {
if remaining <= 0 {
break;
}
let size = std::cmp::min(remaining, v.capacity);
if size <= 0 {
continue;
}
conn.execute(
"insert_transport",
&[
&item.region_id,
&target_region,
&item.product_id,
&size,
&v.id,
],
)?;
remaining -= size;
}
let shipped = best_quantity - remaining.max(0);
// Inventar sofort reduzieren, nachdem Transporte erfolgreich angelegt wurden // Inventar sofort reduzieren, nachdem Transporte erfolgreich angelegt wurden
// Dies stellt sicher, dass Inventar und Transporte immer konsistent sind // Dies stellt sicher, dass Inventar und Transporte immer konsistent sind
if shipped > 0 { if shipped > 0 {
if shipped >= item.quantity { if shipped >= item.quantity {
// Alles wurde in Transporte umgewandelt, Inventar komplett entfernen // Alles wurde in Transporte umgewandelt, Inventar komplett entfernen
conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?; conn.prepare("remove_inventory", QUERY_REMOVE_INVENTORY)?;
@@ -1411,15 +1051,8 @@ impl DirectorWorker {
source_region: i32, source_region: i32,
target_region: i32, target_region: i32,
) -> Result<Vec<TransportVehicle>, DbError> { ) -> Result<Vec<TransportVehicle>, DbError> {
// Debug: Prüfe zuerst, ob Fahrzeuge in der Quellregion existieren // Debug: Prüfe zuerst, ob Fahrzeuge in der Quellregion existieren
const QUERY_COUNT_VEHICLES_IN_REGION: &str = r#" conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?;
SELECT COUNT(*) AS count
FROM falukant_data.vehicle v
WHERE v.falukant_user_id = $1
AND v.region_id = $2;
"#;
conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?;
let vehicle_count_rows = conn.execute( let vehicle_count_rows = conn.execute(
"count_vehicles_in_region", "count_vehicles_in_region",
&[&falukant_user_id, &source_region], &[&falukant_user_id, &source_region],
@@ -1436,15 +1069,8 @@ impl DirectorWorker {
source_region, falukant_user_id, vehicle_count source_region, falukant_user_id, vehicle_count
); );
// Debug: Prüfe, ob eine Route existiert // Debug: Prüfe, ob eine Route existiert
const QUERY_CHECK_ROUTE: &str = r#" conn.prepare("check_route", QUERY_CHECK_ROUTE)?;
SELECT COUNT(*) AS count
FROM falukant_data.region_distance rd
WHERE (rd.source_region_id = $1 AND rd.target_region_id = $2)
OR (rd.source_region_id = $2 AND rd.target_region_id = $1);
"#;
conn.prepare("check_route", QUERY_CHECK_ROUTE)?;
let route_rows = conn.execute( let route_rows = conn.execute(
"check_route", "check_route",
&[&source_region, &target_region], &[&source_region, &target_region],
@@ -1494,6 +1120,55 @@ impl DirectorWorker {
Ok(result) Ok(result)
} }
// Helper: load worth_percent values for a product across all regions of a user's branches
fn get_worth_by_region(conn: &mut DbConnection, falukant_user_id: i32, product_id: i32) -> Result<HashMap<i32,f64>, DbError> {
conn.prepare("get_region_worth_for_product", QUERY_GET_REGION_WORTH_FOR_PRODUCT)?;
let rows = conn.execute("get_region_worth_for_product", &[&falukant_user_id, &product_id])?;
let mut map = HashMap::new();
for row in rows {
if let Some(rid) = row.get("region_id").and_then(|v| v.parse::<i32>().ok()) {
let percent = row.get("worth_percent").and_then(|v| v.parse::<f64>().ok()).unwrap_or(100.0);
map.insert(rid, percent);
}
}
Ok(map)
}
// Helper: compute piece price for an arbitrary worth_percent
fn compute_piece_price_for_percent(item: &InventoryItem, percent: f64) -> f64 {
let base_price = item.sell_cost * (percent / 100.0);
let min_price = base_price * 0.6;
let max_price = base_price;
let knowledge_factor = item.quality as f64;
min_price + (max_price - min_price) * (knowledge_factor / 100.0)
}
fn calc_max_capacity(vehicles: &[TransportVehicle]) -> i32 {
vehicles.iter().fold(0i32, |acc, v| acc.saturating_add(v.capacity))
}
fn calc_transport_cost(remote_piece_price: f64, qty: i32) -> f64 {
let total_value = remote_piece_price * qty as f64;
let transport_cost = total_value * 0.01_f64;
if transport_cost < 0.01 { 0.01 } else { transport_cost }
}
fn insert_transports_for_route(conn: &mut DbConnection, item: &InventoryItem, target_region: i32, desired: i32) -> Result<i32, DbError> {
let vehicles = Self::get_transport_vehicles_for_route(conn, item.user_id, item.region_id, target_region)?;
if vehicles.is_empty() { return Ok(0); }
conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;
let mut remaining = desired;
for v in &vehicles {
if remaining <= 0 { break; }
let size = std::cmp::min(remaining, v.capacity);
if size <= 0 { continue; }
conn.execute("insert_transport", &[&item.region_id, &target_region, &item.product_id, &size, &v.id])?;
remaining -= size;
}
Ok(desired - remaining.max(0))
}
/// Plant leere Transporte, um Fahrzeuge zurückzuholen, wenn: /// Plant leere Transporte, um Fahrzeuge zurückzuholen, wenn:
/// - Keine Transportmittel im aktuellen Branch vorhanden sind /// - Keine Transportmittel im aktuellen Branch vorhanden sind
/// - Aber bessere Verkaufspreise in anderen Branches existieren /// - Aber bessere Verkaufspreise in anderen Branches existieren
@@ -1504,14 +1179,8 @@ impl DirectorWorker {
falukant_user_id: i32, falukant_user_id: i32,
current_branch_id: i32, current_branch_id: i32,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
// Aktuelle Branch-Region ermitteln // Aktuelle Branch-Region ermitteln
const QUERY_GET_BRANCH_REGION: &str = r#" conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
SELECT region_id
FROM falukant_data.branch
WHERE id = $1;
"#;
conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
let branch_rows = conn.execute("get_branch_region", &[&current_branch_id])?; let branch_rows = conn.execute("get_branch_region", &[&current_branch_id])?;
let current_region_id = match branch_rows.into_iter().next() { let current_region_id = match branch_rows.into_iter().next() {
@@ -1585,14 +1254,6 @@ impl DirectorWorker {
// Berechne Preisvorteil (vereinfacht: verwende worth_percent-Differenz) // Berechne Preisvorteil (vereinfacht: verwende worth_percent-Differenz)
// Hole worth_percent für beide Regionen (für ein beliebiges Produkt) // Hole worth_percent für beide Regionen (für ein beliebiges Produkt)
let mut price_delta = 0.0; let mut price_delta = 0.0;
const QUERY_GET_AVERAGE_WORTH: &str = r#"
SELECT
AVG(CASE WHEN region_id = $1 THEN worth_percent ELSE NULL END) AS current_worth,
AVG(CASE WHEN region_id = $2 THEN worth_percent ELSE NULL END) AS target_worth
FROM falukant_data.town_product_worth
WHERE region_id IN ($1, $2);
"#;
conn.prepare("get_average_worth", QUERY_GET_AVERAGE_WORTH)?; conn.prepare("get_average_worth", QUERY_GET_AVERAGE_WORTH)?;
let worth_rows = conn.execute( let worth_rows = conn.execute(
"get_average_worth", "get_average_worth",
@@ -1667,12 +1328,6 @@ impl DirectorWorker {
inventory_id: i32, inventory_id: i32,
new_quantity: i32, new_quantity: i32,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
const QUERY_UPDATE_INVENTORY_QTY: &str = r#"
UPDATE falukant_data.inventory
SET quantity = $2
WHERE id = $1;
"#;
conn.prepare("update_inventory_qty", QUERY_UPDATE_INVENTORY_QTY)?; conn.prepare("update_inventory_qty", QUERY_UPDATE_INVENTORY_QTY)?;
conn.execute("update_inventory_qty", &[&inventory_id, &new_quantity])?; conn.execute("update_inventory_qty", &[&inventory_id, &new_quantity])?;

File diff suppressed because it is too large Load Diff

View File

@@ -5,48 +5,17 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_NEW_HOUSE_DATA,
QUERY_ADD_NEW_BUYABLE_HOUSE,
QUERY_UPDATE_BUYABLE_HOUSE_STATE,
QUERY_UPDATE_USER_HOUSE_STATE,
};
pub struct HouseWorker { pub struct HouseWorker {
base: BaseWorker, base: BaseWorker,
} }
// SQL-Queries analog zu `houseworker.h`
const QUERY_GET_NEW_HOUSE_DATA: &str = r#"
SELECT
h.id AS house_id
FROM
falukant_type.house AS h
WHERE
random() < 0.0001
AND label_tr <> 'under_bridge';
"#;
const QUERY_ADD_NEW_BUYABLE_HOUSE: &str = r#"
INSERT INTO falukant_data.buyable_house (house_type_id)
VALUES ($1);
"#;
const QUERY_UPDATE_BUYABLE_HOUSE_STATE: &str = r#"
UPDATE falukant_data.buyable_house
SET roof_condition = ROUND(roof_condition - random() * (3 + 0 * id)),
floor_condition = ROUND(floor_condition - random() * (3 + 0 * id)),
wall_condition = ROUND(wall_condition - random() * (3 + 0 * id)),
window_condition = ROUND(window_condition - random() * (3 + 0 * id));
"#;
const QUERY_UPDATE_USER_HOUSE_STATE: &str = r#"
UPDATE falukant_data.user_house
SET roof_condition = ROUND(roof_condition - random() * (3 + 0 * id)),
floor_condition = ROUND(floor_condition - random() * (3 + 0 * id)),
wall_condition = ROUND(wall_condition - random() * (3 + 0 * id)),
window_condition = ROUND(window_condition - random() * (3 + 0 * id))
WHERE house_type_id NOT IN (
SELECT id
FROM falukant_type.house h
WHERE h.label_tr = 'under_bridge'
);
"#;
impl HouseWorker { impl HouseWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
Self { Self {

View File

@@ -11,6 +11,7 @@ mod user_character;
mod transport; mod transport;
mod weather; mod weather;
mod events; mod events;
mod sql;
pub use base::Worker; pub use base::Worker;
pub use crate::db::ConnectionPool; pub use crate::db::ConnectionPool;

View File

@@ -6,6 +6,23 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_COUNT_OFFICES_PER_REGION,
QUERY_FIND_OFFICE_GAPS,
QUERY_SELECT_NEEDED_ELECTIONS,
QUERY_INSERT_CANDIDATES,
QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES,
QUERY_PROCESS_EXPIRED_AND_FILL,
QUERY_USERS_IN_CITIES_OF_REGIONS,
QUERY_NOTIFY_OFFICE_EXPIRATION,
QUERY_NOTIFY_ELECTION_CREATED,
QUERY_NOTIFY_OFFICE_FILLED,
QUERY_GET_USERS_WITH_EXPIRING_OFFICES,
QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS,
QUERY_GET_USERS_WITH_FILLED_OFFICES,
QUERY_PROCESS_ELECTIONS,
QUERY_TRIM_EXCESS_OFFICES_GLOBAL,
};
pub struct PoliticsWorker { pub struct PoliticsWorker {
base: BaseWorker, base: BaseWorker,
@@ -42,466 +59,6 @@ struct Office {
// --- SQL-Konstanten (1:1 aus politics_worker.h übernommen) ------------------ // --- SQL-Konstanten (1:1 aus politics_worker.h übernommen) ------------------
const QUERY_COUNT_OFFICES_PER_REGION: &str = r#"
WITH
seats_per_region AS (
SELECT
pot.id AS office_type_id,
rt.id AS region_id,
pot.seats_per_region AS seats_total
FROM falukant_type.political_office_type AS pot
JOIN falukant_type.region AS rt
ON pot.region_type = rt.label_tr
),
occupied AS (
SELECT
po.office_type_id,
po.region_id,
COUNT(*) AS occupied_count
FROM falukant_data.political_office AS po
GROUP BY po.office_type_id, po.region_id
),
combined AS (
SELECT
spr.region_id,
spr.seats_total AS required_count,
COALESCE(o.occupied_count, 0) AS occupied_count
FROM seats_per_region AS spr
LEFT JOIN occupied AS o
ON spr.office_type_id = o.office_type_id
AND spr.region_id = o.region_id
)
SELECT
region_id,
SUM(required_count) AS required_count,
SUM(occupied_count) AS occupied_count
FROM combined
GROUP BY region_id;
"#;
/// Findet alle Kombinationen aus Amtstyp und Region, für die laut
/// `seats_per_region` mehr Sitze existieren sollten, als aktuell in
/// `falukant_data.political_office` belegt sind. Es werden ausschließlich
/// positive Differenzen (Gaps) zurückgegeben wenn `occupied > required`
/// (z.B. nach Reduktion der Sitzzahl), wird **nichts** gelöscht und die
/// Kombination erscheint hier nicht.
const QUERY_FIND_OFFICE_GAPS: &str = r#"
WITH
seats AS (
SELECT
pot.id AS office_type_id,
rt.id AS region_id,
pot.seats_per_region AS seats_total
FROM falukant_type.political_office_type AS pot
JOIN falukant_type.region AS rt
ON pot.region_type = rt.label_tr
),
occupied AS (
SELECT
po.office_type_id,
po.region_id,
COUNT(*) AS occupied_count
FROM falukant_data.political_office AS po
GROUP BY po.office_type_id, po.region_id
)
SELECT
s.office_type_id,
s.region_id,
(s.seats_total - COALESCE(o.occupied_count, 0)) AS gaps
FROM seats AS s
LEFT JOIN occupied AS o
ON s.office_type_id = o.office_type_id
AND s.region_id = o.region_id
WHERE (s.seats_total - COALESCE(o.occupied_count, 0)) > 0;
"#;
const QUERY_SELECT_NEEDED_ELECTIONS: &str = r#"
WITH
target_date AS (
SELECT NOW()::date AS election_date
),
expired_today AS (
DELETE FROM falukant_data.political_office AS po
USING falukant_type.political_office_type AS pot
WHERE po.office_type_id = pot.id
AND (po.created_at + (pot.term_length * INTERVAL '1 day'))::date
= (SELECT election_date FROM target_date)
RETURNING
pot.id AS office_type_id,
po.region_id AS region_id
),
gaps_per_region AS (
SELECT
office_type_id,
region_id,
COUNT(*) AS gaps
FROM expired_today
GROUP BY office_type_id, region_id
),
to_schedule AS (
SELECT
g.office_type_id,
g.region_id,
g.gaps,
td.election_date
FROM gaps_per_region AS g
CROSS JOIN target_date AS td
WHERE NOT EXISTS (
SELECT 1
FROM falukant_data.election AS e
WHERE e.office_type_id = g.office_type_id
AND e.region_id = g.region_id
AND e.date::date = td.election_date
)
),
new_elections AS (
INSERT INTO falukant_data.election
(office_type_id, date, posts_to_fill, created_at, updated_at, region_id)
SELECT
ts.office_type_id,
ts.election_date,
ts.gaps,
NOW(),
NOW(),
ts.region_id
FROM to_schedule AS ts
RETURNING
id AS election_id,
region_id,
posts_to_fill
)
SELECT
ne.election_id,
ne.region_id,
ne.posts_to_fill
FROM new_elections AS ne
ORDER BY ne.region_id, ne.election_id;
"#;
const QUERY_INSERT_CANDIDATES: &str = r#"
INSERT INTO falukant_data.candidate
(election_id, character_id, created_at, updated_at)
SELECT
$1 AS election_id,
sub.id AS character_id,
NOW() AS created_at,
NOW() AS updated_at
FROM (
WITH RECURSIVE region_tree AS (
SELECT r.id
FROM falukant_data.region AS r
WHERE r.id = $2
UNION ALL
SELECT r2.id
FROM falukant_data.region AS r2
JOIN region_tree AS rt
ON r2.parent_id = rt.id
)
SELECT ch.id
FROM falukant_data.character AS ch
JOIN region_tree AS rt2
ON ch.region_id = rt2.id
WHERE ch.user_id IS NULL
AND ch.birthdate <= NOW() - INTERVAL '21 days'
AND ch.title_of_nobility IN (
SELECT id
FROM falukant_type.title
WHERE label_tr != 'noncivil'
)
ORDER BY RANDOM()
LIMIT ($3 * 2)
) AS sub(id);
"#;
/// Wahlen finden, für die noch keine Kandidaten existieren.
/// Das umfasst sowohl frisch eingetragene Wahlen als auch manuell
/// angelegte Wahlen, solange:
/// - das Wahl-Datum heute oder in der Zukunft liegt und
/// - noch kein Eintrag in `falukant_data.candidate` existiert.
const QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES: &str = r#"
SELECT
e.id AS election_id,
e.region_id AS region_id,
e.posts_to_fill
FROM falukant_data.election AS e
WHERE e.region_id IS NOT NULL
AND e.posts_to_fill > 0
AND e.date::date >= CURRENT_DATE
AND NOT EXISTS (
SELECT 1
FROM falukant_data.candidate AS c
WHERE c.election_id = e.id
);
"#;
const QUERY_PROCESS_EXPIRED_AND_FILL: &str = r#"
WITH
expired_offices AS (
DELETE FROM falukant_data.political_office AS po
USING falukant_type.political_office_type AS pot
WHERE po.office_type_id = pot.id
AND (po.created_at + (pot.term_length * INTERVAL '1 day')) <= NOW()
RETURNING
pot.id AS office_type_id,
po.region_id AS region_id
),
distinct_types AS (
SELECT DISTINCT office_type_id, region_id FROM expired_offices
),
votes_per_candidate AS (
SELECT
dt.office_type_id,
dt.region_id,
c.character_id,
COUNT(v.id) AS vote_count
FROM distinct_types AS dt
JOIN falukant_data.election AS e
ON e.office_type_id = dt.office_type_id
JOIN falukant_data.vote AS v
ON v.election_id = e.id
JOIN falukant_data.candidate AS c
ON c.election_id = e.id
AND c.id = v.candidate_id
WHERE e.date >= (NOW() - INTERVAL '30 days')
GROUP BY dt.office_type_id, dt.region_id, c.character_id
),
ranked_winners AS (
SELECT
vpc.office_type_id,
vpc.region_id,
vpc.character_id,
ROW_NUMBER() OVER (
PARTITION BY vpc.office_type_id, vpc.region_id
ORDER BY vpc.vote_count DESC
) AS rn
FROM votes_per_candidate AS vpc
),
selected_winners AS (
SELECT
rw.office_type_id,
rw.region_id,
rw.character_id
FROM ranked_winners AS rw
JOIN falukant_type.political_office_type AS pot
ON pot.id = rw.office_type_id
WHERE rw.rn <= pot.seats_per_region
),
insert_winners AS (
INSERT INTO falukant_data.political_office
(office_type_id, character_id, created_at, updated_at, region_id)
SELECT
sw.office_type_id,
sw.character_id,
NOW(),
NOW(),
sw.region_id
FROM selected_winners AS sw
RETURNING id AS new_office_id, office_type_id, character_id, region_id
),
count_inserted AS (
SELECT
office_type_id,
region_id,
COUNT(*) AS inserted_count
FROM insert_winners
GROUP BY office_type_id, region_id
),
needed_to_fill AS (
SELECT
dt.office_type_id,
dt.region_id,
(pot.seats_per_region - COALESCE(ci.inserted_count, 0)) AS gaps
FROM distinct_types AS dt
JOIN falukant_type.political_office_type AS pot
ON pot.id = dt.office_type_id
LEFT JOIN count_inserted AS ci
ON ci.office_type_id = dt.office_type_id
AND ci.region_id = dt.region_id
WHERE (pot.seats_per_region - COALESCE(ci.inserted_count, 0)) > 0
),
random_candidates AS (
SELECT
rtf.office_type_id,
rtf.region_id,
ch.id AS character_id,
ROW_NUMBER() OVER (
PARTITION BY rtf.office_type_id, rtf.region_id
ORDER BY RANDOM()
) AS rn
FROM needed_to_fill AS rtf
JOIN falukant_data.character AS ch
ON ch.region_id = rtf.region_id
AND ch.user_id IS NULL
AND ch.birthdate <= NOW() - INTERVAL '21 days'
AND ch.title_of_nobility IN (
SELECT id FROM falukant_type.title WHERE label_tr != 'noncivil'
)
AND NOT EXISTS (
SELECT 1
FROM falukant_data.political_office AS po2
JOIN falukant_type.political_office_type AS pot2
ON pot2.id = po2.office_type_id
WHERE po2.character_id = ch.id
AND (po2.created_at + (pot2.term_length * INTERVAL '1 day')) >
NOW() + INTERVAL '2 days'
)
),
insert_random AS (
INSERT INTO falukant_data.political_office
(office_type_id, character_id, created_at, updated_at, region_id)
SELECT
rc.office_type_id,
rc.character_id,
NOW(),
NOW(),
rc.region_id
FROM random_candidates AS rc
JOIN needed_to_fill AS rtf
ON rtf.office_type_id = rc.office_type_id
AND rtf.region_id = rc.region_id
WHERE rc.rn <= rtf.gaps
RETURNING id AS new_office_id, office_type_id, character_id, region_id
)
SELECT
new_office_id AS office_id,
office_type_id,
character_id,
region_id
FROM insert_winners
UNION ALL
SELECT
new_office_id AS office_id,
office_type_id,
character_id,
region_id
FROM insert_random;
"#;
const QUERY_USERS_IN_CITIES_OF_REGIONS: &str = r#"
WITH RECURSIVE region_tree AS (
SELECT id
FROM falukant_data.region
WHERE id = $1
UNION ALL
SELECT r2.id
FROM falukant_data.region AS r2
JOIN region_tree AS rt
ON r2.parent_id = rt.id
)
SELECT DISTINCT ch.user_id
FROM falukant_data.character AS ch
JOIN region_tree AS rt2
ON ch.region_id = rt2.id
WHERE ch.user_id IS NOT NULL;
"#;
const QUERY_NOTIFY_OFFICE_EXPIRATION: &str = r#"
INSERT INTO falukant_log.notification
(user_id, tr, created_at, updated_at)
SELECT
po.character_id,
'notify_office_expiring',
NOW(),
NOW()
FROM falukant_data.political_office AS po
JOIN falukant_type.political_office_type AS pot
ON po.office_type_id = pot.id
WHERE (po.created_at + (pot.term_length * INTERVAL '1 day'))
BETWEEN (NOW() + INTERVAL '2 days')
AND (NOW() + INTERVAL '2 days' + INTERVAL '1 second');
"#;
const QUERY_NOTIFY_ELECTION_CREATED: &str = r#"
INSERT INTO falukant_log.notification
(user_id, tr, created_at, updated_at)
VALUES
($1, 'notify_election_created', NOW(), NOW());
"#;
const QUERY_NOTIFY_OFFICE_FILLED: &str = r#"
INSERT INTO falukant_log.notification
(user_id, tr, created_at, updated_at)
VALUES
($1, 'notify_office_filled', NOW(), NOW());
"#;
const QUERY_GET_USERS_WITH_EXPIRING_OFFICES: &str = r#"
SELECT DISTINCT ch.user_id
FROM falukant_data.political_office AS po
JOIN falukant_type.political_office_type AS pot
ON po.office_type_id = pot.id
JOIN falukant_data.character AS ch
ON po.character_id = ch.id
WHERE ch.user_id IS NOT NULL
AND (po.created_at + (pot.term_length * INTERVAL '1 day'))
BETWEEN (NOW() + INTERVAL '2 days')
AND (NOW() + INTERVAL '2 days' + INTERVAL '1 second');
"#;
const QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS: &str = r#"
SELECT DISTINCT ch.user_id
FROM falukant_data.election AS e
JOIN falukant_data.character AS ch
ON ch.region_id = e.region_id
WHERE ch.user_id IS NOT NULL
AND e.date >= NOW() - INTERVAL '1 day';
"#;
const QUERY_GET_USERS_WITH_FILLED_OFFICES: &str = r#"
SELECT DISTINCT ch.user_id
FROM falukant_data.political_office AS po
JOIN falukant_data.character AS ch
ON po.character_id = ch.id
WHERE ch.user_id IS NOT NULL
AND po.created_at >= NOW() - INTERVAL '1 minute';
"#;
const QUERY_PROCESS_ELECTIONS: &str = r#"
SELECT office_id, office_type_id, character_id, region_id
FROM falukant_data.process_elections();
"#;
/// Schneidet für alle Amtstyp/Region-Kombinationen überzählige Einträge in
/// `falukant_data.political_office` ab, so dass höchstens
/// `seats_per_region` Ämter pro Kombination übrig bleiben.
///
/// Die Auswahl, welche Ämter entfernt werden, erfolgt deterministisch über
/// `created_at DESC`: die **neuesten** Ämter bleiben bevorzugt im Amt,
/// ältere Einträge werden zuerst entfernt. Damit lässt sich das Verhalten
/// später leicht anpassen (z.B. nach bestimmten Prioritäten).
const QUERY_TRIM_EXCESS_OFFICES_GLOBAL: &str = r#"
WITH seats AS (
SELECT
pot.id AS office_type_id,
rt.id AS region_id,
pot.seats_per_region AS seats_total
FROM falukant_type.political_office_type AS pot
JOIN falukant_type.region AS rt
ON pot.region_type = rt.label_tr
),
ranked AS (
SELECT
po.id,
po.office_type_id,
po.region_id,
s.seats_total,
ROW_NUMBER() OVER (
PARTITION BY po.office_type_id, po.region_id
ORDER BY po.created_at DESC
) AS rn
FROM falukant_data.political_office AS po
JOIN seats AS s
ON s.office_type_id = po.office_type_id
AND s.region_id = po.region_id
),
to_delete AS (
SELECT id
FROM ranked
WHERE rn > seats_total
)
DELETE FROM falukant_data.political_office
WHERE id IN (SELECT id FROM to_delete);
"#;
impl PoliticsWorker { impl PoliticsWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -542,11 +99,11 @@ impl PoliticsWorker {
broker: &MessageBroker, broker: &MessageBroker,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
// 1) Optional: Positionen evaluieren (aktuell nur Logging/Struktur) // 1) Optional: Positionen evaluieren (aktuell nur Logging/Struktur)
let _ = Self::evaluate_political_positions(pool)?; Self::evaluate_political_positions(pool)?;
// 2) Schema-Änderungen abgleichen: neue / zusätzliche Ämter anlegen, // 2) Schema-Änderungen abgleichen: neue / zusätzliche Ämter anlegen,
// ohne bestehende Amtsinhaber bei Reduktion zu entfernen. // ohne bestehende Amtsinhaber bei Reduktion zu entfernen.
let _ = Self::sync_offices_with_types(pool)?; Self::sync_offices_with_types(pool)?;
// 3) Ämter, die bald auslaufen, benachrichtigen // 3) Ämter, die bald auslaufen, benachrichtigen
Self::notify_office_expirations(pool, broker)?; Self::notify_office_expirations(pool, broker)?;
@@ -608,8 +165,9 @@ impl PoliticsWorker {
conn.prepare( conn.prepare(
"count_offices_per_region", "count_offices_per_region",
QUERY_COUNT_OFFICES_PER_REGION, QUERY_COUNT_OFFICES_PER_REGION,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare count_offices_per_region: {e}")))?;
let rows = conn.execute("count_offices_per_region", &[])?; let rows = conn.execute("count_offices_per_region", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec count_offices_per_region: {e}")))?;
let mut result = Vec::with_capacity(rows.len()); let mut result = Vec::with_capacity(rows.len());
for row in rows { for row in rows {
@@ -645,7 +203,7 @@ impl PoliticsWorker {
conn.prepare( conn.prepare(
"trim_excess_offices_global", "trim_excess_offices_global",
QUERY_TRIM_EXCESS_OFFICES_GLOBAL, QUERY_TRIM_EXCESS_OFFICES_GLOBAL,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare trim_excess_offices_global: {e}")))?;
if let Err(err) = conn.execute("trim_excess_offices_global", &[]) { if let Err(err) = conn.execute("trim_excess_offices_global", &[]) {
eprintln!( eprintln!(
@@ -672,8 +230,10 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("find_office_gaps", QUERY_FIND_OFFICE_GAPS)?; conn.prepare("find_office_gaps", QUERY_FIND_OFFICE_GAPS)
let rows = conn.execute("find_office_gaps", &[])?; .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare find_office_gaps: {e}")))?;
let rows = conn.execute("find_office_gaps", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec find_office_gaps: {e}")))?;
if rows.is_empty() { if rows.is_empty() {
return Ok(()); return Ok(());
@@ -755,8 +315,10 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("select_needed_elections", QUERY_SELECT_NEEDED_ELECTIONS)?; conn.prepare("select_needed_elections", QUERY_SELECT_NEEDED_ELECTIONS)
let rows = conn.execute("select_needed_elections", &[])?; .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare select_needed_elections: {e}")))?;
let rows = conn.execute("select_needed_elections", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec select_needed_elections: {e}")))?;
let mut elections = Vec::with_capacity(rows.len()); let mut elections = Vec::with_capacity(rows.len());
for row in rows { for row in rows {
@@ -785,8 +347,9 @@ impl PoliticsWorker {
conn.prepare( conn.prepare(
"select_elections_needing_candidates", "select_elections_needing_candidates",
QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES, QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare select_elections_needing_candidates: {e}")))?;
let rows = conn.execute("select_elections_needing_candidates", &[])?; let rows = conn.execute("select_elections_needing_candidates", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec select_elections_needing_candidates: {e}")))?;
let mut elections = Vec::with_capacity(rows.len()); let mut elections = Vec::with_capacity(rows.len());
for row in rows { for row in rows {
@@ -813,13 +376,14 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("insert_candidates", QUERY_INSERT_CANDIDATES)?; conn.prepare("insert_candidates", QUERY_INSERT_CANDIDATES)
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare insert_candidates: {e}")))?;
for e in elections { for e in elections {
conn.execute( conn.execute(
"insert_candidates", "insert_candidates",
&[&e.election_id, &e.region_id, &e.posts_to_fill], &[&e.election_id, &e.region_id, &e.posts_to_fill],
)?; ).map_err(|err| DbError::new(format!("[PoliticsWorker] exec insert_candidates eid={} rid={}: {}", e.election_id, e.region_id, err)))?;
} }
Ok(()) Ok(())
@@ -832,8 +396,10 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("process_expired_and_fill", QUERY_PROCESS_EXPIRED_AND_FILL)?; conn.prepare("process_expired_and_fill", QUERY_PROCESS_EXPIRED_AND_FILL)
let rows = conn.execute("process_expired_and_fill", &[])?; .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare process_expired_and_fill: {e}")))?;
let rows = conn.execute("process_expired_and_fill", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec process_expired_and_fill: {e}")))?;
Ok(rows Ok(rows
.into_iter() .into_iter()
@@ -876,14 +442,17 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("notify_office_expiration", QUERY_NOTIFY_OFFICE_EXPIRATION)?; conn.prepare("notify_office_expiration", QUERY_NOTIFY_OFFICE_EXPIRATION)
conn.execute("notify_office_expiration", &[])?; .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_office_expiration: {e}")))?;
conn.execute("notify_office_expiration", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_office_expiration: {e}")))?;
conn.prepare( conn.prepare(
"get_users_with_expiring_offices", "get_users_with_expiring_offices",
QUERY_GET_USERS_WITH_EXPIRING_OFFICES, QUERY_GET_USERS_WITH_EXPIRING_OFFICES,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_with_expiring_offices: {e}")))?;
let rows = conn.execute("get_users_with_expiring_offices", &[])?; let rows = conn.execute("get_users_with_expiring_offices", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_with_expiring_offices: {e}")))?;
for row in rows { for row in rows {
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {
@@ -905,17 +474,20 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("notify_election_created", QUERY_NOTIFY_ELECTION_CREATED)?; conn.prepare("notify_election_created", QUERY_NOTIFY_ELECTION_CREATED)
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_election_created: {e}")))?;
for uid in user_ids { for uid in user_ids {
conn.execute("notify_election_created", &[uid])?; conn.execute("notify_election_created", &[uid])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_election_created uid={}: {}", uid, e)))?;
} }
conn.prepare( conn.prepare(
"get_users_in_regions_with_elections", "get_users_in_regions_with_elections",
QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS, QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_in_regions_with_elections: {e}")))?;
let rows = conn.execute("get_users_in_regions_with_elections", &[])?; let rows = conn.execute("get_users_in_regions_with_elections", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_in_regions_with_elections: {e}")))?;
for row in rows { for row in rows {
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {
@@ -937,7 +509,8 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("notify_office_filled", QUERY_NOTIFY_OFFICE_FILLED)?; conn.prepare("notify_office_filled", QUERY_NOTIFY_OFFICE_FILLED)
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_office_filled: {e}")))?;
for office in new_offices { for office in new_offices {
// Debug-Logging mit allen Feldern, damit sie aktiv genutzt werden // Debug-Logging mit allen Feldern, damit sie aktiv genutzt werden
@@ -945,14 +518,16 @@ impl PoliticsWorker {
"[PoliticsWorker] Office filled: id={}, type={}, character={}, region={}", "[PoliticsWorker] Office filled: id={}, type={}, character={}, region={}",
office.office_id, office.office_type_id, office.character_id, office.region_id office.office_id, office.office_type_id, office.character_id, office.region_id
); );
conn.execute("notify_office_filled", &[&office.character_id])?; conn.execute("notify_office_filled", &[&office.character_id])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_office_filled office_id={} character_id={}: {}", office.office_id, office.character_id, e)))?;
} }
conn.prepare( conn.prepare(
"get_users_with_filled_offices", "get_users_with_filled_offices",
QUERY_GET_USERS_WITH_FILLED_OFFICES, QUERY_GET_USERS_WITH_FILLED_OFFICES,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_with_filled_offices: {e}")))?;
let rows = conn.execute("get_users_with_filled_offices", &[])?; let rows = conn.execute("get_users_with_filled_offices", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_with_filled_offices: {e}")))?;
for row in rows { for row in rows {
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {

View File

@@ -7,6 +7,14 @@ use std::time::{Duration, Instant};
use crate::db::ConnectionPool; use crate::db::ConnectionPool;
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_FINISHED_PRODUCTIONS,
QUERY_GET_AVAILABLE_STOCKS,
QUERY_DELETE_PRODUCTION,
QUERY_INSERT_INVENTORY,
QUERY_INSERT_UPDATE_PRODUCTION_LOG,
QUERY_ADD_OVERPRODUCTION_NOTIFICATION,
};
/// Abbildet eine abgeschlossene Produktion aus der Datenbank. /// Abbildet eine abgeschlossene Produktion aus der Datenbank.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -28,128 +36,6 @@ struct StockInfo {
filled: i32, filled: i32,
} }
// SQL-Queries analog zur C++-Implementierung
// Wichtig: Pro `production.id` darf hier **genau eine Zeile** zurückkommen.
// Durch die Joins auf Director/Knowledge/Wetter kann es sonst zu Mehrfachzeilen mit
// unterschiedlicher berechneter Qualität kommen. Deshalb wird die Qualität
// über MAX() aggregiert und nach `production_id` gruppiert.
const QUERY_GET_FINISHED_PRODUCTIONS: &str = r#"
SELECT
p.id AS production_id,
p.branch_id,
p.product_id,
p.quantity,
p.start_timestamp,
pr.production_time,
-- Aggregierte Qualitätsbewertung pro Produktion inkl. Wettereinfluss
MAX(
GREATEST(
0,
LEAST(
100,
ROUND(
(
CASE
WHEN k2.id IS NOT NULL
THEN (k.knowledge * 2 + k2.knowledge) / 3
ELSE k.knowledge
END
)::numeric
+ COALESCE(pwe.quality_effect, 0) * 2.5
)
)
)::int
) AS quality,
br.region_id,
br.falukant_user_id AS user_id
FROM falukant_data.production p
JOIN falukant_type.product pr
ON p.product_id = pr.id
JOIN falukant_data.branch br
ON p.branch_id = br.id
JOIN falukant_data.character c
ON c.user_id = br.falukant_user_id
JOIN falukant_data.knowledge k
ON p.product_id = k.product_id
AND k.character_id = c.id
JOIN falukant_data.stock s
ON s.branch_id = br.id
-- Optionaler Wettereinfluss: pro (Produkt, Wetter) genau ein Datensatz
LEFT JOIN falukant_type.product_weather_effect pwe
ON pwe.product_id = p.product_id
AND pwe.weather_type_id = p.weather_type_id
LEFT JOIN falukant_data.director d
ON d.employer_user_id = c.user_id
LEFT JOIN falukant_data.knowledge k2
ON k2.character_id = d.director_character_id
AND k2.product_id = p.product_id
WHERE p.start_timestamp + INTERVAL '1 minute' * pr.production_time <= NOW()
GROUP BY
p.id,
p.branch_id,
p.product_id,
p.quantity,
p.start_timestamp,
pr.production_time,
br.region_id,
br.falukant_user_id
ORDER BY p.start_timestamp;
"#;
const QUERY_GET_AVAILABLE_STOCKS: &str = r#"
SELECT
stock.id,
stock.quantity AS total_capacity,
(
SELECT COALESCE(SUM(inventory.quantity), 0)
FROM falukant_data.inventory
WHERE inventory.stock_id = stock.id
) AS filled,
stock.branch_id
FROM falukant_data.stock stock
JOIN falukant_data.branch branch
ON stock.branch_id = branch.id
WHERE branch.id = $1
ORDER BY total_capacity DESC;
"#;
const QUERY_DELETE_PRODUCTION: &str = r#"
DELETE FROM falukant_data.production
WHERE id = $1;
"#;
const QUERY_INSERT_INVENTORY: &str = r#"
INSERT INTO falukant_data.inventory (
stock_id,
product_id,
quantity,
quality,
produced_at
) VALUES ($1, $2, $3, $4, NOW());
"#;
const QUERY_INSERT_UPDATE_PRODUCTION_LOG: &str = r#"
INSERT INTO falukant_log.production (
region_id,
product_id,
quantity,
producer_id,
production_date
) VALUES ($1, $2, $3, $4, CURRENT_DATE)
ON CONFLICT (producer_id, product_id, region_id, production_date)
DO UPDATE
SET quantity = falukant_log.production.quantity + EXCLUDED.quantity;
"#;
const QUERY_ADD_OVERPRODUCTION_NOTIFICATION: &str = r#"
INSERT INTO falukant_log.notification (
user_id,
tr,
shown,
created_at,
updated_at
) VALUES ($1, $2, FALSE, NOW(), NOW());
"#;
pub struct ProduceWorker { pub struct ProduceWorker {
base: BaseWorker, base: BaseWorker,
@@ -240,6 +126,8 @@ impl ProduceWorker {
self.base self.base
.set_current_step("Process Finished Productions"); .set_current_step("Process Finished Productions");
// Nur Fehler loggen; keine Debug-Infos
for production in finished_productions { for production in finished_productions {
self.handle_finished_production(&production); self.handle_finished_production(&production);
} }
@@ -287,21 +175,26 @@ impl ProduceWorker {
} }
}; };
// Ruhemodus: keine Info-Logs, nur Fehler
for stock in stocks { for stock in stocks {
if remaining_quantity <= 0 { if remaining_quantity <= 0 {
break; break;
} }
let free_capacity = stock.total_capacity - stock.filled; let free_capacity = stock.total_capacity - stock.filled;
// keine Debug-Ausgabe
if free_capacity <= 0 { if free_capacity <= 0 {
continue; continue;
} }
let to_store = min(remaining_quantity, free_capacity); let to_store = min(remaining_quantity, free_capacity);
// keine Debug-Ausgabe
if !self.store_in_stock(stock.stock_id, product_id, to_store, quality) { if !self.store_in_stock(stock.stock_id, product_id, to_store, quality) {
return false; return false;
} }
remaining_quantity -= to_store; remaining_quantity -= to_store;
// keine Debug-Ausgabe
} }
if remaining_quantity == 0 { if remaining_quantity == 0 {
@@ -330,11 +223,24 @@ impl ProduceWorker {
quantity: i32, quantity: i32,
quality: i32, quality: i32,
) -> bool { ) -> bool {
if let Err(err) = self.insert_inventory(stock_id, product_id, quantity, quality) { // Versuch: vorhandenen Inventory-Posten für (stock, product) erhöhen
eprintln!("[ProduceWorker] Fehler in storeInStock: {err}"); match self.update_inventory_by_stock_product(stock_id, product_id, quantity, quality) {
return false; Ok(updated) => {
if updated {
return true;
}
// Wenn kein Update stattfand, Insert versuchen
if let Err(err) = self.insert_inventory(stock_id, product_id, quantity, quality) {
eprintln!("[ProduceWorker] Fehler beim Insert in storeInStock: {err}");
return false;
}
true
}
Err(err) => {
eprintln!("[ProduceWorker] Fehler beim Update in storeInStock: {err}");
false
}
} }
true
} }
fn delete_production(&self, production_id: i32) { fn delete_production(&self, production_id: i32) {
@@ -420,10 +326,32 @@ impl ProduceWorker {
.map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("insert_inventory", QUERY_INSERT_INVENTORY)?; conn.prepare("insert_inventory", QUERY_INSERT_INVENTORY)?;
conn.execute("insert_inventory", &[&stock_id, &product_id, &quantity, &quality])?; let _rows = conn.execute("insert_inventory", &[&stock_id, &product_id, &quantity, &quality])?;
Ok(()) Ok(())
} }
fn update_inventory_by_stock_product(
&self,
stock_id: i32,
product_id: i32,
quantity: i32,
quality: i32,
) -> Result<bool, crate::db::DbError> {
use crate::worker::sql::QUERY_UPDATE_INVENTORY_BY_STOCK_PRODUCT;
let mut conn = self
.base
.pool
.get()
.map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("update_inventory_by_stock_product", QUERY_UPDATE_INVENTORY_BY_STOCK_PRODUCT)?;
let rows = conn.execute(
"update_inventory_by_stock_product",
&[&stock_id, &product_id, &quantity, &quality],
)?;
Ok(!rows.is_empty())
}
fn remove_production(&self, production_id: i32) -> Result<(), crate::db::DbError> { fn remove_production(&self, production_id: i32) -> Result<(), crate::db::DbError> {
let mut conn = self let mut conn = self
.base .base

1684
src/worker/sql.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -8,49 +8,17 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_TOWNS,
QUERY_INSERT_STOCK,
QUERY_CLEANUP_STOCK,
QUERY_GET_REGION_USERS,
};
pub struct StockageManager { pub struct StockageManager {
base: BaseWorker, base: BaseWorker,
} }
// SQL-Queries analog zu `stockagemanager.h`
const QUERY_GET_TOWNS: &str = r#"
SELECT fdr.id
FROM falukant_data.region fdr
JOIN falukant_type.region ftr
ON ftr.id = fdr.region_type_id
WHERE ftr.label_tr = 'city';
"#;
const QUERY_INSERT_STOCK: &str = r#"
INSERT INTO falukant_data.buyable_stock (region_id, stock_type_id, quantity)
SELECT
$1 AS region_id,
s.id AS stock_type_id,
GREATEST(1, ROUND(RANDOM() * 5 * COUNT(br.id))) AS quantity
FROM falukant_data.branch AS br
CROSS JOIN falukant_type.stock AS s
WHERE br.region_id = $1
GROUP BY s.id
ORDER BY RANDOM()
LIMIT GREATEST(
ROUND(RANDOM() * (SELECT COUNT(id) FROM falukant_type.stock)),
1
);
"#;
const QUERY_CLEANUP_STOCK: &str = r#"
DELETE FROM falukant_data.buyable_stock
WHERE quantity <= 0;
"#;
const QUERY_GET_REGION_USERS: &str = r#"
SELECT c.user_id
FROM falukant_data.character c
WHERE c.region_id = $1
AND c.user_id IS NOT NULL;
"#;
impl StockageManager { impl StockageManager {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
Self { Self {

View File

@@ -2,11 +2,19 @@ use crate::db::{ConnectionPool, DbError};
use crate::message_broker::MessageBroker; use crate::message_broker::MessageBroker;
use std::cmp::min; use std::cmp::min;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_ARRIVED_TRANSPORTS,
QUERY_GET_AVAILABLE_STOCKS,
QUERY_INSERT_INVENTORY,
QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT,
QUERY_DELETE_TRANSPORT,
QUERY_GET_BRANCH_REGION,
QUERY_UPDATE_TRANSPORT_SIZE,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct ArrivedTransport { struct ArrivedTransport {
id: i32, id: i32,
@@ -26,98 +34,6 @@ struct StockInfo {
filled: i32, filled: i32,
} }
// Ermittelt alle Transporte, die gemäß Distanz und Fahrzeuggeschwindigkeit bereits
// angekommen sein sollten. Die Reisezeit wird hier vereinfacht als
// travel_minutes = distance / speed
// interpretiert, d.h. `speed` gibt die Einheiten "Distanz pro Minute" an.
const QUERY_GET_ARRIVED_TRANSPORTS: &str = r#"
SELECT
t.id,
t.product_id,
t.size,
t.vehicle_id,
t.source_region_id,
t.target_region_id,
b_target.id AS target_branch_id,
b_source.id AS source_branch_id,
rd.distance AS distance,
v.falukant_user_id AS user_id
FROM falukant_data.transport AS t
JOIN falukant_data.vehicle AS v
ON v.id = t.vehicle_id
JOIN falukant_type.vehicle AS vt
ON vt.id = v.vehicle_type_id
JOIN falukant_data.region_distance AS rd
ON ((rd.source_region_id = t.source_region_id
AND rd.target_region_id = t.target_region_id)
OR (rd.source_region_id = t.target_region_id
AND rd.target_region_id = t.source_region_id))
AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
LEFT JOIN falukant_data.branch AS b_target
ON b_target.region_id = t.target_region_id
AND b_target.falukant_user_id = v.falukant_user_id
LEFT JOIN falukant_data.branch AS b_source
ON b_source.region_id = t.source_region_id
AND b_source.falukant_user_id = v.falukant_user_id
WHERE vt.speed > 0
AND t.created_at
+ (rd.distance / vt.speed::double precision) * INTERVAL '1 minute'
<= NOW();
"#;
// Verfügbare Lagerplätze in einem Branch, analog zur Logik im ProduceWorker.
const QUERY_GET_AVAILABLE_STOCKS: &str = r#"
SELECT
stock.id,
stock.quantity AS total_capacity,
(
SELECT COALESCE(SUM(inventory.quantity), 0)
FROM falukant_data.inventory
WHERE inventory.stock_id = stock.id
) AS filled
FROM falukant_data.stock stock
JOIN falukant_data.branch branch
ON stock.branch_id = branch.id
WHERE branch.id = $1
ORDER BY total_capacity DESC;
"#;
const QUERY_INSERT_INVENTORY: &str = r#"
INSERT INTO falukant_data.inventory (
stock_id,
product_id,
quantity,
quality,
produced_at
) VALUES ($1, $2, $3, $4, NOW());
"#;
const QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT: &str = r#"
UPDATE falukant_data.vehicle
SET region_id = $2,
condition = GREATEST(0, condition - $3::int),
available_from = NOW(),
updated_at = NOW()
WHERE id = $1;
"#;
const QUERY_DELETE_TRANSPORT: &str = r#"
DELETE FROM falukant_data.transport
WHERE id = $1;
"#;
/// Notification-Eintrag, analog zur Overproduction-Notification im ProduceWorker.
/// `tr` wird hier als JSON-String mit Übersetzungs-Key und Werten gespeichert.
const QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION: &str = r#"
INSERT INTO falukant_log.notification (
user_id,
tr,
shown,
created_at,
updated_at
) VALUES ($1, $2, FALSE, NOW(), NOW());
"#;
pub struct TransportWorker { pub struct TransportWorker {
base: BaseWorker, base: BaseWorker,
} }
@@ -135,8 +51,8 @@ impl TransportWorker {
eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}"); eprintln!("[TransportWorker] Fehler in process_arrived_transports: {err}");
} }
// Einmal pro Sekunde prüfen // Minütlich prüfen (nicht sekündlich pollen)
for _ in 0..1 { for _ in 0..60 {
if !state.running_worker.load(Ordering::Relaxed) { if !state.running_worker.load(Ordering::Relaxed) {
break; break;
} }
@@ -151,11 +67,18 @@ impl TransportWorker {
) -> Result<(), DbError> { ) -> Result<(), DbError> {
let transports = Self::load_arrived_transports(pool)?; let transports = Self::load_arrived_transports(pool)?;
if !transports.is_empty() {
eprintln!(
"[TransportWorker] {} angekommene Transport(e) gefunden",
transports.len()
);
}
for t in transports { for t in transports {
if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) { if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) {
eprintln!( eprintln!(
"[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}", "[TransportWorker] Fehler beim Verarbeiten von Transport {} (vehicle_id={}, product_id={:?}, size={}): {}",
t.id t.id, t.vehicle_id, t.product_id, t.size, err
); );
} }
} }
@@ -171,6 +94,19 @@ impl TransportWorker {
conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?; conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?;
let rows = conn.execute("get_arrived_transports", &[])?; let rows = conn.execute("get_arrived_transports", &[])?;
if rows.is_empty() {
// Nur alle 60 Sekunden loggen, um Log-Flut zu vermeiden
static LAST_LOG: Mutex<Option<Instant>> = Mutex::new(None);
let mut last_log = LAST_LOG.lock().unwrap();
let should_log = last_log
.map(|t| t.elapsed().as_secs() >= 60)
.unwrap_or(true);
if should_log {
eprintln!("[TransportWorker] Keine angekommenen Transporte gefunden");
*last_log = Some(Instant::now());
}
}
let mut result = Vec::with_capacity(rows.len()); let mut result = Vec::with_capacity(rows.len());
for row in rows { for row in rows {
let id = parse_i32(&row, "id", -1); let id = parse_i32(&row, "id", -1);
@@ -224,10 +160,7 @@ impl TransportWorker {
// Leere Transporte (ohne Produkt) werden anders behandelt // Leere Transporte (ohne Produkt) werden anders behandelt
if t.product_id.is_none() { if t.product_id.is_none() {
// Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen // Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen
eprintln!( // keine Info-Logs
"[TransportWorker] Leerer Transport {} angekommen: Fahrzeug {} zurückgeholt",
t.id, t.vehicle_id
);
Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?; Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?;
Self::delete_transport(pool, t.id)?; Self::delete_transport(pool, t.id)?;
@@ -310,21 +243,16 @@ impl TransportWorker {
); );
broker.publish(target_inventory_message); broker.publish(target_inventory_message);
} }
} else {
// Nichts konnte eingelagert werden - Transport bleibt unverändert
// Logge dies, damit wir sehen, dass der Transport wartet
eprintln!(
"[TransportWorker] Transport {} wartet: Kein Lagerplatz verfügbar (branch_id={}, product_id={}, size={})",
t.id, t.target_branch_id, product_id, t.size
);
} }
// Nutzer informieren, dass Ware noch im Transportmittel liegt. // Keine Notification für wartende Transporte, um Notification-System zu entlasten.
if t.user_id > 0 {
if let Err(err) = Self::insert_transport_waiting_notification(
pool,
t.user_id,
product_id,
remaining_quantity,
) {
eprintln!(
"[TransportWorker] Fehler beim Schreiben der Transport-Waiting-Notification: {err}"
);
}
}
} }
Ok(()) Ok(())
@@ -420,19 +348,12 @@ impl TransportWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Region des Branches abrufen // Region des Branches abrufen
const QUERY_GET_BRANCH_REGION: &str = r#" conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
SELECT region_id
FROM falukant_data.branch
WHERE id = $1
LIMIT 1;
"#;
conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
let rows = conn.execute("get_branch_region", &[&target_branch_id])?; let rows = conn.execute("get_branch_region", &[&target_branch_id])?;
let region_id = rows let region_id = rows
.get(0) .first()
.and_then(|r| r.get("region_id")) .and_then(|r| r.get("region_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1); .unwrap_or(-1);
@@ -479,46 +400,12 @@ impl TransportWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
const QUERY_UPDATE_TRANSPORT_SIZE: &str = r#" conn.prepare("update_transport_size", QUERY_UPDATE_TRANSPORT_SIZE)?;
UPDATE falukant_data.transport
SET size = $2,
updated_at = NOW()
WHERE id = $1;
"#;
conn.prepare("update_transport_size", QUERY_UPDATE_TRANSPORT_SIZE)?;
conn.execute("update_transport_size", &[&transport_id, &new_size])?; conn.execute("update_transport_size", &[&transport_id, &new_size])?;
Ok(()) Ok(())
} }
fn insert_transport_waiting_notification(
pool: &ConnectionPool,
user_id: i32,
product_id: i32,
remaining_quantity: i32,
) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare(
"add_transport_waiting_notification",
QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION,
)?;
let notification = format!(
r#"{{"tr":"transport.waiting","productId":{},"value":{}}}"#,
product_id, remaining_quantity
);
conn.execute(
"add_transport_waiting_notification",
&[&user_id, &notification],
)?;
Ok(())
}
} }
impl Worker for TransportWorker { impl Worker for TransportWorker {

View File

@@ -11,6 +11,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::QUERY_UPDATE_MONEY;
pub struct UndergroundWorker { pub struct UndergroundWorker {
base: BaseWorker, base: BaseWorker,
@@ -118,10 +119,7 @@ const Q_SELECT_FALUKANT_USER: &str = r#"
LIMIT 1; LIMIT 1;
"#; "#;
// Query für Geldänderungen (lokale Variante von BaseWorker::change_falukant_user_money) // Use centralized QUERY_UPDATE_MONEY from src/worker/sql.rs
const QUERY_UPDATE_MONEY: &str = r#"
SELECT falukant_data.update_money($1, $2, $3);
"#;
impl UndergroundWorker { impl UndergroundWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -192,7 +190,7 @@ impl UndergroundWorker {
let task_type = r.get("underground_type").cloned().unwrap_or_default(); let task_type = r.get("underground_type").cloned().unwrap_or_default();
let params = r.get("parameters").cloned().unwrap_or_else(|| "{}".into()); let params = r.get("parameters").cloned().unwrap_or_else(|| "{}".into());
Ok(Self::handle_task(pool, &task_type, performer_id, victim_id, &params)?) Self::handle_task(pool, &task_type, performer_id, victim_id, &params)
} }
fn handle_task( fn handle_task(
@@ -350,7 +348,7 @@ impl UndergroundWorker {
let rows = conn.execute("ug_select_char_user", &[&character_id])?; let rows = conn.execute("ug_select_char_user", &[&character_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("user_id")) .and_then(|r| r.get("user_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1)) .unwrap_or(-1))
@@ -515,10 +513,10 @@ impl UndergroundWorker {
let mut out = Vec::new(); let mut out = Vec::new();
for r in rows { for r in rows {
if let Some(t) = r.get("stock_type_id").and_then(|v| v.parse::<i32>().ok()) { if let Some(t) = r.get("stock_type_id").and_then(|v| v.parse::<i32>().ok())
if allowed.contains(&t) { && allowed.contains(&t)
out.push(r.clone()); {
} out.push(r.clone());
} }
} }
out out

View File

@@ -8,6 +8,41 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_USERS_TO_UPDATE,
QUERY_UPDATE_CHARACTERS_HEALTH,
QUERY_UPDATE_MOOD,
QUERY_UPDATE_GET_ITEMS_TO_UPDATE,
QUERY_UPDATE_GET_CHARACTER_IDS,
QUERY_UPDATE_KNOWLEDGE,
QUERY_DELETE_LOG_ENTRY,
QUERY_GET_OPEN_CREDITS,
QUERY_UPDATE_CREDIT,
QUERY_CLEANUP_CREDITS,
QUERY_ADD_CHARACTER_TO_DEBTORS_PRISM,
QUERY_GET_CURRENT_MONEY,
QUERY_GET_HOUSE_VALUE,
QUERY_GET_SETTLEMENT_VALUE,
QUERY_GET_INVENTORY_VALUE,
QUERY_GET_CREDIT_DEBT,
QUERY_COUNT_CHILDREN,
QUERY_GET_HEIR,
QUERY_RANDOM_HEIR,
QUERY_SET_CHARACTER_USER,
QUERY_UPDATE_USER_MONEY,
QUERY_GET_FALUKANT_USER_ID,
QUERY_AUTOBATISM,
QUERY_GET_PREGNANCY_CANDIDATES,
QUERY_INSERT_CHILD,
QUERY_INSERT_CHILD_RELATION,
QUERY_DELETE_DIRECTOR,
QUERY_DELETE_RELATIONSHIP,
QUERY_DELETE_CHILD_RELATION,
QUERY_DELETE_KNOWLEDGE,
QUERY_DELETE_DEBTORS_PRISM,
QUERY_DELETE_POLITICAL_OFFICE,
QUERY_DELETE_ELECTION_CANDIDATE,
};
/// Vereinfachtes Abbild eines Characters aus `QUERY_GET_USERS_TO_UPDATE`. /// Vereinfachtes Abbild eines Characters aus `QUERY_GET_USERS_TO_UPDATE`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -26,418 +61,7 @@ pub struct UserCharacterWorker {
last_mood_run: Option<Instant>, last_mood_run: Option<Instant>,
} }
// SQL-Queries (1:1 aus der C++-Implementierung übernommen, gruppiert nach Themen) // SQL moved to `src/worker/sql.rs`
const QUERY_GET_USERS_TO_UPDATE: &str = r#"
SELECT id, CURRENT_DATE - birthdate::date AS age, health
FROM falukant_data."character"
WHERE user_id IS NOT NULL;
"#;
const QUERY_UPDATE_CHARACTERS_HEALTH: &str = r#"
UPDATE falukant_data."character"
SET health = $1
WHERE id = $2;
"#;
// Mood-Update mit zufälliger Auswahl pro Charakter:
// Jeder lebende Charakter hat pro Aufruf (ca. 1x pro Minute)
// eine kleine Chance auf Mood-Wechsel. Die Bedingung `random() < 1.0 / 50.0`
// ergibt im Erwartungswert ca. alle 50 Minuten einen Wechsel, verteilt
// individuell und zufällig.
const QUERY_UPDATE_MOOD: &str = r#"
UPDATE falukant_data."character" AS c
SET mood_id = falukant_data.get_random_mood_id()
WHERE c.health > 0
AND random() < (1.0 / 50.0);
"#;
const QUERY_UPDATE_GET_ITEMS_TO_UPDATE: &str = r#"
SELECT id, product_id, producer_id, quantity
FROM falukant_log.production p
WHERE p.production_timestamp::date < current_date;
"#;
const QUERY_UPDATE_GET_CHARACTER_IDS: &str = r#"
SELECT fu.id AS user_id,
c.id AS character_id,
c2.id AS director_id
FROM falukant_data.falukant_user fu
JOIN falukant_data.character c
ON c.user_id = fu.id
LEFT JOIN falukant_data.director d
ON d.employer_user_id = fu.id
LEFT JOIN falukant_data.character c2
ON c2.id = d.director_character_id
WHERE fu.id = $1;
"#;
const QUERY_UPDATE_KNOWLEDGE: &str = r#"
UPDATE falukant_data.knowledge
SET knowledge = LEAST(knowledge + $3, 100)
WHERE character_id = $1
AND product_id = $2;
"#;
const QUERY_DELETE_LOG_ENTRY: &str = r#"
DELETE FROM falukant_log.production
WHERE id = $1;
"#;
// Kredit- und Vermögens-Queries
const QUERY_GET_OPEN_CREDITS: &str = r#"
SELECT
c.id AS credit_id,
c.amount,
c.remaining_amount,
c.interest_rate,
fu.id AS user_id,
fu.money,
c2.id AS character_id,
dp.created_at AS debitor_prism_start,
dp.created_at::date < current_date AS prism_started_previously
FROM falukant_data.credit c
JOIN falukant_data.falukant_user fu
ON fu.id = c.id
JOIN falukant_data.character c2
ON c2.user_id = c.falukant_user_id
LEFT JOIN falukant_data.debtors_prism dp
ON dp.character_id = c2.id
WHERE c.remaining_amount > 0
AND c.updated_at::date < current_date;
"#;
const QUERY_UPDATE_CREDIT: &str = r#"
UPDATE falukant_data.credit c
SET remaining_amount = $1
WHERE falukant_user_id = $2;
"#;
const QUERY_CLEANUP_CREDITS: &str = r#"
DELETE FROM falukant_data.credit
WHERE remaining_amount >= 0.01;
"#;
const QUERY_ADD_CHARACTER_TO_DEBTORS_PRISM: &str = r#"
INSERT INTO falukant_data.debtors_prism (character_id)
VALUES ($1);
"#;
const QUERY_GET_CURRENT_MONEY: &str = r#"
SELECT COALESCE(money, 0) AS sum
FROM falukant_data.falukant_user
WHERE user_id = $1;
"#;
const QUERY_HOUSE_VALUE: &str = r#"
SELECT COALESCE(SUM(h.cost), 0) AS sum
FROM falukant_data.user_house AS uh
JOIN falukant_type.house AS h
ON uh.house_type_id = h.id
WHERE uh.user_id = $1;
"#;
const QUERY_SETTLEMENT_VALUE: &str = r#"
SELECT COALESCE(SUM(b.base_cost), 0) AS sum
FROM falukant_data.branch AS br
JOIN falukant_type.branch AS b
ON br.branch_type_id = b.id
WHERE br.falukant_user_id = $1;
"#;
const QUERY_INVENTORY_VALUE: &str = r#"
SELECT COALESCE(SUM(i.quantity * p.sell_cost), 0) AS sum
FROM falukant_data.inventory AS i
JOIN falukant_type.product AS p
ON i.product_id = p.id
JOIN falukant_data.branch AS br
ON i.stock_id = br.id
WHERE br.falukant_user_id = $1;
"#;
const QUERY_CREDIT_DEBT: &str = r#"
SELECT COALESCE(SUM(remaining_amount), 0) AS sum
FROM falukant_data.credit
WHERE falukant_user_id = $1;
"#;
const QUERY_COUNT_CHILDREN: &str = r#"
SELECT COUNT(*) AS cnt
FROM falukant_data.child_relation
WHERE father_character_id = $1
OR mother_character_id = $1;
"#;
// Vererbungs-Queries
const QUERY_GET_HEIR: &str = r#"
SELECT child_character_id
FROM falukant_data.child_relation
WHERE father_character_id = $1
OR mother_character_id = $1
ORDER BY (is_heir IS TRUE) DESC,
updated_at DESC
LIMIT 1;
"#;
const QUERY_RANDOM_HEIR: &str = r#"
WITH chosen AS (
SELECT
cr.id AS relation_id,
cr.child_character_id
FROM
falukant_data.child_relation AS cr
JOIN
falukant_data.character AS ch
ON ch.id = cr.child_character_id
WHERE
(cr.father_character_id = $1 OR cr.mother_character_id = $1)
AND ch.region_id = (
SELECT region_id
FROM falukant_data.character
WHERE id = $1
)
AND ch.birthdate >= NOW() - INTERVAL '10 days'
AND ch.title_of_nobility = (
SELECT id
FROM falukant_type.title
WHERE label_tr = 'noncivil'
)
ORDER BY RANDOM()
LIMIT 1
)
UPDATE
falukant_data.child_relation AS cr2
SET
is_heir = TRUE,
updated_at = NOW()
FROM
chosen
WHERE
cr2.id = chosen.relation_id
RETURNING
chosen.child_character_id;
"#;
const QUERY_SET_CHARACTER_USER: &str = r#"
UPDATE falukant_data.character
SET user_id = $1,
updated_at = NOW()
WHERE id = $2;
"#;
const QUERY_UPDATE_USER_MONEY: &str = r#"
UPDATE falukant_data.falukant_user
SET money = $1,
updated_at = NOW()
WHERE user_id = $2;
"#;
const QUERY_GET_FALUKANT_USER_ID: &str = r#"
SELECT user_id
FROM falukant_data.character
WHERE id = $1
LIMIT 1;
"#;
// Schwangerschafts-Queries
const QUERY_AUTOBATISM: &str = r#"
UPDATE falukant_data.child_relation
SET name_set = TRUE
WHERE id IN (
SELECT cr.id
FROM falukant_data.child_relation cr
JOIN falukant_data.character c
ON c.id = cr.child_character_id
WHERE cr.name_set = FALSE
AND c.birthdate < current_date - INTERVAL '5 days'
);
"#;
const QUERY_GET_PREGNANCY_CANDIDATES: &str = r#"
SELECT
r.character1_id AS father_cid,
r.character2_id AS mother_cid,
c1.title_of_nobility,
c1.last_name,
c1.region_id,
fu1.id AS father_uid,
fu2.id AS mother_uid,
((CURRENT_DATE - c1.birthdate::date)
+ (CURRENT_DATE - c2.birthdate::date)) / 2 AS avg_age_days,
100.0 /
(1 + EXP(
0.0647 * (
((CURRENT_DATE - c1.birthdate::date)
+ (CURRENT_DATE - c2.birthdate::date)) / 2
) - 0.0591
)) AS prob_pct
FROM falukant_data.relationship r
JOIN falukant_type.relationship r2
ON r2.id = r.relationship_type_id
AND r2.tr = 'married'
JOIN falukant_data.character c1
ON c1.id = r.character1_id
JOIN falukant_data.character c2
ON c2.id = r.character2_id
LEFT JOIN falukant_data.falukant_user fu1
ON fu1.id = c1.user_id
LEFT JOIN falukant_data.falukant_user fu2
ON fu2.id = c2.user_id
WHERE random() * 100 < (
100.0 /
(1 + EXP(
0.11166347 * (
((CURRENT_DATE - c1.birthdate::date)
+ (CURRENT_DATE - c2.birthdate::date)) / 2
) - 2.638267
))
) / 2;
-- Hinweis: Der Divisor `/ 2` halbiert die Wahrscheinlichkeit und ist Teil der
-- ursprünglichen Formel. Wurde vorübergehend entfernt, um die Geburtenrate zu erhöhen,
-- wurde aber wiederhergestellt, um die mathematische Korrektheit der Formel zu gewährleisten.
-- Um die Geburtenrate anzupassen, sollte stattdessen die Formel selbst angepasst werden.
"#;
const QUERY_INSERT_CHILD: &str = r#"
INSERT INTO falukant_data.character (
user_id,
region_id,
first_name,
last_name,
birthdate,
gender,
title_of_nobility,
mood_id,
created_at,
updated_at
) VALUES (
NULL,
$1::int,
(
SELECT id
FROM falukant_predefine.firstname
WHERE gender = $2
ORDER BY RANDOM()
LIMIT 1
),
$3::int,
NOW(),
$2::varchar,
$4::int,
(
SELECT id
FROM falukant_type.mood
ORDER BY RANDOM()
LIMIT 1
),
NOW(),
NOW()
)
RETURNING id AS child_cid;
"#;
const QUERY_INSERT_CHILD_RELATION: &str = r#"
INSERT INTO falukant_data.child_relation (
father_character_id,
mother_character_id,
child_character_id,
name_set,
created_at,
updated_at
)
VALUES (
$1::int,
$2::int,
$3::int,
FALSE,
NOW(), NOW()
);
"#;
// Aufräum-Queries beim Tod eines Characters
const QUERY_DELETE_DIRECTOR: &str = r#"
DELETE FROM falukant_data.director
WHERE director_character_id = $1;
"#;
const QUERY_DELETE_RELATIONSHIP: &str = r#"
DELETE FROM falukant_data.relationship
WHERE character1_id = $1
OR character2_id = $1;
"#;
const QUERY_DELETE_CHILD_RELATION: &str = r#"
DELETE FROM falukant_data.child_relation
WHERE child_character_id = $1
OR father_character_id = $1
OR mother_character_id = $1;
"#;
const QUERY_DELETE_KNOWLEDGE: &str = r#"
DELETE FROM falukant_data.knowledge
WHERE character_id = $1;
"#;
const QUERY_DELETE_DEBTORS_PRISM: &str = r#"
DELETE FROM falukant_data.debtors_prism
WHERE character_id = $1;
"#;
/// Löscht alle Ämter eines Charakters und stellt anschließend sicher, dass
/// für die betroffenen Amtstyp/Region-Kombinationen nicht mehr Ämter
/// besetzt sind als durch `seats_per_region` vorgesehen.
///
/// Die überzähligen Ämter werden deterministisch nach `created_at DESC`
/// gekappt, d.h. neuere Amtsinhaber bleiben bevorzugt im Amt.
const QUERY_DELETE_POLITICAL_OFFICE: &str = r#"
WITH removed AS (
DELETE FROM falukant_data.political_office
WHERE character_id = $1
RETURNING office_type_id, region_id
),
affected AS (
SELECT DISTINCT office_type_id, region_id
FROM removed
),
seats AS (
SELECT
pot.id AS office_type_id,
rt.id AS region_id,
pot.seats_per_region AS seats_total
FROM falukant_type.political_office_type AS pot
JOIN falukant_type.region AS rt
ON pot.region_type = rt.label_tr
JOIN affected AS a
ON a.office_type_id = pot.id
AND a.region_id = rt.id
),
ranked AS (
SELECT
po.id,
po.office_type_id,
po.region_id,
s.seats_total,
ROW_NUMBER() OVER (
PARTITION BY po.office_type_id, po.region_id
ORDER BY po.created_at DESC
) AS rn
FROM falukant_data.political_office AS po
JOIN seats AS s
ON s.office_type_id = po.office_type_id
AND s.region_id = po.region_id
),
to_delete AS (
SELECT id
FROM ranked
WHERE rn > seats_total
)
DELETE FROM falukant_data.political_office
WHERE id IN (SELECT id FROM to_delete);
"#;
const QUERY_DELETE_ELECTION_CANDIDATE: &str = r#"
DELETE FROM falukant_data.election_candidate
WHERE character_id = $1;
"#;
impl UserCharacterWorker { impl UserCharacterWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -470,7 +94,7 @@ impl UserCharacterWorker {
} }
if !state.running_worker.load(Ordering::Relaxed) { if !state.running_worker.load(Ordering::Relaxed) {
return; // worker stopping
} }
} }
@@ -893,8 +517,8 @@ impl UserCharacterWorker {
let inserted = let inserted =
conn.execute("insert_child", &[&region_id, &gender, &last_name, &title_of_nobility])?; conn.execute("insert_child", &[&region_id, &gender, &last_name, &title_of_nobility])?;
let child_cid = inserted let child_cid = inserted
.get(0) .first()
.and_then(|r| r.get("child_cid")) .and_then(|r| r.get("child_cid"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1); .unwrap_or(-1);
@@ -1002,7 +626,7 @@ impl UserCharacterWorker {
let rows = conn.execute("get_falukant_user_id", &[&character_id])?; let rows = conn.execute("get_falukant_user_id", &[&character_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("user_id")) .and_then(|r| r.get("user_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1)) .unwrap_or(-1))
@@ -1019,7 +643,7 @@ impl UserCharacterWorker {
let rows = conn.execute("get_heir", &[&deceased_character_id])?; let rows = conn.execute("get_heir", &[&deceased_character_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("child_character_id")) .and_then(|r| r.get("child_character_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1)) .unwrap_or(-1))
@@ -1036,7 +660,7 @@ impl UserCharacterWorker {
let rows = conn.execute("random_heir", &[&deceased_character_id])?; let rows = conn.execute("random_heir", &[&deceased_character_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("child_character_id")) .and_then(|r| r.get("child_character_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1)) .unwrap_or(-1))
@@ -1084,7 +708,7 @@ impl UserCharacterWorker {
let rows = conn.execute("get_current_money", &[&falukant_user_id])?; let rows = conn.execute("get_current_money", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1097,11 +721,11 @@ impl UserCharacterWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("house_value", QUERY_HOUSE_VALUE)?; conn.prepare("house_value", QUERY_GET_HOUSE_VALUE)?;
let rows = conn.execute("house_value", &[&falukant_user_id])?; let rows = conn.execute("house_value", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1114,11 +738,11 @@ impl UserCharacterWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("settlement_value", QUERY_SETTLEMENT_VALUE)?; conn.prepare("settlement_value", QUERY_GET_SETTLEMENT_VALUE)?;
let rows = conn.execute("settlement_value", &[&falukant_user_id])?; let rows = conn.execute("settlement_value", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1131,11 +755,11 @@ impl UserCharacterWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("inventory_value", QUERY_INVENTORY_VALUE)?; conn.prepare("inventory_value", QUERY_GET_INVENTORY_VALUE)?;
let rows = conn.execute("inventory_value", &[&falukant_user_id])?; let rows = conn.execute("inventory_value", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1148,11 +772,11 @@ impl UserCharacterWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("credit_debt", QUERY_CREDIT_DEBT)?; conn.prepare("credit_debt", QUERY_GET_CREDIT_DEBT)?;
let rows = conn.execute("credit_debt", &[&falukant_user_id])?; let rows = conn.execute("credit_debt", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1169,7 +793,7 @@ impl UserCharacterWorker {
let rows = conn.execute("count_children", &[&deceased_user_id])?; let rows = conn.execute("count_children", &[&deceased_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("cnt")) .and_then(|r| r.get("cnt"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0)) .unwrap_or(0))

View File

@@ -6,291 +6,27 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER,
QUERY_DELETE_OLD_PRODUCTIONS,
QUERY_GET_PRODUCERS_LAST_DAY,
QUERY_UPDATE_REGION_SELL_PRICE,
QUERY_DELETE_REGION_SELL_PRICE,
QUERY_GET_SELL_REGIONS,
QUERY_HOURLY_PRICE_RECALCULATION,
QUERY_SET_MARRIAGES_BY_PARTY,
QUERY_GET_STUDYINGS_TO_EXECUTE,
QUERY_GET_OWN_CHARACTER_ID,
QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE,
QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE,
QUERY_SET_LEARNING_DONE,
};
pub struct ValueRecalculationWorker { pub struct ValueRecalculationWorker {
base: BaseWorker, base: BaseWorker,
} }
// Produktwissen / Produktions-Logs
const QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER: &str = r#"
UPDATE falukant_data.knowledge k
SET knowledge = LEAST(100, k.knowledge + 1)
FROM falukant_data.character c
JOIN falukant_log.production p
ON DATE(p.production_timestamp) = CURRENT_DATE - INTERVAL '1 day'
WHERE c.id = k.character_id
AND c.user_id = 18
AND k.product_id = 10;
"#;
const QUERY_DELETE_OLD_PRODUCTIONS: &str = r#"
DELETE FROM falukant_log.production flp
WHERE DATE(flp.production_timestamp) < CURRENT_DATE;
"#;
const QUERY_GET_PRODUCERS_LAST_DAY: &str = r#"
SELECT p.producer_id
FROM falukant_log.production p
WHERE DATE(p.production_timestamp) = CURRENT_DATE - INTERVAL '1 day'
GROUP BY producer_id;
"#;
// Regionale Verkaufspreise
const QUERY_UPDATE_REGION_SELL_PRICE: &str = r#"
UPDATE falukant_data.town_product_worth tpw
SET worth_percent =
GREATEST(
0,
LEAST(
CASE
WHEN s.quantity > avg_sells THEN tpw.worth_percent - 1
WHEN s.quantity < avg_sells THEN tpw.worth_percent + 1
ELSE tpw.worth_percent
END,
100
)
)
FROM (
SELECT region_id,
product_id,
quantity,
(SELECT AVG(quantity)
FROM falukant_log.sell avs
WHERE avs.product_id = s.product_id) AS avg_sells
FROM falukant_log.sell s
WHERE DATE(s.sell_timestamp) = CURRENT_DATE - INTERVAL '1 day'
) s
WHERE tpw.region_id = s.region_id
AND tpw.product_id = s.product_id;
"#;
const QUERY_DELETE_REGION_SELL_PRICE: &str = r#"
DELETE FROM falukant_log.sell s
WHERE DATE(s.sell_timestamp) < CURRENT_DATE;
"#;
const QUERY_GET_SELL_REGIONS: &str = r#"
SELECT s.region_id
FROM falukant_log.sell s
WHERE DATE(s.sell_timestamp) = CURRENT_DATE - INTERVAL '1 day'
GROUP BY region_id;
"#;
// Stündliche Preisneuberechnung basierend auf Verkäufen der letzten Stunde
// Zwei Ebenen der Preisberechnung:
// 1. Weltweit: Vergleich Stadt-Verkäufe vs. weltweiter Durchschnitt
// - ±5% Toleranz: Preis bleibt gleich
// - Mehr Verkäufe (>5% über Durchschnitt): Preis +10%
// - Weniger Verkäufe (<5% unter Durchschnitt): Preis -10%
// 2. Parent-Region: Vergleich Stadt-Verkäufe vs. Durchschnitt der parent-region
// - ±5% Toleranz: Preis bleibt gleich
// - Abweichung >±5%: Preis ±5%
const QUERY_HOURLY_PRICE_RECALCULATION: &str = r#"
WITH city_sales AS (
SELECT
s.region_id,
s.product_id,
SUM(s.quantity) AS total_sold
FROM falukant_log.sell s
WHERE s.sell_timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY s.region_id, s.product_id
),
world_avg_sales AS (
SELECT
product_id,
AVG(total_sold) AS avg_sold
FROM city_sales
GROUP BY product_id
),
parent_region_sales AS (
SELECT
r.parent_region_id,
cs.product_id,
AVG(cs.total_sold) AS avg_sold
FROM city_sales cs
JOIN falukant_data.region r ON r.id = cs.region_id
WHERE r.parent_region_id IS NOT NULL
GROUP BY r.parent_region_id, cs.product_id
),
price_updates_world AS (
SELECT
cs.region_id,
cs.product_id,
cs.total_sold,
COALESCE(wa.avg_sold, 0) AS world_avg,
tpw.worth_percent AS current_price,
CASE
-- Mehr als 5% über dem weltweiten Durchschnitt: 10% teurer
WHEN cs.total_sold > COALESCE(wa.avg_sold, 0) * 1.05
THEN tpw.worth_percent * 1.1
-- Weniger als 5% unter dem weltweiten Durchschnitt: 10% billiger
WHEN cs.total_sold < COALESCE(wa.avg_sold, 0) * 0.95
THEN tpw.worth_percent * 0.9
-- Innerhalb ±5%: Preis bleibt gleich
ELSE tpw.worth_percent
END AS price_after_world
FROM city_sales cs
JOIN world_avg_sales wa ON wa.product_id = cs.product_id
JOIN falukant_data.town_product_worth tpw
ON tpw.region_id = cs.region_id
AND tpw.product_id = cs.product_id
-- Nur updaten wenn es eine Änderung gibt (außerhalb der ±5% Toleranz)
WHERE cs.total_sold > COALESCE(wa.avg_sold, 0) * 1.05
OR cs.total_sold < COALESCE(wa.avg_sold, 0) * 0.95
),
all_cities_with_prices AS (
SELECT
cs.region_id,
cs.product_id,
cs.total_sold,
r.parent_region_id,
tpw.worth_percent AS original_price,
COALESCE(puw.price_after_world, tpw.worth_percent) AS price_after_world
FROM city_sales cs
JOIN falukant_data.region r ON r.id = cs.region_id
JOIN falukant_data.town_product_worth tpw
ON tpw.region_id = cs.region_id
AND tpw.product_id = cs.product_id
LEFT JOIN price_updates_world puw
ON puw.region_id = cs.region_id
AND puw.product_id = cs.product_id
),
price_updates_parent AS (
SELECT
acwp.region_id,
acwp.product_id,
acwp.total_sold,
acwp.parent_region_id,
COALESCE(prs.avg_sold, 0) AS parent_avg,
acwp.price_after_world AS current_price,
CASE
-- Mehr als 5% über dem parent-region Durchschnitt: 5% teurer
WHEN acwp.total_sold > COALESCE(prs.avg_sold, 0) * 1.05
THEN acwp.price_after_world * 1.05
-- Weniger als 5% unter dem parent-region Durchschnitt: 5% billiger
WHEN acwp.total_sold < COALESCE(prs.avg_sold, 0) * 0.95
THEN acwp.price_after_world * 0.95
-- Innerhalb ±5%: Preis bleibt gleich (vom world-update)
ELSE acwp.price_after_world
END AS new_price
FROM all_cities_with_prices acwp
LEFT JOIN parent_region_sales prs
ON prs.parent_region_id = acwp.parent_region_id
AND prs.product_id = acwp.product_id
WHERE acwp.parent_region_id IS NOT NULL
AND (
acwp.total_sold > COALESCE(prs.avg_sold, 0) * 1.05
OR acwp.total_sold < COALESCE(prs.avg_sold, 0) * 0.95
)
),
final_price_updates AS (
SELECT
COALESCE(pup.region_id, puw.region_id) AS region_id,
COALESCE(pup.product_id, puw.product_id) AS product_id,
COALESCE(pup.new_price, puw.price_after_world, acwp.original_price) AS final_price
FROM all_cities_with_prices acwp
LEFT JOIN price_updates_world puw
ON puw.region_id = acwp.region_id
AND puw.product_id = acwp.product_id
LEFT JOIN price_updates_parent pup
ON pup.region_id = acwp.region_id
AND pup.product_id = acwp.product_id
WHERE puw.region_id IS NOT NULL
OR pup.region_id IS NOT NULL
)
UPDATE falukant_data.town_product_worth tpw
SET worth_percent = GREATEST(
0,
LEAST(
100,
fpu.final_price
)
)
FROM final_price_updates fpu
WHERE tpw.region_id = fpu.region_id
AND tpw.product_id = fpu.product_id;
"#;
// Ehen / Beziehungen
const QUERY_SET_MARRIAGES_BY_PARTY: &str = r#"
WITH updated_relations AS (
UPDATE falukant_data.relationship AS rel
SET relationship_type_id = (
SELECT id
FROM falukant_type.relationship AS rt
WHERE rt.tr = 'married'
)
WHERE rel.id IN (
SELECT rel2.id
FROM falukant_data.party AS p
JOIN falukant_type.party AS pt
ON pt.id = p.party_type_id
AND pt.tr = 'wedding'
JOIN falukant_data.falukant_user AS fu
ON fu.id = p.falukant_user_id
JOIN falukant_data.character AS c
ON c.user_id = fu.id
JOIN falukant_data.relationship AS rel2
ON rel2.character1_id = c.id
OR rel2.character2_id = c.id
JOIN falukant_type.relationship AS rt2
ON rt2.id = rel2.relationship_type_id
AND rt2.tr = 'engaged'
WHERE p.created_at <= NOW() - INTERVAL '1 day'
)
RETURNING character1_id, character2_id
)
SELECT
c1.user_id AS character1_user,
c2.user_id AS character2_user
FROM updated_relations AS ur
JOIN falukant_data.character AS c1
ON c1.id = ur.character1_id
JOIN falukant_data.character AS c2
ON c2.id = ur.character2_id;
"#;
// Lernen / Studium
const QUERY_GET_STUDYINGS_TO_EXECUTE: &str = r#"
SELECT
l.id,
l.associated_falukant_user_id,
l.associated_learning_character_id,
l.learn_all_products,
l.learning_recipient_id,
l.product_id,
lr.tr
FROM falukant_data.learning l
JOIN falukant_type.learn_recipient lr
ON lr.id = l.learning_recipient_id
WHERE l.learning_is_executed = FALSE
AND l.created_at + INTERVAL '1 day' < NOW();
"#;
const QUERY_GET_OWN_CHARACTER_ID: &str = r#"
SELECT id
FROM falukant_data.character c
WHERE c.user_id = $1;
"#;
const QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE: &str = r#"
UPDATE falukant_data.knowledge k
SET knowledge = LEAST(100, k.knowledge + $1)
WHERE k.character_id = $2
AND k.product_id = $3;
"#;
const QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE: &str = r#"
UPDATE falukant_data.knowledge k
SET knowledge = LEAST(100, k.knowledge + $1)
WHERE k.character_id = $2;
"#;
const QUERY_SET_LEARNING_DONE: &str = r#"
UPDATE falukant_data.learning
SET learning_is_executed = TRUE
WHERE id = $1;
"#;
impl ValueRecalculationWorker { impl ValueRecalculationWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -601,7 +337,7 @@ impl ValueRecalculationWorker {
let rows = conn.execute("get_own_character_id", &[&falukant_user_id])?; let rows = conn.execute("get_own_character_id", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("id")) .and_then(|r| r.get("id"))
.and_then(|v| v.parse::<i32>().ok())) .and_then(|v| v.parse::<i32>().ok()))
} }

View File

@@ -5,40 +5,12 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::QUERY_UPDATE_WEATHER;
pub struct WeatherWorker { pub struct WeatherWorker {
base: BaseWorker, base: BaseWorker,
} }
// Query zum Aktualisieren des Wetters für alle Regionen
// Wählt für jede Region ein zufälliges Wetter aus allen verfügbaren Wettertypen aus
// Wichtig: Jede Region bekommt ein individuelles, zufälliges Wetter. Die vorherige
// Variante konnte vom Planner unter Umständen die Zufalls-Subquery nur einmal
// auswerten; mit LATERAL wird die Zufallsauswahl pro Region garantiert ausgeführt.
const QUERY_UPDATE_WEATHER: &str = r#"
WITH all_regions AS (
SELECT DISTINCT r.id AS region_id
FROM falukant_data.region r
JOIN falukant_type.region tr ON r.region_type_id = tr.id
WHERE tr.label_tr = 'city'
),
random_weather AS (
SELECT ar.region_id, wt.id AS weather_type_id
FROM all_regions ar
CROSS JOIN LATERAL (
SELECT wt.id
FROM falukant_type.weather wt
ORDER BY RANDOM()
LIMIT 1
) wt
)
INSERT INTO falukant_data.weather (region_id, weather_type_id)
SELECT rw.region_id, rw.weather_type_id
FROM random_weather rw
ON CONFLICT (region_id)
DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id;
"#;
impl WeatherWorker { impl WeatherWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
Self { Self {