10 Commits

Author SHA1 Message Date
Torsten Schulz (local)
f7710b64c9 Refactor WeatherWorker SQL query: Replace the existing weather update logic with a new query that assigns random weather types to each region, ensuring individual updates. Clean up logging messages for clarity. 2025-12-18 15:34:16 +01:00
Torsten Schulz (local)
6d12c70d84 Entferne Info-Logs aus den Worker-Klassen: Reduziere die Protokollierung in ProduceWorker, DirectorWorker und TransportWorker, um die Ausgabe zu optimieren und nur Fehler zu protokollieren. 2025-12-16 12:58:37 +01:00
Torsten Schulz (local)
d2e253b79a Füge erweiterte SQL-Abfragen und Logging in ProduceWorker hinzu: Implementiere Update- und Insert-Logik für das Inventar und verbessere die Fehlerausgaben. 2025-12-16 10:52:06 +01:00
Torsten Schulz (local)
74fee2d4c9 Verbessere Fehlerbehandlung in SQL-Abfragen: Füge detaillierte Fehlermeldungen für Vorbereitungs- und Ausführungsfehler in den Director- und Politics-Workern hinzu. 2025-12-16 08:52:08 +01:00
Torsten Schulz (local)
b45990c1b6 Aktualisiere Benachrichtigungsabfragen: Ersetze character_id durch user_id in den Benachrichtigungs-INSERT-Abfragen und optimiere die Abfragen zur Auswahl des Benutzers. 2025-12-16 08:24:46 +01:00
Torsten Schulz (local)
ae90166adb Aktualisiere Benachrichtigungsabfragen: Ersetze user_id durch character_id und optimiere die Abfragen zur Auswahl des Charakters. 2025-12-15 21:20:30 +01:00
Torsten Schulz (local)
2aa4e7c666 Füge VSCode-Build-Tasks hinzu: Erstelle Aufgaben für den Build-Prozess und Clippy-Überprüfungen. 2025-12-13 23:40:20 +01:00
Torsten Schulz (local)
ce06b1a4f0 Verbessere SQL-Abfragen im Produktionsworker: Optimiere die Abfrage für abgeschlossene Produktionen und verbessere die Lesbarkeit der Preisberechnung. 2025-12-13 12:17:00 +01:00
Torsten Schulz (local)
10bc1e5a52 Refactor SQL queries into a dedicated module
- Moved SQL queries from multiple worker files into `src/worker/sql.rs` for better organization and maintainability.
- Updated references in `stockage_manager.rs`, `transport.rs`, `underground.rs`, `user_character.rs`, and `value_recalculation.rs` to use the new centralized SQL queries.
- Improved code readability by replacing `.get(0)` with `.first()` for better clarity when retrieving the first row from query results.
- Cleaned up unnecessary comments and consolidated related SQL queries.
2025-12-13 11:57:28 +01:00
Torsten Schulz (local)
a9d490ce38 Refactor SQL queries into centralized module
- Moved various SQL query strings from individual worker files into a new `sql.rs` module for better organization and reusability.
- Updated `events.rs`, `underground.rs`, and `weather.rs` to use the centralized SQL queries.
- Removed redundant query definitions from `events.rs`, `underground.rs`, and `weather.rs`.
2025-12-12 15:18:30 +01:00
18 changed files with 2397 additions and 2903 deletions

27
.vscode/tasks.json vendored Normal file
View File

@@ -0,0 +1,27 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "Build and clippy",
"type": "shell",
"command": "cargo build && cargo clippy",
"group": "build"
},
{
"label": "cargo build",
"type": "shell",
"command": "cargo build",
"group": "build"
},
{
"label": "Build YpDaemon",
"type": "shell",
"command": "cargo build",
"isBackground": false,
"problemMatcher": [
"$rustc"
],
"group": "build"
}
]
}

View File

@@ -92,8 +92,32 @@ impl Database {
.get(name) .get(name)
.ok_or_else(|| DbError::new(format!("Unbekanntes Statement: {name}")))?; .ok_or_else(|| DbError::new(format!("Unbekanntes Statement: {name}")))?;
let rows = self.client.query(sql.as_str(), params)?; match self.client.query(sql.as_str(), params) {
Ok(rows.into_iter().map(Self::row_to_map).collect()) Ok(rows) => Ok(rows.into_iter().map(Self::row_to_map).collect()),
Err(err) => {
if let Some(db_err) = err.as_db_error() {
let code = db_err.code().code().to_string();
let message = db_err.message();
let detail = db_err.detail().unwrap_or_default();
let hint = db_err.hint().unwrap_or_default();
// SQL ggf. kürzen, um Log-Flut zu vermeiden
let mut sql_preview = sql.clone();
const MAX_SQL_PREVIEW: usize = 800;
if sql_preview.len() > MAX_SQL_PREVIEW {
sql_preview.truncate(MAX_SQL_PREVIEW);
sql_preview.push_str("");
}
Err(DbError::new(format!(
"Postgres-Fehler bei Statement '{name}': {} (SQLSTATE: {}, Detail: {}, Hint: {}) | SQL: {}",
message, code, detail, hint, sql_preview
)))
} else {
Err(DbError::new(format!(
"Postgres-Fehler (Client) bei Statement '{name}': {err}"
)))
}
}
}
} }
fn row_to_map(row: postgres::Row) -> Row { fn row_to_map(row: postgres::Row) -> Row {

View File

@@ -86,15 +86,7 @@ async fn append_ws_log(
.map(|s| s.to_string()); .map(|s| s.to_string());
let target_user = json let target_user = json
.get("user_id") .get("user_id")
.and_then(|v| { .and_then(|v| v.as_str().map(|s| s.to_string()).or_else(|| v.as_i64().map(|n| n.to_string())));
if let Some(s) = v.as_str() {
Some(s.to_string())
} else if let Some(n) = v.as_i64() {
Some(n.to_string())
} else {
None
}
});
(event, target_user) (event, target_user)
} else { } else {
(None, None) (None, None)
@@ -510,10 +502,8 @@ async fn handle_connection<S>(
.and_then(|v| { .and_then(|v| {
if let Some(s) = v.as_str() { if let Some(s) = v.as_str() {
s.parse::<i64>().ok() s.parse::<i64>().ok()
} else if let Some(n) = v.as_i64() {
Some(n)
} else { } else {
None v.as_i64()
} }
}) })
.map(|v| v == numeric_uid) .map(|v| v == numeric_uid)

View File

@@ -1,4 +1,5 @@
use crate::db::{ConnectionPool, DbError}; use crate::db::{ConnectionPool, DbError};
use crate::worker::sql::{QUERY_UPDATE_MONEY, QUERY_GET_MONEY};
use crate::message_broker::MessageBroker; use crate::message_broker::MessageBroker;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -96,7 +97,6 @@ impl BaseWorker {
} }
let state = Arc::clone(&self.state); let state = Arc::clone(&self.state);
let name = self.name.clone();
self.watchdog_thread = Some(thread::spawn(move || { self.watchdog_thread = Some(thread::spawn(move || {
while state.running_watchdog.load(Ordering::Relaxed) { while state.running_watchdog.load(Ordering::Relaxed) {
@@ -116,7 +116,7 @@ impl BaseWorker {
// beim Debuggen selten. Deshalb nur loggen, wenn der Worker // beim Debuggen selten. Deshalb nur loggen, wenn der Worker
// sich nicht im Idle-Zustand befindet. // sich nicht im Idle-Zustand befindet.
if !step.ends_with(" idle") { if !step.ends_with(" idle") {
eprintln!("[{name}] Watchdog: current step = {step}"); // keine Info-Logs im Watchdog
} }
} }
})); }));
@@ -147,10 +147,6 @@ impl BaseWorker {
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection // Verwende parametrisierte Queries für Sicherheit gegen SQL-Injection
const QUERY_UPDATE_MONEY: &str = r#"
SELECT falukant_data.update_money($1, $2, $3);
"#;
conn.prepare("update_money", QUERY_UPDATE_MONEY)?; conn.prepare("update_money", QUERY_UPDATE_MONEY)?;
// Validate float to avoid passing NaN/inf which the postgres client // Validate float to avoid passing NaN/inf which the postgres client
@@ -165,14 +161,11 @@ impl BaseWorker {
// We must ensure the resulting money fits in numeric(10,2). // We must ensure the resulting money fits in numeric(10,2).
// numeric(10,2) max absolute value is < 10^8 (100_000_000) before rounding. // numeric(10,2) max absolute value is < 10^8 (100_000_000) before rounding.
// Fetch current money for the user and clamp the delta if needed. // Fetch current money for the user and clamp the delta if needed.
const QUERY_GET_MONEY: &str = r#"
SELECT money FROM falukant_data.falukant_user WHERE id = $1;
"#;
conn.prepare("get_money_for_clamp", QUERY_GET_MONEY)?; conn.prepare("get_money_for_clamp", QUERY_GET_MONEY)?;
let rows = conn.execute("get_money_for_clamp", &[&falukant_user_id])?; let rows = conn.execute("get_money_for_clamp", &[&falukant_user_id])?;
let current_money: f64 = rows let current_money: f64 = rows
.get(0) .first()
.and_then(|r| r.get("money")) .and_then(|r| r.get("money"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0); .unwrap_or(0.0);
@@ -183,8 +176,6 @@ impl BaseWorker {
// numeric(10,2) allows values with absolute < 10^8 (100_000_000) // numeric(10,2) allows values with absolute < 10^8 (100_000_000)
const MAX_ABS: f64 = 100_000_000.0 - 0.01; // leave room for scale const MAX_ABS: f64 = 100_000_000.0 - 0.01; // leave room for scale
let _allowed = MAX_ABS - current_money;
let adjusted_money_change = if tentative >= MAX_ABS { let adjusted_money_change = if tentative >= MAX_ABS {
let clipped = MAX_ABS - current_money; let clipped = MAX_ABS - current_money;
eprintln!( eprintln!(
@@ -203,19 +194,12 @@ impl BaseWorker {
money_change money_change
}; };
// Keep only important clamp logging: when clipping occurs we log it above.
// Send exact types matching the DB function signature: // Send exact types matching the DB function signature:
let uid_i32: i32 = falukant_user_id; let uid_i32: i32 = falukant_user_id;
let money_str = format!("{:.2}", adjusted_money_change); let money_str = format!("{:.2}", adjusted_money_change);
// Note: we intentionally avoid parameterized call due to serialization // Note: we intentionally avoid parameterized call due to serialization
// issues in this environment and instead execute a literal SQL below. // issues in this environment and instead execute a literal SQL below.
// Execute update; avoid noisy logging here.
// Use a literal SQL call because parameterized execution keeps failing
// with "error serializing parameter 1" in this environment.
fn escape_sql_literal(s: &str) -> String { fn escape_sql_literal(s: &str) -> String {
s.replace('\'', "''") s.replace('\'', "''")
} }

View File

@@ -10,6 +10,19 @@ use std::thread;
use std::time::Duration; use std::time::Duration;
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED,
QUERY_GET_TOWN_REGION_IDS,
QUERY_LOAD_FIRST_NAMES,
QUERY_LOAD_LAST_NAMES,
QUERY_INSERT_CHARACTER,
QUERY_GET_ELIGIBLE_NPC_FOR_DEATH,
QUERY_DELETE_DIRECTOR,
QUERY_DELETE_RELATIONSHIP,
QUERY_DELETE_CHILD_RELATION,
QUERY_INSERT_NOTIFICATION,
QUERY_MARK_CHARACTER_DECEASED,
};
pub struct CharacterCreationWorker { pub struct CharacterCreationWorker {
pub(crate) base: BaseWorker, pub(crate) base: BaseWorker,
@@ -21,145 +34,7 @@ pub struct CharacterCreationWorker {
death_thread: Option<thread::JoinHandle<()>>, death_thread: Option<thread::JoinHandle<()>>,
} }
// SQL-Queries analog zur C++-Implementierung
const QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED: &str = r#"
SELECT created_at
FROM falukant_data."character"
WHERE user_id IS NULL
AND created_at::date = CURRENT_DATE
ORDER BY created_at DESC
LIMIT 1;
"#;
const QUERY_GET_TOWN_REGION_IDS: &str = r#"
SELECT fdr.id
FROM falukant_data.region fdr
JOIN falukant_type.region ftr ON fdr.region_type_id = ftr.id
WHERE ftr.label_tr = 'city';
"#;
const QUERY_LOAD_FIRST_NAMES: &str = r#"
SELECT id, gender
FROM falukant_predefine.firstname;
"#;
const QUERY_LOAD_LAST_NAMES: &str = r#"
SELECT id
FROM falukant_predefine.lastname;
"#;
const QUERY_INSERT_CHARACTER: &str = r#"
INSERT INTO falukant_data.character(
user_id,
region_id,
first_name,
last_name,
birthdate,
gender,
created_at,
updated_at,
title_of_nobility
) VALUES (
NULL,
$1,
$2,
$3,
NOW(),
$4,
NOW(),
NOW(),
$5
);
"#;
const QUERY_GET_ELIGIBLE_NPC_FOR_DEATH: &str = r#"
WITH aged AS (
SELECT
c.id,
(current_date - c.birthdate::date) AS age,
c.user_id
FROM
falukant_data.character c
WHERE
c.user_id IS NULL
AND (current_date - c.birthdate::date) > 60
),
always_sel AS (
SELECT *
FROM aged
WHERE age > 85
),
random_sel AS (
SELECT *
FROM aged
WHERE age <= 85
ORDER BY random()
LIMIT 10
)
SELECT *
FROM always_sel
UNION ALL
SELECT *
FROM random_sel;
"#;
const QUERY_DELETE_DIRECTOR: &str = r#"
DELETE FROM falukant_data.director
WHERE director_character_id = $1
RETURNING employer_user_id;
"#;
const QUERY_DELETE_RELATIONSHIP: &str = r#"
WITH deleted AS (
DELETE FROM falukant_data.relationship
WHERE character1_id = $1
OR character2_id = $1
RETURNING
CASE
WHEN character1_id = $1 THEN character2_id
ELSE character1_id
END AS related_character_id,
relationship_type_id
)
SELECT
c.user_id AS related_user_id
FROM deleted d
JOIN falukant_data.character c
ON c.id = d.related_character_id;
"#;
const QUERY_DELETE_CHILD_RELATION: &str = r#"
WITH deleted AS (
DELETE FROM falukant_data.child_relation
WHERE child_character_id = $1
RETURNING
father_character_id,
mother_character_id
)
SELECT
cf.user_id AS father_user_id,
cm.user_id AS mother_user_id
FROM deleted d
JOIN falukant_data.character cf
ON cf.id = d.father_character_id
JOIN falukant_data.character cm
ON cm.id = d.mother_character_id;
"#;
const QUERY_INSERT_NOTIFICATION: &str = r#"
INSERT INTO falukant_log.notification (
user_id,
tr,
shown,
created_at,
updated_at
) VALUES ($1, 'director_death', FALSE, NOW(), NOW());
"#;
const QUERY_MARK_CHARACTER_DECEASED: &str = r#"
DELETE FROM falukant_data.character
WHERE id = $1;
"#;
impl CharacterCreationWorker { impl CharacterCreationWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -329,12 +204,12 @@ impl CharacterCreationWorker {
} }
fn load_names(&mut self) { fn load_names(&mut self) {
if self.first_name_cache.is_empty() || self.last_name_cache.is_empty() { if (self.first_name_cache.is_empty() || self.last_name_cache.is_empty())
if let Err(err) = self.load_first_and_last_names() { && let Err(err) = self.load_first_and_last_names()
{
eprintln!("[CharacterCreationWorker] Fehler in loadNames: {err}"); eprintln!("[CharacterCreationWorker] Fehler in loadNames: {err}");
} }
} }
}
fn load_first_and_last_names(&mut self) -> Result<(), crate::db::DbError> { fn load_first_and_last_names(&mut self) -> Result<(), crate::db::DbError> {
let mut conn = self let mut conn = self
@@ -491,14 +366,14 @@ impl CharacterCreationWorker {
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0); .unwrap_or(0);
if character_id > 0 && Self::calculate_death_probability(age) { if character_id > 0 && Self::calculate_death_probability(age)
if let Err(err) = Self::handle_character_death(pool, broker, character_id) { && let Err(err) = Self::handle_character_death(pool, broker, character_id)
{
eprintln!( eprintln!(
"[CharacterCreationWorker] Fehler beim Bearbeiten des NPC-Todes (id={character_id}): {err}" "[CharacterCreationWorker] Fehler beim Bearbeiten des NPC-Todes (id={character_id}): {err}"
); );
} }
} }
}
Ok(()) Ok(())
} }
@@ -531,14 +406,13 @@ impl CharacterCreationWorker {
// 1) Director löschen und User benachrichtigen // 1) Director löschen und User benachrichtigen
conn.prepare("delete_director", QUERY_DELETE_DIRECTOR)?; conn.prepare("delete_director", QUERY_DELETE_DIRECTOR)?;
let dir_result = conn.execute("delete_director", &[&character_id])?; let dir_result = conn.execute("delete_director", &[&character_id])?;
if let Some(row) = dir_result.get(0) { if let Some(row) = dir_result.first()
if let Some(user_id) = row && let Some(user_id) = row
.get("employer_user_id") .get("employer_user_id")
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
{ {
Self::notify_user(pool, broker, user_id, "director_death")?; Self::notify_user(pool, broker, user_id, "director_death")?;
} }
}
// 2) Relationships löschen und betroffene User benachrichtigen // 2) Relationships löschen und betroffene User benachrichtigen
conn.prepare("delete_relationship", QUERY_DELETE_RELATIONSHIP)?; conn.prepare("delete_relationship", QUERY_DELETE_RELATIONSHIP)?;

View File

@@ -7,6 +7,35 @@ use std::time::{Duration, Instant};
use crate::db::ConnectionPool; use crate::db::ConnectionPool;
use super::base::{BaseWorker, Worker, WorkerState, DEFAULT_TAX_PERCENT, DEFAULT_TREASURY_USER_ID}; use super::base::{BaseWorker, Worker, WorkerState, DEFAULT_TAX_PERCENT, DEFAULT_TREASURY_USER_ID};
use crate::worker::sql::{
QUERY_GET_DIRECTORS,
QUERY_GET_BEST_PRODUCTION,
QUERY_INSERT_PRODUCTION,
QUERY_GET_BRANCH_CAPACITY,
QUERY_GET_INVENTORY,
QUERY_REMOVE_INVENTORY,
QUERY_ADD_SELL_LOG,
QUERY_GET_REGION_WORTH_FOR_PRODUCT,
QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE,
QUERY_INSERT_TRANSPORT,
QUERY_INSERT_EMPTY_TRANSPORT,
QUERY_GET_USER_BRANCHES,
QUERY_GET_FREE_VEHICLES_IN_REGION,
QUERY_GET_SALARY_TO_PAY,
QUERY_SET_SALARY_PAYED,
QUERY_UPDATE_SATISFACTION,
QUERY_GET_DIRECTOR_USER,
QUERY_COUNT_VEHICLES_IN_BRANCH_REGION,
QUERY_COUNT_VEHICLES_IN_REGION,
QUERY_CHECK_ROUTE,
QUERY_GET_BRANCH_REGION,
QUERY_GET_AVERAGE_WORTH,
QUERY_UPDATE_INVENTORY_QTY,
QUERY_GET_PRODUCT_COST,
QUERY_GET_USER_OFFICES,
QUERY_CUMULATIVE_TAX_NO_EXEMPT,
QUERY_CUMULATIVE_TAX_WITH_EXEMPT,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Director { struct Director {
@@ -65,279 +94,10 @@ pub struct DirectorWorker {
// Maximale Anzahl paralleler Produktionen pro Branch // Maximale Anzahl paralleler Produktionen pro Branch
const MAX_PARALLEL_PRODUCTIONS: i32 = 2; const MAX_PARALLEL_PRODUCTIONS: i32 = 2;
// SQL-Queries (1:1 aus director_worker.h) // ...existing code...
const QUERY_GET_DIRECTORS: &str = r#"
SELECT
d.may_produce,
d.may_sell,
d.may_start_transport,
b.id AS branch_id,
fu.id AS falukantUserId,
d.id
FROM falukant_data.director d
JOIN falukant_data.falukant_user fu
ON fu.id = d.employer_user_id
JOIN falukant_data.character c
ON c.id = d.director_character_id
JOIN falukant_data.branch b
ON b.region_id = c.region_id
AND b.falukant_user_id = fu.id
WHERE current_time BETWEEN '08:00:00' AND '17:00:00';
"#;
const QUERY_GET_BEST_PRODUCTION: &str = r#"
SELECT
fdu.id falukant_user_id,
-- Geld explizit als Text casten, damit das Mapping im Rust-Code
-- zuverlässig funktioniert (unabhängig vom nativen DB-Typ wie `money`).
CAST(fdu.money AS text) AS money,
fdu.certificate,
ftp.id product_id,
ftp.label_tr,
fdb.region_id,
(
SELECT SUM(quantity)
FROM falukant_data.stock fds
WHERE fds.branch_id = fdb.id
) AS stock_size,
COALESCE((
SELECT SUM(COALESCE(fdi.quantity, 0))
FROM falukant_data.stock fds
JOIN falukant_data.inventory fdi
ON fdi.stock_id = fds.id
WHERE fds.branch_id = fdb.id
), 0) AS used_in_stock,
(ftp.sell_cost * (fdtpw.worth_percent + (fdk_character.knowledge * 2 + fdk_director.knowledge) / 3) / 100 - 6 * ftp.category)
/ (300.0 * ftp.production_time) AS worth,
fdb.id AS branch_id,
(
SELECT COUNT(id)
FROM falukant_data.production
WHERE branch_id = fdb.id
) AS running_productions,
COALESCE((
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
FROM falukant_data.production fdp
WHERE fdp.branch_id = fdb.id
), 0) AS running_productions_quantity
FROM falukant_data.director fdd
JOIN falukant_data.character fdc
ON fdc.id = fdd.director_character_id
JOIN falukant_data.falukant_user fdu
ON fdd.employer_user_id = fdu.id
JOIN falukant_data.character user_character
ON user_character.user_id = fdu.id
JOIN falukant_data.branch fdb
ON fdb.falukant_user_id = fdu.id
AND fdb.region_id = fdc.region_id
JOIN falukant_data.town_product_worth fdtpw
ON fdtpw.region_id = fdb.region_id
JOIN falukant_data.knowledge fdk_character
ON fdk_character.product_id = fdtpw.product_id
AND fdk_character.character_id = user_character.id
JOIN falukant_data.knowledge fdk_director
ON fdk_director.product_id = fdtpw.product_id
AND fdk_director.character_id = fdd.director_character_id
JOIN falukant_type.product ftp
ON ftp.id = fdtpw.product_id
AND ftp.category <= fdu.certificate
WHERE fdd.id = $1
AND fdb.id = $2
ORDER BY worth DESC
LIMIT 1;
"#;
const QUERY_INSERT_PRODUCTION: &str = r#"
INSERT INTO falukant_data.production (branch_id, product_id, quantity, weather_type_id)
VALUES ($1, $2, $3, (
SELECT weather_type_id
FROM falukant_data.weather
WHERE region_id = $4
));
"#;
// Query zum Abfragen der aktuellen Lager- und Produktionswerte für einen Branch
// (ohne den kompletten Produktionsplan neu zu berechnen)
const QUERY_GET_BRANCH_CAPACITY: &str = r#"
SELECT
(
SELECT SUM(quantity)
FROM falukant_data.stock fds
WHERE fds.branch_id = $1
) AS stock_size,
COALESCE((
SELECT SUM(COALESCE(fdi.quantity, 0))
FROM falukant_data.stock fds
JOIN falukant_data.inventory fdi
ON fdi.stock_id = fds.id
WHERE fds.branch_id = $1
), 0) AS used_in_stock,
(
SELECT COUNT(id)
FROM falukant_data.production
WHERE branch_id = $1
) AS running_productions,
COALESCE((
SELECT SUM(COALESCE(fdp.quantity, 0)) quantity
FROM falukant_data.production fdp
WHERE fdp.branch_id = $1
), 0) AS running_productions_quantity;
"#;
const QUERY_GET_INVENTORY: &str = r#"
SELECT
i.id,
i.product_id,
i.quantity,
i.quality,
p.sell_cost,
fu.id AS user_id,
b.region_id,
b.id AS branch_id,
COALESCE(tpw.worth_percent, 100.0) AS worth_percent
FROM falukant_data.inventory i
JOIN falukant_data.stock s
ON s.id = i.stock_id
JOIN falukant_data.branch b
ON b.id = s.branch_id
JOIN falukant_data.falukant_user fu
ON fu.id = b.falukant_user_id
JOIN falukant_data.director d
ON d.employer_user_id = fu.id
JOIN falukant_type.product p
ON p.id = i.product_id
LEFT JOIN falukant_data.town_product_worth tpw
ON tpw.region_id = b.region_id
AND tpw.product_id = i.product_id
WHERE d.id = $1
AND b.id = $2;
"#;
const QUERY_REMOVE_INVENTORY: &str = r#"
DELETE FROM falukant_data.inventory
WHERE id = $1;
"#;
const QUERY_ADD_SELL_LOG: &str = r#"
INSERT INTO falukant_log.sell (region_id, product_id, quantity, seller_id)
VALUES ($1, $2, $3, $4)
ON CONFLICT (region_id, product_id, seller_id)
DO UPDATE
SET quantity = falukant_log.sell.quantity + EXCLUDED.quantity;
"#;
// Regionale Verkaufswürdigkeit pro Produkt/Region für alle Branches eines Users
const QUERY_GET_REGION_WORTH_FOR_PRODUCT: &str = r#"
SELECT
tpw.region_id,
tpw.product_id,
tpw.worth_percent
FROM falukant_data.town_product_worth tpw
JOIN falukant_data.branch b
ON b.region_id = tpw.region_id
WHERE b.falukant_user_id = $1
AND tpw.product_id = $2;
"#;
// Verfügbare Transportmittel für eine Route (source_region -> target_region) // Verfügbare Transportmittel für eine Route (source_region -> target_region)
const QUERY_GET_TRANSPORT_VEHICLES_FOR_ROUTE: &str = r#" // ...existing code...
SELECT
v.id AS vehicle_id,
vt.capacity AS capacity
FROM falukant_data.vehicle v
JOIN falukant_type.vehicle vt
ON vt.id = v.vehicle_type_id
JOIN falukant_data.region_distance rd
ON (
(rd.source_region_id = v.region_id AND rd.target_region_id = $3)
OR (rd.source_region_id = $3 AND rd.target_region_id = v.region_id)
)
AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
WHERE v.falukant_user_id = $1
AND v.region_id = $2;
"#;
// Transport-Eintrag anlegen
const QUERY_INSERT_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport
(source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, NOW(), NOW());
"#;
// Leere Transporte (product_id = NULL, size = 0) zum Zurückholen von Fahrzeugen
const QUERY_INSERT_EMPTY_TRANSPORT: &str = r#"
INSERT INTO falukant_data.transport
(source_region_id, target_region_id, product_id, size, vehicle_id, created_at, updated_at)
VALUES ($1, $2, NULL, 0, $3, NOW(), NOW());
"#;
// Alle Branches des Users mit ihren Regionen
const QUERY_GET_USER_BRANCHES: &str = r#"
SELECT DISTINCT
b.region_id,
b.id AS branch_id
FROM falukant_data.branch b
WHERE b.falukant_user_id = $1
AND b.region_id != $2;
"#;
// Freie Transportmittel in einer Region (nicht in aktiven Transporten)
// Ein Transport ist aktiv, wenn er noch in der Tabelle existiert
const QUERY_GET_FREE_VEHICLES_IN_REGION: &str = r#"
SELECT
v.id AS vehicle_id,
vt.capacity AS capacity
FROM falukant_data.vehicle v
JOIN falukant_type.vehicle vt
ON vt.id = v.vehicle_type_id
WHERE v.falukant_user_id = $1
AND v.region_id = $2
AND v.id NOT IN (
SELECT DISTINCT t.vehicle_id
FROM falukant_data.transport t
WHERE t.vehicle_id IS NOT NULL
);
"#;
const QUERY_GET_SALARY_TO_PAY: &str = r#"
SELECT d.id, d.employer_user_id, d.income
FROM falukant_data.director d
WHERE DATE(d.last_salary_payout) < DATE(NOW());
"#;
const QUERY_SET_SALARY_PAYED: &str = r#"
UPDATE falukant_data.director
SET last_salary_payout = NOW()
WHERE id = $1;
"#;
const QUERY_UPDATE_SATISFACTION: &str = r#"
WITH new_sats AS (
SELECT
d.id,
ROUND(
d.income::numeric
/
(
c.title_of_nobility
* POWER(1.231, AVG(k.knowledge) / 1.5)
)
* 100
) AS new_satisfaction
FROM falukant_data.director d
JOIN falukant_data.knowledge k
ON d.director_character_id = k.character_id
JOIN falukant_data.character c
ON c.id = d.director_character_id
GROUP BY d.id, c.title_of_nobility, d.income
)
UPDATE falukant_data.director dir
SET satisfaction = ns.new_satisfaction
FROM new_sats ns
WHERE dir.id = ns.id
AND dir.satisfaction IS DISTINCT FROM ns.new_satisfaction
RETURNING dir.employer_user_id;
"#;
impl DirectorWorker { impl DirectorWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -347,7 +107,7 @@ impl DirectorWorker {
} }
} }
fn run_iteration(&mut self, state: &WorkerState) { fn run_iteration(&mut self, _state: &WorkerState) {
self.base.set_current_step("DirectorWorker iteration"); self.base.set_current_step("DirectorWorker iteration");
let now = Instant::now(); let now = Instant::now();
@@ -364,10 +124,6 @@ impl DirectorWorker {
} }
std::thread::sleep(Duration::from_secs(1)); std::thread::sleep(Duration::from_secs(1));
if !state.running_worker.load(Ordering::Relaxed) {
return;
}
} }
fn perform_all_tasks(&mut self) -> Result<(), DbError> { fn perform_all_tasks(&mut self) -> Result<(), DbError> {
@@ -397,7 +153,7 @@ impl DirectorWorker {
.collect(); .collect();
if directors.is_empty() { if directors.is_empty() {
eprintln!("[DirectorWorker] Keine Direktoren für Aktionen gefunden (Zeitfenster oder DB-Daten)."); // keine Info-Logs
} }
for director in directors { for director in directors {
@@ -466,7 +222,7 @@ impl DirectorWorker {
return Ok(()); return Ok(());
} }
let mut base_plan = match Self::map_row_to_production_plan(&rows[0]) { let mut base_plan = match rows.first().and_then(Self::map_row_to_production_plan) {
Some(p) => p, Some(p) => p,
None => { None => {
eprintln!( eprintln!(
@@ -614,41 +370,11 @@ impl DirectorWorker {
conn: &mut DbConnection, conn: &mut DbConnection,
plan: &ProductionPlan, plan: &ProductionPlan,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
// Freie Lagerkapazität: Gesamtbestand minus bereits belegter Bestand let free_capacity = Self::calc_free_capacity(plan);
// (Inventar) minus bereits eingeplante Produktionsmengen. let one_piece_cost = Self::calc_one_piece_cost(plan);
let free_capacity = let max_money_production = Self::calc_max_money_production(plan, one_piece_cost);
plan.stock_size - plan.used_in_stock - plan.running_productions_quantity;
// Stückkosten monetär berechnen. Da money ein f64 ist, arbeiten wir hier ebenfalls let to_produce = (free_capacity.min(max_money_production)).clamp(0, 100);
// mit Gleitkomma und runden erst am Ende auf eine ganze Stückzahl ab.
let one_piece_cost = (plan.certificate * 6) as f64;
let mut max_money_production: i32 = 0;
if one_piece_cost > 0.0 {
if plan.money > 0.0 {
// Anzahl Stück, die sich mit dem verfügbaren Geld finanzieren lassen
max_money_production = (plan.money / one_piece_cost).floor() as i32;
} else {
// Falls das Geld aus der DB unerwartet als 0 eingelesen wurde, aber
// eigentlich ausreichend Guthaben vorhanden ist (bekannter Migrationsfall),
// lassen wir die Geldbegrenzung vorläufig fallen und begrenzen nur über
// Lagerkapazität und Hard-Limit 100.
eprintln!(
"[DirectorWorker] Warnung: money=0 für falukant_user_id={}, \
verwende nur Lagerkapazität als Limit.",
plan.falukant_user_id
);
max_money_production = i32::MAX;
}
}
// Maximale Produktionsmenge begrenzen:
// - nie mehr als der freie Lagerplatz (`free_capacity`)
// - nie mehr als durch das verfügbare Geld finanzierbar
// - absolut maximal 100 Einheiten pro Produktion
let to_produce = free_capacity
.min(max_money_production)
.min(100)
.max(0);
eprintln!( eprintln!(
"[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}", "[DirectorWorker] Produktionsberechnung: free_capacity={}, one_piece_cost={}, max_money_production={}, to_produce={}, running_productions={}",
@@ -711,6 +437,30 @@ impl DirectorWorker {
Ok(()) Ok(())
} }
fn calc_free_capacity(plan: &ProductionPlan) -> i32 {
plan.stock_size - plan.used_in_stock - plan.running_productions_quantity
}
fn calc_one_piece_cost(plan: &ProductionPlan) -> f64 {
(plan.certificate * 6) as f64
}
fn calc_max_money_production(plan: &ProductionPlan, one_piece_cost: f64) -> i32 {
if one_piece_cost > 0.0 {
if plan.money > 0.0 {
(plan.money / one_piece_cost).floor() as i32
} else {
eprintln!(
"[DirectorWorker] Warnung: money=0 für falukant_user_id={}, verwende nur Lagerkapazität als Limit.",
plan.falukant_user_id
);
i32::MAX
}
} else {
0
}
}
fn start_transports_stub(&mut self, director: &Director) -> Result<(), DbError> { fn start_transports_stub(&mut self, director: &Director) -> Result<(), DbError> {
self.base self.base
.set_current_step("DirectorWorker: start_transports"); .set_current_step("DirectorWorker: start_transports");
@@ -736,11 +486,6 @@ impl DirectorWorker {
// sein (Arbeitgeber des Directors). // sein (Arbeitgeber des Directors).
let falukant_user_id = if items.is_empty() { let falukant_user_id = if items.is_empty() {
// Wenn keine Items vorhanden sind, müssen wir die user_id anders ermitteln // Wenn keine Items vorhanden sind, müssen wir die user_id anders ermitteln
const QUERY_GET_DIRECTOR_USER: &str = r#"
SELECT employer_user_id
FROM falukant_data.director
WHERE id = $1;
"#;
conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?; conn.prepare("get_director_user", QUERY_GET_DIRECTOR_USER)?;
let user_rows = conn.execute("get_director_user", &[&director.id])?; let user_rows = conn.execute("get_director_user", &[&director.id])?;
user_rows user_rows
@@ -754,20 +499,6 @@ impl DirectorWorker {
// Prüfe, ob Transportmittel im aktuellen Branch vorhanden sind // Prüfe, ob Transportmittel im aktuellen Branch vorhanden sind
// Ein Transport ist aktiv, wenn er noch in der Tabelle existiert // Ein Transport ist aktiv, wenn er noch in der Tabelle existiert
const QUERY_COUNT_VEHICLES_IN_BRANCH_REGION: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.vehicle v
JOIN falukant_data.branch b
ON b.region_id = v.region_id
WHERE v.falukant_user_id = $1
AND b.id = $2
AND v.id NOT IN (
SELECT DISTINCT t.vehicle_id
FROM falukant_data.transport t
WHERE t.vehicle_id IS NOT NULL
);
"#;
conn.prepare("count_vehicles_in_branch", QUERY_COUNT_VEHICLES_IN_BRANCH_REGION)?; conn.prepare("count_vehicles_in_branch", QUERY_COUNT_VEHICLES_IN_BRANCH_REGION)?;
let vehicle_count_rows = conn.execute( let vehicle_count_rows = conn.execute(
"count_vehicles_in_branch", "count_vehicles_in_branch",
@@ -976,45 +707,50 @@ impl DirectorWorker {
}) })
} }
fn sell_single_inventory_item( // Helper: compute piece sell price from item fields
&mut self, fn compute_piece_sell_price(item: &InventoryItem) -> f64 {
conn: &mut DbConnection,
item: &InventoryItem,
) -> Result<(), DbError> {
if item.quantity <= 0 {
conn.execute("remove_inventory", &[&item.id])?;
return Ok(());
}
// Neue Preisberechnung gemäß Spezifikation:
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let base_price = item.sell_cost * (item.worth_percent / 100.0); let base_price = item.sell_cost * (item.worth_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let min_price = base_price * 0.6; let min_price = base_price * 0.6;
let max_price = base_price; let max_price = base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
// knowledgeFactor ist hier item.quality
let knowledge_factor = item.quality as f64; let knowledge_factor = item.quality as f64;
let piece_sell_price = min_price + (max_price - min_price) * (knowledge_factor / 100.0); min_price + (max_price - min_price) * (knowledge_factor / 100.0)
}
let sell_price = piece_sell_price * item.quantity as f64; // Helper: get one_piece_cost from DB row fallback logic
fn resolve_one_piece_cost(conn: &mut DbConnection, product_id: i32, fallback: f64) -> Result<f64, DbError> {
conn.prepare("get_product_cost", QUERY_GET_PRODUCT_COST)?;
let rows = conn.execute("get_product_cost", &[&product_id])?;
if let Some(row) = rows.first()
&& let Some(osc) = row.get("original_sell_cost")
&& let Ok(v) = osc.parse::<f64>()
{
return Ok(v);
}
if let Some(row) = rows.first()
&& let Some(sc) = row.get("sell_cost")
&& let Ok(v) = sc.parse::<f64>()
{
return Ok(v);
}
Ok(fallback)
}
// Steuerberechnung: 1) Region ermitteln, 2) user offices, 3) cumulative tax (mit Befreiungen) // Helper: determine cumulative tax percent for a branch/user
fn get_cumulative_tax_percent(conn: &mut DbConnection, branch_id: i32, user_id: i32) -> Result<f64, DbError> {
// Default
let mut cumulative_tax_percent = DEFAULT_TAX_PERCENT; let mut cumulative_tax_percent = DEFAULT_TAX_PERCENT;
conn.prepare("get_branch_region", "SELECT region_id FROM falukant_data.branch WHERE id = $1;")?; conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)
let branch_rows = conn.execute("get_branch_region", &[&item.branch_id])?; .map_err(|e| DbError::new(format!("[DirectorWorker] prepare get_branch_region: {e}")))?;
let branch_region_id: Option<i32> = branch_rows.get(0).and_then(|r| r.get("region_id")).and_then(|v| v.parse().ok()); let branch_rows = conn.execute("get_branch_region", &[&branch_id])
.map_err(|e| DbError::new(format!("[DirectorWorker] exec get_branch_region branch_id={}: {e}", branch_id)))?;
let branch_region_id: Option<i32> = branch_rows.first().and_then(|r| r.get("region_id")).and_then(|v| v.parse().ok());
if let Some(region_id) = branch_region_id { if let Some(region_id) = branch_region_id {
// user offices conn.prepare("get_user_offices", QUERY_GET_USER_OFFICES)
conn.prepare( .map_err(|e| DbError::new(format!("[DirectorWorker] prepare get_user_offices: {e}")))?;
"get_user_offices", let offices = conn.execute("get_user_offices", &[&user_id])
"SELECT po.id AS office_id, pot.name AS office_name, po.region_id, rt.label_tr AS region_type FROM falukant_data.political_office po JOIN falukant_type.political_office_type pot ON pot.id = po.office_type_id JOIN falukant_data.region r ON r.id = po.region_id JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE po.holder_id = $1 AND (po.end_date IS NULL OR po.end_date > NOW());", .map_err(|e| DbError::new(format!("[DirectorWorker] exec get_user_offices user_id={}: {e}", user_id)))?;
)?;
let offices = conn.execute("get_user_offices", &[&item.user_id])?;
let mut exempt_types: Vec<String> = Vec::new(); let mut exempt_types: Vec<String> = Vec::new();
let mut has_chancellor = false; let mut has_chancellor = false;
@@ -1032,52 +768,51 @@ impl DirectorWorker {
} }
if has_chancellor { if has_chancellor {
cumulative_tax_percent = 0.0; return Ok(0.0);
} else { }
if exempt_types.is_empty() { if exempt_types.is_empty() {
conn.prepare( conn.prepare("cumulative_tax_no_exempt", QUERY_CUMULATIVE_TAX_NO_EXEMPT)
"cumulative_tax_no_exempt", .map_err(|e| DbError::new(format!("[DirectorWorker] prepare cumulative_tax_no_exempt: {e}")))?;
"WITH RECURSIVE ancestors AS (SELECT id, parent_id, COALESCE(tax_percent,0.0) AS tax_percent FROM falukant_data.region WHERE id = $1 UNION ALL SELECT r.id, r.parent_id, COALESCE(r.tax_percent,0.0) FROM falukant_data.region r JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;", let res = conn.execute("cumulative_tax_no_exempt", &[&region_id])
)?; .map_err(|e| DbError::new(format!("[DirectorWorker] exec cumulative_tax_no_exempt region_id={}: {e}", region_id)))?;
let res = conn.execute("cumulative_tax_no_exempt", &[&region_id])?; if let Some(row) = res.first()
if let Some(row) = res.get(0) { && let Some(tp) = row.get("total_percent")
if let Some(tp) = row.get("total_percent") { {
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT); cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
} }
}
} else { } else {
conn.prepare( conn.prepare("cumulative_tax_with_exempt", QUERY_CUMULATIVE_TAX_WITH_EXEMPT)
"cumulative_tax_with_exempt", .map_err(|e| DbError::new(format!("[DirectorWorker] prepare cumulative_tax_with_exempt: {e}")))?;
"WITH RECURSIVE ancestors AS (SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END AS tax_percent FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id WHERE r.id = $1 UNION ALL SELECT r.id, r.parent_id, CASE WHEN rt.label_tr = ANY($2::text[]) THEN 0.0 ELSE COALESCE(r.tax_percent,0.0) END FROM falukant_data.region r JOIN falukant_type.region_type rt ON rt.id = r.region_type_id JOIN ancestors a ON r.id = a.parent_id) SELECT COALESCE(SUM(tax_percent),0.0) AS total_percent FROM ancestors;",
)?;
let exempt_array: Vec<&str> = exempt_types.iter().map(|s| s.as_str()).collect(); let exempt_array: Vec<&str> = exempt_types.iter().map(|s| s.as_str()).collect();
let res = conn.execute("cumulative_tax_with_exempt", &[&region_id, &exempt_array])?; let res = conn.execute("cumulative_tax_with_exempt", &[&region_id, &exempt_array])
if let Some(row) = res.get(0) { .map_err(|e| DbError::new(format!("[DirectorWorker] exec cumulative_tax_with_exempt region_id={} exempt={:?}: {}", region_id, exempt_array, e)))?;
if let Some(tp) = row.get("total_percent") { if let Some(row) = res.first() && let Some(tp) = row.get("total_percent") {
cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT); cumulative_tax_percent = tp.parse::<f64>().unwrap_or(DEFAULT_TAX_PERCENT);
} }
} }
} }
}
Ok(cumulative_tax_percent)
} }
// Produktkosten (original_sell_cost fallback sell_cost) fn sell_single_inventory_item(
conn.prepare("get_product_cost", "SELECT original_sell_cost, sell_cost FROM falukant_type.product WHERE id = $1")?; &mut self,
let cost_rows = conn.execute("get_product_cost", &[&item.product_id])?; conn: &mut DbConnection,
let mut one_piece_cost = item.sell_cost; item: &InventoryItem,
if let Some(row) = cost_rows.get(0) { ) -> Result<(), DbError> {
if let Some(osc) = row.get("original_sell_cost") { if item.quantity <= 0 {
if let Ok(v) = osc.parse::<f64>() { conn.execute("remove_inventory", &[&item.id])?;
one_piece_cost = v; return Ok(());
}
} else if let Some(sc) = row.get("sell_cost") {
if let Ok(v) = sc.parse::<f64>() {
one_piece_cost = v;
}
}
} }
// cents-based arithmetic // compute piece price and full sell price
let piece_price = Self::compute_piece_sell_price(item);
let sell_price = piece_price * item.quantity as f64;
let one_piece_cost = Self::resolve_one_piece_cost(conn, item.product_id, item.sell_cost)?;
let cumulative_tax_percent = Self::get_cumulative_tax_percent(conn, item.branch_id, item.user_id)?;
let revenue_cents = (sell_price * 100.0).round() as i64; let revenue_cents = (sell_price * 100.0).round() as i64;
let cost_cents = (one_piece_cost * item.quantity as f64 * 100.0).round() as i64; let cost_cents = (one_piece_cost * item.quantity as f64 * 100.0).round() as i64;
let profit_cents = (revenue_cents - cost_cents).max(0); let profit_cents = (revenue_cents - cost_cents).max(0);
@@ -1094,11 +829,9 @@ impl DirectorWorker {
} }
let payout_amount = (payout_cents as f64) / 100.0; let payout_amount = (payout_cents as f64) / 100.0;
if payout_cents != 0 { if payout_cents != 0 && let Err(err) = self.base.change_falukant_user_money(item.user_id, payout_amount, "sell products") {
if let Err(err) = self.base.change_falukant_user_money(item.user_id, payout_amount, "sell products") {
eprintln!("[DirectorWorker] Fehler bei change_falukant_user_money (sell products): {err}"); eprintln!("[DirectorWorker] Fehler bei change_falukant_user_money (sell products): {err}");
} }
}
// Debug: Log vor dem DB-Aufruf // Debug: Log vor dem DB-Aufruf
eprintln!( eprintln!(
@@ -1156,66 +889,22 @@ impl DirectorWorker {
return Ok(0); return Ok(0);
} }
// Regionale worth_percent-Werte für dieses Produkt laden // Load worth_percent by region for this product
conn.prepare( let worth_by_region = Self::get_worth_by_region(conn, falukant_user_id, item.product_id)?;
"get_region_worth_for_product",
QUERY_GET_REGION_WORTH_FOR_PRODUCT,
)?;
let rows = conn.execute(
"get_region_worth_for_product",
&[&falukant_user_id, &item.product_id],
)?;
if rows.is_empty() {
return Ok(0);
}
let mut worth_by_region: HashMap<i32, f64> = HashMap::new();
for row in rows {
let region_id = row
.get("region_id")
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1);
let percent = row
.get("worth_percent")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(100.0);
if region_id >= 0 {
worth_by_region.insert(region_id, percent);
}
}
if worth_by_region.is_empty() { if worth_by_region.is_empty() {
eprintln!( eprintln!("[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden", item.product_id);
"[DirectorWorker] Keine worth_percent-Werte für Produkt {} gefunden",
item.product_id
);
return Ok(0); return Ok(0);
} }
eprintln!( eprintln!(
"[DirectorWorker] Gefundene Regionen für Produkt {}: {} Regionen", "[DirectorWorker] Gefundene Regionen für Produkt {}: {} Regionen",
item.product_id, worth_by_region.len() item.product_id,
worth_by_region.len()
); );
// Lokalen Stückpreis berechnen (neue Preisberechnung) // Compute local piece price
let local_percent = worth_by_region let local_percent = worth_by_region.get(&item.region_id).copied().unwrap_or(100.0);
.get(&item.region_id) let local_piece_price = Self::compute_piece_price_for_percent(item, local_percent);
.copied()
.unwrap_or(100.0);
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let local_base_price = item.sell_cost * (local_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let local_min_price = local_base_price * 0.6;
let local_max_price = local_base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
let knowledge_factor = item.quality as f64;
let local_piece_price = local_min_price + (local_max_price - local_min_price) * (knowledge_factor / 100.0);
eprintln!( eprintln!(
"[DirectorWorker] Lokaler Preis für Produkt {}: {:.2} (worth_percent={:.2}, quality={})", "[DirectorWorker] Lokaler Preis für Produkt {}: {:.2} (worth_percent={:.2}, quality={})",
item.product_id, local_piece_price, local_percent, item.quality item.product_id, local_piece_price, local_percent, item.quality
@@ -1232,18 +921,7 @@ impl DirectorWorker {
continue; continue;
} }
// Remote-Stückpreis berechnen (neue Preisberechnung) let remote_piece_price = Self::compute_piece_price_for_percent(item, remote_percent);
// 1. Basispreis = product.sellCost * (worthPercent / 100)
let remote_base_price = item.sell_cost * (remote_percent / 100.0);
// 2. min = basePrice * 0.6, max = basePrice
let remote_min_price = remote_base_price * 0.6;
let remote_max_price = remote_base_price;
// 3. price = min + (max - min) * (knowledgeFactor / 100)
let knowledge_factor = item.quality as f64;
let remote_piece_price = remote_min_price + (remote_max_price - remote_min_price) * (knowledge_factor / 100.0);
let delta_per_unit = remote_piece_price - local_piece_price; let delta_per_unit = remote_piece_price - local_piece_price;
eprintln!( eprintln!(
"[DirectorWorker] Region {}: Preis {:.2}, Delta {:.2}", "[DirectorWorker] Region {}: Preis {:.2}, Delta {:.2}",
@@ -1280,10 +958,7 @@ impl DirectorWorker {
} }
// Maximale transportierbare Menge anhand der Kapazität ermitteln // Maximale transportierbare Menge anhand der Kapazität ermitteln
let mut max_capacity: i32 = 0; let max_capacity = Self::calc_max_capacity(&vehicles);
for v in &vehicles {
max_capacity = max_capacity.saturating_add(v.capacity);
}
if max_capacity <= 0 { if max_capacity <= 0 {
continue; continue;
@@ -1295,15 +970,7 @@ impl DirectorWorker {
} }
let extra_revenue = delta_per_unit * qty as f64; let extra_revenue = delta_per_unit * qty as f64;
let total_value = remote_piece_price * qty as f64; let transport_cost = Self::calc_transport_cost(remote_piece_price, qty);
let transport_cost = total_value * 0.01_f64;
// Kostenformel: max(0.01, totalValue * 0.01)
let transport_cost = if transport_cost < 0.01 {
0.01
} else {
transport_cost
};
let net_gain = extra_revenue - transport_cost; let net_gain = extra_revenue - transport_cost;
eprintln!( eprintln!(
"[DirectorWorker] Region {}: extra_revenue={:.2}, transport_cost={:.2}, net_gain={:.2}, qty={}", "[DirectorWorker] Region {}: extra_revenue={:.2}, transport_cost={:.2}, net_gain={:.2}, qty={}",
@@ -1340,46 +1007,8 @@ impl DirectorWorker {
return Ok(0); return Ok(0);
} }
// Nochmals verfügbare Transportmittel für die gewählte Route laden // Build and insert transports for chosen route
let vehicles = Self::get_transport_vehicles_for_route( let shipped = Self::insert_transports_for_route(conn, item, target_region, best_quantity)?;
conn,
falukant_user_id,
item.region_id,
target_region,
)?;
if vehicles.is_empty() {
return Ok(0);
}
// Transporte anlegen, begrenzt durch best_quantity und Kapazitäten
conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;
let mut remaining = best_quantity;
for v in &vehicles {
if remaining <= 0 {
break;
}
let size = std::cmp::min(remaining, v.capacity);
if size <= 0 {
continue;
}
conn.execute(
"insert_transport",
&[
&item.region_id,
&target_region,
&item.product_id,
&size,
&v.id,
],
)?;
remaining -= size;
}
let shipped = best_quantity - remaining.max(0);
// Inventar sofort reduzieren, nachdem Transporte erfolgreich angelegt wurden // Inventar sofort reduzieren, nachdem Transporte erfolgreich angelegt wurden
// Dies stellt sicher, dass Inventar und Transporte immer konsistent sind // Dies stellt sicher, dass Inventar und Transporte immer konsistent sind
@@ -1412,13 +1041,6 @@ impl DirectorWorker {
target_region: i32, target_region: i32,
) -> Result<Vec<TransportVehicle>, DbError> { ) -> Result<Vec<TransportVehicle>, DbError> {
// Debug: Prüfe zuerst, ob Fahrzeuge in der Quellregion existieren // Debug: Prüfe zuerst, ob Fahrzeuge in der Quellregion existieren
const QUERY_COUNT_VEHICLES_IN_REGION: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.vehicle v
WHERE v.falukant_user_id = $1
AND v.region_id = $2;
"#;
conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?; conn.prepare("count_vehicles_in_region", QUERY_COUNT_VEHICLES_IN_REGION)?;
let vehicle_count_rows = conn.execute( let vehicle_count_rows = conn.execute(
"count_vehicles_in_region", "count_vehicles_in_region",
@@ -1437,13 +1059,6 @@ impl DirectorWorker {
); );
// Debug: Prüfe, ob eine Route existiert // Debug: Prüfe, ob eine Route existiert
const QUERY_CHECK_ROUTE: &str = r#"
SELECT COUNT(*) AS count
FROM falukant_data.region_distance rd
WHERE (rd.source_region_id = $1 AND rd.target_region_id = $2)
OR (rd.source_region_id = $2 AND rd.target_region_id = $1);
"#;
conn.prepare("check_route", QUERY_CHECK_ROUTE)?; conn.prepare("check_route", QUERY_CHECK_ROUTE)?;
let route_rows = conn.execute( let route_rows = conn.execute(
"check_route", "check_route",
@@ -1494,6 +1109,55 @@ impl DirectorWorker {
Ok(result) Ok(result)
} }
// Helper: load worth_percent values for a product across all regions of a user's branches
fn get_worth_by_region(conn: &mut DbConnection, falukant_user_id: i32, product_id: i32) -> Result<HashMap<i32,f64>, DbError> {
conn.prepare("get_region_worth_for_product", QUERY_GET_REGION_WORTH_FOR_PRODUCT)?;
let rows = conn.execute("get_region_worth_for_product", &[&falukant_user_id, &product_id])?;
let mut map = HashMap::new();
for row in rows {
if let Some(rid) = row.get("region_id").and_then(|v| v.parse::<i32>().ok()) {
let percent = row.get("worth_percent").and_then(|v| v.parse::<f64>().ok()).unwrap_or(100.0);
map.insert(rid, percent);
}
}
Ok(map)
}
// Helper: compute piece price for an arbitrary worth_percent
fn compute_piece_price_for_percent(item: &InventoryItem, percent: f64) -> f64 {
let base_price = item.sell_cost * (percent / 100.0);
let min_price = base_price * 0.6;
let max_price = base_price;
let knowledge_factor = item.quality as f64;
min_price + (max_price - min_price) * (knowledge_factor / 100.0)
}
fn calc_max_capacity(vehicles: &[TransportVehicle]) -> i32 {
vehicles.iter().fold(0i32, |acc, v| acc.saturating_add(v.capacity))
}
fn calc_transport_cost(remote_piece_price: f64, qty: i32) -> f64 {
let total_value = remote_piece_price * qty as f64;
let transport_cost = total_value * 0.01_f64;
if transport_cost < 0.01 { 0.01 } else { transport_cost }
}
fn insert_transports_for_route(conn: &mut DbConnection, item: &InventoryItem, target_region: i32, desired: i32) -> Result<i32, DbError> {
let vehicles = Self::get_transport_vehicles_for_route(conn, item.user_id, item.region_id, target_region)?;
if vehicles.is_empty() { return Ok(0); }
conn.prepare("insert_transport", QUERY_INSERT_TRANSPORT)?;
let mut remaining = desired;
for v in &vehicles {
if remaining <= 0 { break; }
let size = std::cmp::min(remaining, v.capacity);
if size <= 0 { continue; }
conn.execute("insert_transport", &[&item.region_id, &target_region, &item.product_id, &size, &v.id])?;
remaining -= size;
}
Ok(desired - remaining.max(0))
}
/// Plant leere Transporte, um Fahrzeuge zurückzuholen, wenn: /// Plant leere Transporte, um Fahrzeuge zurückzuholen, wenn:
/// - Keine Transportmittel im aktuellen Branch vorhanden sind /// - Keine Transportmittel im aktuellen Branch vorhanden sind
/// - Aber bessere Verkaufspreise in anderen Branches existieren /// - Aber bessere Verkaufspreise in anderen Branches existieren
@@ -1505,12 +1169,6 @@ impl DirectorWorker {
current_branch_id: i32, current_branch_id: i32,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
// Aktuelle Branch-Region ermitteln // Aktuelle Branch-Region ermitteln
const QUERY_GET_BRANCH_REGION: &str = r#"
SELECT region_id
FROM falukant_data.branch
WHERE id = $1;
"#;
conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?; conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
let branch_rows = conn.execute("get_branch_region", &[&current_branch_id])?; let branch_rows = conn.execute("get_branch_region", &[&current_branch_id])?;
@@ -1585,14 +1243,6 @@ impl DirectorWorker {
// Berechne Preisvorteil (vereinfacht: verwende worth_percent-Differenz) // Berechne Preisvorteil (vereinfacht: verwende worth_percent-Differenz)
// Hole worth_percent für beide Regionen (für ein beliebiges Produkt) // Hole worth_percent für beide Regionen (für ein beliebiges Produkt)
let mut price_delta = 0.0; let mut price_delta = 0.0;
const QUERY_GET_AVERAGE_WORTH: &str = r#"
SELECT
AVG(CASE WHEN region_id = $1 THEN worth_percent ELSE NULL END) AS current_worth,
AVG(CASE WHEN region_id = $2 THEN worth_percent ELSE NULL END) AS target_worth
FROM falukant_data.town_product_worth
WHERE region_id IN ($1, $2);
"#;
conn.prepare("get_average_worth", QUERY_GET_AVERAGE_WORTH)?; conn.prepare("get_average_worth", QUERY_GET_AVERAGE_WORTH)?;
let worth_rows = conn.execute( let worth_rows = conn.execute(
"get_average_worth", "get_average_worth",
@@ -1667,12 +1317,6 @@ impl DirectorWorker {
inventory_id: i32, inventory_id: i32,
new_quantity: i32, new_quantity: i32,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
const QUERY_UPDATE_INVENTORY_QTY: &str = r#"
UPDATE falukant_data.inventory
SET quantity = $2
WHERE id = $1;
"#;
conn.prepare("update_inventory_qty", QUERY_UPDATE_INVENTORY_QTY)?; conn.prepare("update_inventory_qty", QUERY_UPDATE_INVENTORY_QTY)?;
conn.execute("update_inventory_qty", &[&inventory_id, &new_quantity])?; conn.execute("update_inventory_qty", &[&inventory_id, &new_quantity])?;

File diff suppressed because it is too large Load Diff

View File

@@ -5,48 +5,17 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_NEW_HOUSE_DATA,
QUERY_ADD_NEW_BUYABLE_HOUSE,
QUERY_UPDATE_BUYABLE_HOUSE_STATE,
QUERY_UPDATE_USER_HOUSE_STATE,
};
pub struct HouseWorker { pub struct HouseWorker {
base: BaseWorker, base: BaseWorker,
} }
// SQL-Queries analog zu `houseworker.h`
const QUERY_GET_NEW_HOUSE_DATA: &str = r#"
SELECT
h.id AS house_id
FROM
falukant_type.house AS h
WHERE
random() < 0.0001
AND label_tr <> 'under_bridge';
"#;
const QUERY_ADD_NEW_BUYABLE_HOUSE: &str = r#"
INSERT INTO falukant_data.buyable_house (house_type_id)
VALUES ($1);
"#;
const QUERY_UPDATE_BUYABLE_HOUSE_STATE: &str = r#"
UPDATE falukant_data.buyable_house
SET roof_condition = ROUND(roof_condition - random() * (3 + 0 * id)),
floor_condition = ROUND(floor_condition - random() * (3 + 0 * id)),
wall_condition = ROUND(wall_condition - random() * (3 + 0 * id)),
window_condition = ROUND(window_condition - random() * (3 + 0 * id));
"#;
const QUERY_UPDATE_USER_HOUSE_STATE: &str = r#"
UPDATE falukant_data.user_house
SET roof_condition = ROUND(roof_condition - random() * (3 + 0 * id)),
floor_condition = ROUND(floor_condition - random() * (3 + 0 * id)),
wall_condition = ROUND(wall_condition - random() * (3 + 0 * id)),
window_condition = ROUND(window_condition - random() * (3 + 0 * id))
WHERE house_type_id NOT IN (
SELECT id
FROM falukant_type.house h
WHERE h.label_tr = 'under_bridge'
);
"#;
impl HouseWorker { impl HouseWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
Self { Self {

View File

@@ -11,6 +11,7 @@ mod user_character;
mod transport; mod transport;
mod weather; mod weather;
mod events; mod events;
mod sql;
pub use base::Worker; pub use base::Worker;
pub use crate::db::ConnectionPool; pub use crate::db::ConnectionPool;

View File

@@ -6,6 +6,23 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_COUNT_OFFICES_PER_REGION,
QUERY_FIND_OFFICE_GAPS,
QUERY_SELECT_NEEDED_ELECTIONS,
QUERY_INSERT_CANDIDATES,
QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES,
QUERY_PROCESS_EXPIRED_AND_FILL,
QUERY_USERS_IN_CITIES_OF_REGIONS,
QUERY_NOTIFY_OFFICE_EXPIRATION,
QUERY_NOTIFY_ELECTION_CREATED,
QUERY_NOTIFY_OFFICE_FILLED,
QUERY_GET_USERS_WITH_EXPIRING_OFFICES,
QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS,
QUERY_GET_USERS_WITH_FILLED_OFFICES,
QUERY_PROCESS_ELECTIONS,
QUERY_TRIM_EXCESS_OFFICES_GLOBAL,
};
pub struct PoliticsWorker { pub struct PoliticsWorker {
base: BaseWorker, base: BaseWorker,
@@ -42,466 +59,6 @@ struct Office {
// --- SQL-Konstanten (1:1 aus politics_worker.h übernommen) ------------------ // --- SQL-Konstanten (1:1 aus politics_worker.h übernommen) ------------------
const QUERY_COUNT_OFFICES_PER_REGION: &str = r#"
WITH
seats_per_region AS (
SELECT
pot.id AS office_type_id,
rt.id AS region_id,
pot.seats_per_region AS seats_total
FROM falukant_type.political_office_type AS pot
JOIN falukant_type.region AS rt
ON pot.region_type = rt.label_tr
),
occupied AS (
SELECT
po.office_type_id,
po.region_id,
COUNT(*) AS occupied_count
FROM falukant_data.political_office AS po
GROUP BY po.office_type_id, po.region_id
),
combined AS (
SELECT
spr.region_id,
spr.seats_total AS required_count,
COALESCE(o.occupied_count, 0) AS occupied_count
FROM seats_per_region AS spr
LEFT JOIN occupied AS o
ON spr.office_type_id = o.office_type_id
AND spr.region_id = o.region_id
)
SELECT
region_id,
SUM(required_count) AS required_count,
SUM(occupied_count) AS occupied_count
FROM combined
GROUP BY region_id;
"#;
/// Findet alle Kombinationen aus Amtstyp und Region, für die laut
/// `seats_per_region` mehr Sitze existieren sollten, als aktuell in
/// `falukant_data.political_office` belegt sind. Es werden ausschließlich
/// positive Differenzen (Gaps) zurückgegeben wenn `occupied > required`
/// (z.B. nach Reduktion der Sitzzahl), wird **nichts** gelöscht und die
/// Kombination erscheint hier nicht.
const QUERY_FIND_OFFICE_GAPS: &str = r#"
WITH
seats AS (
SELECT
pot.id AS office_type_id,
rt.id AS region_id,
pot.seats_per_region AS seats_total
FROM falukant_type.political_office_type AS pot
JOIN falukant_type.region AS rt
ON pot.region_type = rt.label_tr
),
occupied AS (
SELECT
po.office_type_id,
po.region_id,
COUNT(*) AS occupied_count
FROM falukant_data.political_office AS po
GROUP BY po.office_type_id, po.region_id
)
SELECT
s.office_type_id,
s.region_id,
(s.seats_total - COALESCE(o.occupied_count, 0)) AS gaps
FROM seats AS s
LEFT JOIN occupied AS o
ON s.office_type_id = o.office_type_id
AND s.region_id = o.region_id
WHERE (s.seats_total - COALESCE(o.occupied_count, 0)) > 0;
"#;
const QUERY_SELECT_NEEDED_ELECTIONS: &str = r#"
WITH
target_date AS (
SELECT NOW()::date AS election_date
),
expired_today AS (
DELETE FROM falukant_data.political_office AS po
USING falukant_type.political_office_type AS pot
WHERE po.office_type_id = pot.id
AND (po.created_at + (pot.term_length * INTERVAL '1 day'))::date
= (SELECT election_date FROM target_date)
RETURNING
pot.id AS office_type_id,
po.region_id AS region_id
),
gaps_per_region AS (
SELECT
office_type_id,
region_id,
COUNT(*) AS gaps
FROM expired_today
GROUP BY office_type_id, region_id
),
to_schedule AS (
SELECT
g.office_type_id,
g.region_id,
g.gaps,
td.election_date
FROM gaps_per_region AS g
CROSS JOIN target_date AS td
WHERE NOT EXISTS (
SELECT 1
FROM falukant_data.election AS e
WHERE e.office_type_id = g.office_type_id
AND e.region_id = g.region_id
AND e.date::date = td.election_date
)
),
new_elections AS (
INSERT INTO falukant_data.election
(office_type_id, date, posts_to_fill, created_at, updated_at, region_id)
SELECT
ts.office_type_id,
ts.election_date,
ts.gaps,
NOW(),
NOW(),
ts.region_id
FROM to_schedule AS ts
RETURNING
id AS election_id,
region_id,
posts_to_fill
)
SELECT
ne.election_id,
ne.region_id,
ne.posts_to_fill
FROM new_elections AS ne
ORDER BY ne.region_id, ne.election_id;
"#;
const QUERY_INSERT_CANDIDATES: &str = r#"
INSERT INTO falukant_data.candidate
(election_id, character_id, created_at, updated_at)
SELECT
$1 AS election_id,
sub.id AS character_id,
NOW() AS created_at,
NOW() AS updated_at
FROM (
WITH RECURSIVE region_tree AS (
SELECT r.id
FROM falukant_data.region AS r
WHERE r.id = $2
UNION ALL
SELECT r2.id
FROM falukant_data.region AS r2
JOIN region_tree AS rt
ON r2.parent_id = rt.id
)
SELECT ch.id
FROM falukant_data.character AS ch
JOIN region_tree AS rt2
ON ch.region_id = rt2.id
WHERE ch.user_id IS NULL
AND ch.birthdate <= NOW() - INTERVAL '21 days'
AND ch.title_of_nobility IN (
SELECT id
FROM falukant_type.title
WHERE label_tr != 'noncivil'
)
ORDER BY RANDOM()
LIMIT ($3 * 2)
) AS sub(id);
"#;
/// Wahlen finden, für die noch keine Kandidaten existieren.
/// Das umfasst sowohl frisch eingetragene Wahlen als auch manuell
/// angelegte Wahlen, solange:
/// - das Wahl-Datum heute oder in der Zukunft liegt und
/// - noch kein Eintrag in `falukant_data.candidate` existiert.
const QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES: &str = r#"
SELECT
e.id AS election_id,
e.region_id AS region_id,
e.posts_to_fill
FROM falukant_data.election AS e
WHERE e.region_id IS NOT NULL
AND e.posts_to_fill > 0
AND e.date::date >= CURRENT_DATE
AND NOT EXISTS (
SELECT 1
FROM falukant_data.candidate AS c
WHERE c.election_id = e.id
);
"#;
const QUERY_PROCESS_EXPIRED_AND_FILL: &str = r#"
WITH
expired_offices AS (
DELETE FROM falukant_data.political_office AS po
USING falukant_type.political_office_type AS pot
WHERE po.office_type_id = pot.id
AND (po.created_at + (pot.term_length * INTERVAL '1 day')) <= NOW()
RETURNING
pot.id AS office_type_id,
po.region_id AS region_id
),
distinct_types AS (
SELECT DISTINCT office_type_id, region_id FROM expired_offices
),
votes_per_candidate AS (
SELECT
dt.office_type_id,
dt.region_id,
c.character_id,
COUNT(v.id) AS vote_count
FROM distinct_types AS dt
JOIN falukant_data.election AS e
ON e.office_type_id = dt.office_type_id
JOIN falukant_data.vote AS v
ON v.election_id = e.id
JOIN falukant_data.candidate AS c
ON c.election_id = e.id
AND c.id = v.candidate_id
WHERE e.date >= (NOW() - INTERVAL '30 days')
GROUP BY dt.office_type_id, dt.region_id, c.character_id
),
ranked_winners AS (
SELECT
vpc.office_type_id,
vpc.region_id,
vpc.character_id,
ROW_NUMBER() OVER (
PARTITION BY vpc.office_type_id, vpc.region_id
ORDER BY vpc.vote_count DESC
) AS rn
FROM votes_per_candidate AS vpc
),
selected_winners AS (
SELECT
rw.office_type_id,
rw.region_id,
rw.character_id
FROM ranked_winners AS rw
JOIN falukant_type.political_office_type AS pot
ON pot.id = rw.office_type_id
WHERE rw.rn <= pot.seats_per_region
),
insert_winners AS (
INSERT INTO falukant_data.political_office
(office_type_id, character_id, created_at, updated_at, region_id)
SELECT
sw.office_type_id,
sw.character_id,
NOW(),
NOW(),
sw.region_id
FROM selected_winners AS sw
RETURNING id AS new_office_id, office_type_id, character_id, region_id
),
count_inserted AS (
SELECT
office_type_id,
region_id,
COUNT(*) AS inserted_count
FROM insert_winners
GROUP BY office_type_id, region_id
),
needed_to_fill AS (
SELECT
dt.office_type_id,
dt.region_id,
(pot.seats_per_region - COALESCE(ci.inserted_count, 0)) AS gaps
FROM distinct_types AS dt
JOIN falukant_type.political_office_type AS pot
ON pot.id = dt.office_type_id
LEFT JOIN count_inserted AS ci
ON ci.office_type_id = dt.office_type_id
AND ci.region_id = dt.region_id
WHERE (pot.seats_per_region - COALESCE(ci.inserted_count, 0)) > 0
),
random_candidates AS (
SELECT
rtf.office_type_id,
rtf.region_id,
ch.id AS character_id,
ROW_NUMBER() OVER (
PARTITION BY rtf.office_type_id, rtf.region_id
ORDER BY RANDOM()
) AS rn
FROM needed_to_fill AS rtf
JOIN falukant_data.character AS ch
ON ch.region_id = rtf.region_id
AND ch.user_id IS NULL
AND ch.birthdate <= NOW() - INTERVAL '21 days'
AND ch.title_of_nobility IN (
SELECT id FROM falukant_type.title WHERE label_tr != 'noncivil'
)
AND NOT EXISTS (
SELECT 1
FROM falukant_data.political_office AS po2
JOIN falukant_type.political_office_type AS pot2
ON pot2.id = po2.office_type_id
WHERE po2.character_id = ch.id
AND (po2.created_at + (pot2.term_length * INTERVAL '1 day')) >
NOW() + INTERVAL '2 days'
)
),
insert_random AS (
INSERT INTO falukant_data.political_office
(office_type_id, character_id, created_at, updated_at, region_id)
SELECT
rc.office_type_id,
rc.character_id,
NOW(),
NOW(),
rc.region_id
FROM random_candidates AS rc
JOIN needed_to_fill AS rtf
ON rtf.office_type_id = rc.office_type_id
AND rtf.region_id = rc.region_id
WHERE rc.rn <= rtf.gaps
RETURNING id AS new_office_id, office_type_id, character_id, region_id
)
SELECT
new_office_id AS office_id,
office_type_id,
character_id,
region_id
FROM insert_winners
UNION ALL
SELECT
new_office_id AS office_id,
office_type_id,
character_id,
region_id
FROM insert_random;
"#;
const QUERY_USERS_IN_CITIES_OF_REGIONS: &str = r#"
WITH RECURSIVE region_tree AS (
SELECT id
FROM falukant_data.region
WHERE id = $1
UNION ALL
SELECT r2.id
FROM falukant_data.region AS r2
JOIN region_tree AS rt
ON r2.parent_id = rt.id
)
SELECT DISTINCT ch.user_id
FROM falukant_data.character AS ch
JOIN region_tree AS rt2
ON ch.region_id = rt2.id
WHERE ch.user_id IS NOT NULL;
"#;
const QUERY_NOTIFY_OFFICE_EXPIRATION: &str = r#"
INSERT INTO falukant_log.notification
(user_id, tr, created_at, updated_at)
SELECT
po.character_id,
'notify_office_expiring',
NOW(),
NOW()
FROM falukant_data.political_office AS po
JOIN falukant_type.political_office_type AS pot
ON po.office_type_id = pot.id
WHERE (po.created_at + (pot.term_length * INTERVAL '1 day'))
BETWEEN (NOW() + INTERVAL '2 days')
AND (NOW() + INTERVAL '2 days' + INTERVAL '1 second');
"#;
const QUERY_NOTIFY_ELECTION_CREATED: &str = r#"
INSERT INTO falukant_log.notification
(user_id, tr, created_at, updated_at)
VALUES
($1, 'notify_election_created', NOW(), NOW());
"#;
const QUERY_NOTIFY_OFFICE_FILLED: &str = r#"
INSERT INTO falukant_log.notification
(user_id, tr, created_at, updated_at)
VALUES
($1, 'notify_office_filled', NOW(), NOW());
"#;
const QUERY_GET_USERS_WITH_EXPIRING_OFFICES: &str = r#"
SELECT DISTINCT ch.user_id
FROM falukant_data.political_office AS po
JOIN falukant_type.political_office_type AS pot
ON po.office_type_id = pot.id
JOIN falukant_data.character AS ch
ON po.character_id = ch.id
WHERE ch.user_id IS NOT NULL
AND (po.created_at + (pot.term_length * INTERVAL '1 day'))
BETWEEN (NOW() + INTERVAL '2 days')
AND (NOW() + INTERVAL '2 days' + INTERVAL '1 second');
"#;
const QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS: &str = r#"
SELECT DISTINCT ch.user_id
FROM falukant_data.election AS e
JOIN falukant_data.character AS ch
ON ch.region_id = e.region_id
WHERE ch.user_id IS NOT NULL
AND e.date >= NOW() - INTERVAL '1 day';
"#;
const QUERY_GET_USERS_WITH_FILLED_OFFICES: &str = r#"
SELECT DISTINCT ch.user_id
FROM falukant_data.political_office AS po
JOIN falukant_data.character AS ch
ON po.character_id = ch.id
WHERE ch.user_id IS NOT NULL
AND po.created_at >= NOW() - INTERVAL '1 minute';
"#;
const QUERY_PROCESS_ELECTIONS: &str = r#"
SELECT office_id, office_type_id, character_id, region_id
FROM falukant_data.process_elections();
"#;
/// Schneidet für alle Amtstyp/Region-Kombinationen überzählige Einträge in
/// `falukant_data.political_office` ab, so dass höchstens
/// `seats_per_region` Ämter pro Kombination übrig bleiben.
///
/// Die Auswahl, welche Ämter entfernt werden, erfolgt deterministisch über
/// `created_at DESC`: die **neuesten** Ämter bleiben bevorzugt im Amt,
/// ältere Einträge werden zuerst entfernt. Damit lässt sich das Verhalten
/// später leicht anpassen (z.B. nach bestimmten Prioritäten).
const QUERY_TRIM_EXCESS_OFFICES_GLOBAL: &str = r#"
WITH seats AS (
SELECT
pot.id AS office_type_id,
rt.id AS region_id,
pot.seats_per_region AS seats_total
FROM falukant_type.political_office_type AS pot
JOIN falukant_type.region AS rt
ON pot.region_type = rt.label_tr
),
ranked AS (
SELECT
po.id,
po.office_type_id,
po.region_id,
s.seats_total,
ROW_NUMBER() OVER (
PARTITION BY po.office_type_id, po.region_id
ORDER BY po.created_at DESC
) AS rn
FROM falukant_data.political_office AS po
JOIN seats AS s
ON s.office_type_id = po.office_type_id
AND s.region_id = po.region_id
),
to_delete AS (
SELECT id
FROM ranked
WHERE rn > seats_total
)
DELETE FROM falukant_data.political_office
WHERE id IN (SELECT id FROM to_delete);
"#;
impl PoliticsWorker { impl PoliticsWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -542,11 +99,11 @@ impl PoliticsWorker {
broker: &MessageBroker, broker: &MessageBroker,
) -> Result<(), DbError> { ) -> Result<(), DbError> {
// 1) Optional: Positionen evaluieren (aktuell nur Logging/Struktur) // 1) Optional: Positionen evaluieren (aktuell nur Logging/Struktur)
let _ = Self::evaluate_political_positions(pool)?; Self::evaluate_political_positions(pool)?;
// 2) Schema-Änderungen abgleichen: neue / zusätzliche Ämter anlegen, // 2) Schema-Änderungen abgleichen: neue / zusätzliche Ämter anlegen,
// ohne bestehende Amtsinhaber bei Reduktion zu entfernen. // ohne bestehende Amtsinhaber bei Reduktion zu entfernen.
let _ = Self::sync_offices_with_types(pool)?; Self::sync_offices_with_types(pool)?;
// 3) Ämter, die bald auslaufen, benachrichtigen // 3) Ämter, die bald auslaufen, benachrichtigen
Self::notify_office_expirations(pool, broker)?; Self::notify_office_expirations(pool, broker)?;
@@ -608,8 +165,9 @@ impl PoliticsWorker {
conn.prepare( conn.prepare(
"count_offices_per_region", "count_offices_per_region",
QUERY_COUNT_OFFICES_PER_REGION, QUERY_COUNT_OFFICES_PER_REGION,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare count_offices_per_region: {e}")))?;
let rows = conn.execute("count_offices_per_region", &[])?; let rows = conn.execute("count_offices_per_region", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec count_offices_per_region: {e}")))?;
let mut result = Vec::with_capacity(rows.len()); let mut result = Vec::with_capacity(rows.len());
for row in rows { for row in rows {
@@ -645,7 +203,7 @@ impl PoliticsWorker {
conn.prepare( conn.prepare(
"trim_excess_offices_global", "trim_excess_offices_global",
QUERY_TRIM_EXCESS_OFFICES_GLOBAL, QUERY_TRIM_EXCESS_OFFICES_GLOBAL,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare trim_excess_offices_global: {e}")))?;
if let Err(err) = conn.execute("trim_excess_offices_global", &[]) { if let Err(err) = conn.execute("trim_excess_offices_global", &[]) {
eprintln!( eprintln!(
@@ -672,8 +230,10 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("find_office_gaps", QUERY_FIND_OFFICE_GAPS)?; conn.prepare("find_office_gaps", QUERY_FIND_OFFICE_GAPS)
let rows = conn.execute("find_office_gaps", &[])?; .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare find_office_gaps: {e}")))?;
let rows = conn.execute("find_office_gaps", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec find_office_gaps: {e}")))?;
if rows.is_empty() { if rows.is_empty() {
return Ok(()); return Ok(());
@@ -755,8 +315,10 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("select_needed_elections", QUERY_SELECT_NEEDED_ELECTIONS)?; conn.prepare("select_needed_elections", QUERY_SELECT_NEEDED_ELECTIONS)
let rows = conn.execute("select_needed_elections", &[])?; .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare select_needed_elections: {e}")))?;
let rows = conn.execute("select_needed_elections", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec select_needed_elections: {e}")))?;
let mut elections = Vec::with_capacity(rows.len()); let mut elections = Vec::with_capacity(rows.len());
for row in rows { for row in rows {
@@ -785,8 +347,9 @@ impl PoliticsWorker {
conn.prepare( conn.prepare(
"select_elections_needing_candidates", "select_elections_needing_candidates",
QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES, QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare select_elections_needing_candidates: {e}")))?;
let rows = conn.execute("select_elections_needing_candidates", &[])?; let rows = conn.execute("select_elections_needing_candidates", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec select_elections_needing_candidates: {e}")))?;
let mut elections = Vec::with_capacity(rows.len()); let mut elections = Vec::with_capacity(rows.len());
for row in rows { for row in rows {
@@ -813,13 +376,14 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("insert_candidates", QUERY_INSERT_CANDIDATES)?; conn.prepare("insert_candidates", QUERY_INSERT_CANDIDATES)
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare insert_candidates: {e}")))?;
for e in elections { for e in elections {
conn.execute( conn.execute(
"insert_candidates", "insert_candidates",
&[&e.election_id, &e.region_id, &e.posts_to_fill], &[&e.election_id, &e.region_id, &e.posts_to_fill],
)?; ).map_err(|err| DbError::new(format!("[PoliticsWorker] exec insert_candidates eid={} rid={}: {}", e.election_id, e.region_id, err)))?;
} }
Ok(()) Ok(())
@@ -832,8 +396,10 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("process_expired_and_fill", QUERY_PROCESS_EXPIRED_AND_FILL)?; conn.prepare("process_expired_and_fill", QUERY_PROCESS_EXPIRED_AND_FILL)
let rows = conn.execute("process_expired_and_fill", &[])?; .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare process_expired_and_fill: {e}")))?;
let rows = conn.execute("process_expired_and_fill", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec process_expired_and_fill: {e}")))?;
Ok(rows Ok(rows
.into_iter() .into_iter()
@@ -876,14 +442,17 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("notify_office_expiration", QUERY_NOTIFY_OFFICE_EXPIRATION)?; conn.prepare("notify_office_expiration", QUERY_NOTIFY_OFFICE_EXPIRATION)
conn.execute("notify_office_expiration", &[])?; .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_office_expiration: {e}")))?;
conn.execute("notify_office_expiration", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_office_expiration: {e}")))?;
conn.prepare( conn.prepare(
"get_users_with_expiring_offices", "get_users_with_expiring_offices",
QUERY_GET_USERS_WITH_EXPIRING_OFFICES, QUERY_GET_USERS_WITH_EXPIRING_OFFICES,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_with_expiring_offices: {e}")))?;
let rows = conn.execute("get_users_with_expiring_offices", &[])?; let rows = conn.execute("get_users_with_expiring_offices", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_with_expiring_offices: {e}")))?;
for row in rows { for row in rows {
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {
@@ -905,17 +474,20 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("notify_election_created", QUERY_NOTIFY_ELECTION_CREATED)?; conn.prepare("notify_election_created", QUERY_NOTIFY_ELECTION_CREATED)
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_election_created: {e}")))?;
for uid in user_ids { for uid in user_ids {
conn.execute("notify_election_created", &[uid])?; conn.execute("notify_election_created", &[uid])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_election_created uid={}: {}", uid, e)))?;
} }
conn.prepare( conn.prepare(
"get_users_in_regions_with_elections", "get_users_in_regions_with_elections",
QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS, QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_in_regions_with_elections: {e}")))?;
let rows = conn.execute("get_users_in_regions_with_elections", &[])?; let rows = conn.execute("get_users_in_regions_with_elections", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_in_regions_with_elections: {e}")))?;
for row in rows { for row in rows {
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {
@@ -937,7 +509,8 @@ impl PoliticsWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("notify_office_filled", QUERY_NOTIFY_OFFICE_FILLED)?; conn.prepare("notify_office_filled", QUERY_NOTIFY_OFFICE_FILLED)
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_office_filled: {e}")))?;
for office in new_offices { for office in new_offices {
// Debug-Logging mit allen Feldern, damit sie aktiv genutzt werden // Debug-Logging mit allen Feldern, damit sie aktiv genutzt werden
@@ -945,14 +518,16 @@ impl PoliticsWorker {
"[PoliticsWorker] Office filled: id={}, type={}, character={}, region={}", "[PoliticsWorker] Office filled: id={}, type={}, character={}, region={}",
office.office_id, office.office_type_id, office.character_id, office.region_id office.office_id, office.office_type_id, office.character_id, office.region_id
); );
conn.execute("notify_office_filled", &[&office.character_id])?; conn.execute("notify_office_filled", &[&office.character_id])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_office_filled office_id={} character_id={}: {}", office.office_id, office.character_id, e)))?;
} }
conn.prepare( conn.prepare(
"get_users_with_filled_offices", "get_users_with_filled_offices",
QUERY_GET_USERS_WITH_FILLED_OFFICES, QUERY_GET_USERS_WITH_FILLED_OFFICES,
)?; ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_with_filled_offices: {e}")))?;
let rows = conn.execute("get_users_with_filled_offices", &[])?; let rows = conn.execute("get_users_with_filled_offices", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_with_filled_offices: {e}")))?;
for row in rows { for row in rows {
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {

View File

@@ -7,6 +7,14 @@ use std::time::{Duration, Instant};
use crate::db::ConnectionPool; use crate::db::ConnectionPool;
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_FINISHED_PRODUCTIONS,
QUERY_GET_AVAILABLE_STOCKS,
QUERY_DELETE_PRODUCTION,
QUERY_INSERT_INVENTORY,
QUERY_INSERT_UPDATE_PRODUCTION_LOG,
QUERY_ADD_OVERPRODUCTION_NOTIFICATION,
};
/// Abbildet eine abgeschlossene Produktion aus der Datenbank. /// Abbildet eine abgeschlossene Produktion aus der Datenbank.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -28,128 +36,6 @@ struct StockInfo {
filled: i32, filled: i32,
} }
// SQL-Queries analog zur C++-Implementierung
// Wichtig: Pro `production.id` darf hier **genau eine Zeile** zurückkommen.
// Durch die Joins auf Director/Knowledge/Wetter kann es sonst zu Mehrfachzeilen mit
// unterschiedlicher berechneter Qualität kommen. Deshalb wird die Qualität
// über MAX() aggregiert und nach `production_id` gruppiert.
const QUERY_GET_FINISHED_PRODUCTIONS: &str = r#"
SELECT
p.id AS production_id,
p.branch_id,
p.product_id,
p.quantity,
p.start_timestamp,
pr.production_time,
-- Aggregierte Qualitätsbewertung pro Produktion inkl. Wettereinfluss
MAX(
GREATEST(
0,
LEAST(
100,
ROUND(
(
CASE
WHEN k2.id IS NOT NULL
THEN (k.knowledge * 2 + k2.knowledge) / 3
ELSE k.knowledge
END
)::numeric
+ COALESCE(pwe.quality_effect, 0) * 2.5
)
)
)::int
) AS quality,
br.region_id,
br.falukant_user_id AS user_id
FROM falukant_data.production p
JOIN falukant_type.product pr
ON p.product_id = pr.id
JOIN falukant_data.branch br
ON p.branch_id = br.id
JOIN falukant_data.character c
ON c.user_id = br.falukant_user_id
JOIN falukant_data.knowledge k
ON p.product_id = k.product_id
AND k.character_id = c.id
JOIN falukant_data.stock s
ON s.branch_id = br.id
-- Optionaler Wettereinfluss: pro (Produkt, Wetter) genau ein Datensatz
LEFT JOIN falukant_type.product_weather_effect pwe
ON pwe.product_id = p.product_id
AND pwe.weather_type_id = p.weather_type_id
LEFT JOIN falukant_data.director d
ON d.employer_user_id = c.user_id
LEFT JOIN falukant_data.knowledge k2
ON k2.character_id = d.director_character_id
AND k2.product_id = p.product_id
WHERE p.start_timestamp + INTERVAL '1 minute' * pr.production_time <= NOW()
GROUP BY
p.id,
p.branch_id,
p.product_id,
p.quantity,
p.start_timestamp,
pr.production_time,
br.region_id,
br.falukant_user_id
ORDER BY p.start_timestamp;
"#;
const QUERY_GET_AVAILABLE_STOCKS: &str = r#"
SELECT
stock.id,
stock.quantity AS total_capacity,
(
SELECT COALESCE(SUM(inventory.quantity), 0)
FROM falukant_data.inventory
WHERE inventory.stock_id = stock.id
) AS filled,
stock.branch_id
FROM falukant_data.stock stock
JOIN falukant_data.branch branch
ON stock.branch_id = branch.id
WHERE branch.id = $1
ORDER BY total_capacity DESC;
"#;
const QUERY_DELETE_PRODUCTION: &str = r#"
DELETE FROM falukant_data.production
WHERE id = $1;
"#;
const QUERY_INSERT_INVENTORY: &str = r#"
INSERT INTO falukant_data.inventory (
stock_id,
product_id,
quantity,
quality,
produced_at
) VALUES ($1, $2, $3, $4, NOW());
"#;
const QUERY_INSERT_UPDATE_PRODUCTION_LOG: &str = r#"
INSERT INTO falukant_log.production (
region_id,
product_id,
quantity,
producer_id,
production_date
) VALUES ($1, $2, $3, $4, CURRENT_DATE)
ON CONFLICT (producer_id, product_id, region_id, production_date)
DO UPDATE
SET quantity = falukant_log.production.quantity + EXCLUDED.quantity;
"#;
const QUERY_ADD_OVERPRODUCTION_NOTIFICATION: &str = r#"
INSERT INTO falukant_log.notification (
user_id,
tr,
shown,
created_at,
updated_at
) VALUES ($1, $2, FALSE, NOW(), NOW());
"#;
pub struct ProduceWorker { pub struct ProduceWorker {
base: BaseWorker, base: BaseWorker,
@@ -240,6 +126,8 @@ impl ProduceWorker {
self.base self.base
.set_current_step("Process Finished Productions"); .set_current_step("Process Finished Productions");
// Nur Fehler loggen; keine Debug-Infos
for production in finished_productions { for production in finished_productions {
self.handle_finished_production(&production); self.handle_finished_production(&production);
} }
@@ -287,21 +175,26 @@ impl ProduceWorker {
} }
}; };
// Ruhemodus: keine Info-Logs, nur Fehler
for stock in stocks { for stock in stocks {
if remaining_quantity <= 0 { if remaining_quantity <= 0 {
break; break;
} }
let free_capacity = stock.total_capacity - stock.filled; let free_capacity = stock.total_capacity - stock.filled;
// keine Debug-Ausgabe
if free_capacity <= 0 { if free_capacity <= 0 {
continue; continue;
} }
let to_store = min(remaining_quantity, free_capacity); let to_store = min(remaining_quantity, free_capacity);
// keine Debug-Ausgabe
if !self.store_in_stock(stock.stock_id, product_id, to_store, quality) { if !self.store_in_stock(stock.stock_id, product_id, to_store, quality) {
return false; return false;
} }
remaining_quantity -= to_store; remaining_quantity -= to_store;
// keine Debug-Ausgabe
} }
if remaining_quantity == 0 { if remaining_quantity == 0 {
@@ -330,12 +223,25 @@ impl ProduceWorker {
quantity: i32, quantity: i32,
quality: i32, quality: i32,
) -> bool { ) -> bool {
// Versuch: vorhandenen Inventory-Posten für (stock, product) erhöhen
match self.update_inventory_by_stock_product(stock_id, product_id, quantity, quality) {
Ok(updated) => {
if updated {
return true;
}
// Wenn kein Update stattfand, Insert versuchen
if let Err(err) = self.insert_inventory(stock_id, product_id, quantity, quality) { if let Err(err) = self.insert_inventory(stock_id, product_id, quantity, quality) {
eprintln!("[ProduceWorker] Fehler in storeInStock: {err}"); eprintln!("[ProduceWorker] Fehler beim Insert in storeInStock: {err}");
return false; return false;
} }
true true
} }
Err(err) => {
eprintln!("[ProduceWorker] Fehler beim Update in storeInStock: {err}");
false
}
}
}
fn delete_production(&self, production_id: i32) { fn delete_production(&self, production_id: i32) {
if let Err(err) = self.remove_production(production_id) { if let Err(err) = self.remove_production(production_id) {
@@ -420,10 +326,32 @@ impl ProduceWorker {
.map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("insert_inventory", QUERY_INSERT_INVENTORY)?; conn.prepare("insert_inventory", QUERY_INSERT_INVENTORY)?;
conn.execute("insert_inventory", &[&stock_id, &product_id, &quantity, &quality])?; let _rows = conn.execute("insert_inventory", &[&stock_id, &product_id, &quantity, &quality])?;
Ok(()) Ok(())
} }
fn update_inventory_by_stock_product(
&self,
stock_id: i32,
product_id: i32,
quantity: i32,
quality: i32,
) -> Result<bool, crate::db::DbError> {
use crate::worker::sql::QUERY_UPDATE_INVENTORY_BY_STOCK_PRODUCT;
let mut conn = self
.base
.pool
.get()
.map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("update_inventory_by_stock_product", QUERY_UPDATE_INVENTORY_BY_STOCK_PRODUCT)?;
let rows = conn.execute(
"update_inventory_by_stock_product",
&[&stock_id, &product_id, &quantity, &quality],
)?;
Ok(!rows.is_empty())
}
fn remove_production(&self, production_id: i32) -> Result<(), crate::db::DbError> { fn remove_production(&self, production_id: i32) -> Result<(), crate::db::DbError> {
let mut conn = self let mut conn = self
.base .base

1632
src/worker/sql.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -8,49 +8,17 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_TOWNS,
QUERY_INSERT_STOCK,
QUERY_CLEANUP_STOCK,
QUERY_GET_REGION_USERS,
};
pub struct StockageManager { pub struct StockageManager {
base: BaseWorker, base: BaseWorker,
} }
// SQL-Queries analog zu `stockagemanager.h`
const QUERY_GET_TOWNS: &str = r#"
SELECT fdr.id
FROM falukant_data.region fdr
JOIN falukant_type.region ftr
ON ftr.id = fdr.region_type_id
WHERE ftr.label_tr = 'city';
"#;
const QUERY_INSERT_STOCK: &str = r#"
INSERT INTO falukant_data.buyable_stock (region_id, stock_type_id, quantity)
SELECT
$1 AS region_id,
s.id AS stock_type_id,
GREATEST(1, ROUND(RANDOM() * 5 * COUNT(br.id))) AS quantity
FROM falukant_data.branch AS br
CROSS JOIN falukant_type.stock AS s
WHERE br.region_id = $1
GROUP BY s.id
ORDER BY RANDOM()
LIMIT GREATEST(
ROUND(RANDOM() * (SELECT COUNT(id) FROM falukant_type.stock)),
1
);
"#;
const QUERY_CLEANUP_STOCK: &str = r#"
DELETE FROM falukant_data.buyable_stock
WHERE quantity <= 0;
"#;
const QUERY_GET_REGION_USERS: &str = r#"
SELECT c.user_id
FROM falukant_data.character c
WHERE c.region_id = $1
AND c.user_id IS NOT NULL;
"#;
impl StockageManager { impl StockageManager {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
Self { Self {

View File

@@ -6,7 +6,15 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_ARRIVED_TRANSPORTS,
QUERY_GET_AVAILABLE_STOCKS,
QUERY_INSERT_INVENTORY,
QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT,
QUERY_DELETE_TRANSPORT,
QUERY_GET_BRANCH_REGION,
QUERY_UPDATE_TRANSPORT_SIZE,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct ArrivedTransport { struct ArrivedTransport {
id: i32, id: i32,
@@ -26,98 +34,6 @@ struct StockInfo {
filled: i32, filled: i32,
} }
// Ermittelt alle Transporte, die gemäß Distanz und Fahrzeuggeschwindigkeit bereits
// angekommen sein sollten. Die Reisezeit wird hier vereinfacht als
// travel_minutes = distance / speed
// interpretiert, d.h. `speed` gibt die Einheiten "Distanz pro Minute" an.
const QUERY_GET_ARRIVED_TRANSPORTS: &str = r#"
SELECT
t.id,
t.product_id,
t.size,
t.vehicle_id,
t.source_region_id,
t.target_region_id,
b_target.id AS target_branch_id,
b_source.id AS source_branch_id,
rd.distance AS distance,
v.falukant_user_id AS user_id
FROM falukant_data.transport AS t
JOIN falukant_data.vehicle AS v
ON v.id = t.vehicle_id
JOIN falukant_type.vehicle AS vt
ON vt.id = v.vehicle_type_id
JOIN falukant_data.region_distance AS rd
ON ((rd.source_region_id = t.source_region_id
AND rd.target_region_id = t.target_region_id)
OR (rd.source_region_id = t.target_region_id
AND rd.target_region_id = t.source_region_id))
AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
LEFT JOIN falukant_data.branch AS b_target
ON b_target.region_id = t.target_region_id
AND b_target.falukant_user_id = v.falukant_user_id
LEFT JOIN falukant_data.branch AS b_source
ON b_source.region_id = t.source_region_id
AND b_source.falukant_user_id = v.falukant_user_id
WHERE vt.speed > 0
AND t.created_at
+ (rd.distance / vt.speed::double precision) * INTERVAL '1 minute'
<= NOW();
"#;
// Verfügbare Lagerplätze in einem Branch, analog zur Logik im ProduceWorker.
const QUERY_GET_AVAILABLE_STOCKS: &str = r#"
SELECT
stock.id,
stock.quantity AS total_capacity,
(
SELECT COALESCE(SUM(inventory.quantity), 0)
FROM falukant_data.inventory
WHERE inventory.stock_id = stock.id
) AS filled
FROM falukant_data.stock stock
JOIN falukant_data.branch branch
ON stock.branch_id = branch.id
WHERE branch.id = $1
ORDER BY total_capacity DESC;
"#;
const QUERY_INSERT_INVENTORY: &str = r#"
INSERT INTO falukant_data.inventory (
stock_id,
product_id,
quantity,
quality,
produced_at
) VALUES ($1, $2, $3, $4, NOW());
"#;
const QUERY_UPDATE_VEHICLE_AFTER_TRANSPORT: &str = r#"
UPDATE falukant_data.vehicle
SET region_id = $2,
condition = GREATEST(0, condition - $3::int),
available_from = NOW(),
updated_at = NOW()
WHERE id = $1;
"#;
const QUERY_DELETE_TRANSPORT: &str = r#"
DELETE FROM falukant_data.transport
WHERE id = $1;
"#;
/// Notification-Eintrag, analog zur Overproduction-Notification im ProduceWorker.
/// `tr` wird hier als JSON-String mit Übersetzungs-Key und Werten gespeichert.
const QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION: &str = r#"
INSERT INTO falukant_log.notification (
user_id,
tr,
shown,
created_at,
updated_at
) VALUES ($1, $2, FALSE, NOW(), NOW());
"#;
pub struct TransportWorker { pub struct TransportWorker {
base: BaseWorker, base: BaseWorker,
} }
@@ -224,10 +140,7 @@ impl TransportWorker {
// Leere Transporte (ohne Produkt) werden anders behandelt // Leere Transporte (ohne Produkt) werden anders behandelt
if t.product_id.is_none() { if t.product_id.is_none() {
// Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen // Leerer Transport: Nur Fahrzeug-Region aktualisieren und Transport löschen
eprintln!( // keine Info-Logs
"[TransportWorker] Leerer Transport {} angekommen: Fahrzeug {} zurückgeholt",
t.id, t.vehicle_id
);
Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?; Self::update_vehicle_after_transport(pool, t.vehicle_id, t.target_branch_id, t.distance)?;
Self::delete_transport(pool, t.id)?; Self::delete_transport(pool, t.id)?;
@@ -312,19 +225,7 @@ impl TransportWorker {
} }
} }
// Nutzer informieren, dass Ware noch im Transportmittel liegt. // Keine Notification für wartende Transporte, um Notification-System zu entlasten.
if t.user_id > 0 {
if let Err(err) = Self::insert_transport_waiting_notification(
pool,
t.user_id,
product_id,
remaining_quantity,
) {
eprintln!(
"[TransportWorker] Fehler beim Schreiben der Transport-Waiting-Notification: {err}"
);
}
}
} }
Ok(()) Ok(())
@@ -421,18 +322,11 @@ impl TransportWorker {
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
// Region des Branches abrufen // Region des Branches abrufen
const QUERY_GET_BRANCH_REGION: &str = r#"
SELECT region_id
FROM falukant_data.branch
WHERE id = $1
LIMIT 1;
"#;
conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?; conn.prepare("get_branch_region", QUERY_GET_BRANCH_REGION)?;
let rows = conn.execute("get_branch_region", &[&target_branch_id])?; let rows = conn.execute("get_branch_region", &[&target_branch_id])?;
let region_id = rows let region_id = rows
.get(0) .first()
.and_then(|r| r.get("region_id")) .and_then(|r| r.get("region_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1); .unwrap_or(-1);
@@ -479,46 +373,12 @@ impl TransportWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
const QUERY_UPDATE_TRANSPORT_SIZE: &str = r#"
UPDATE falukant_data.transport
SET size = $2,
updated_at = NOW()
WHERE id = $1;
"#;
conn.prepare("update_transport_size", QUERY_UPDATE_TRANSPORT_SIZE)?; conn.prepare("update_transport_size", QUERY_UPDATE_TRANSPORT_SIZE)?;
conn.execute("update_transport_size", &[&transport_id, &new_size])?; conn.execute("update_transport_size", &[&transport_id, &new_size])?;
Ok(()) Ok(())
} }
fn insert_transport_waiting_notification(
pool: &ConnectionPool,
user_id: i32,
product_id: i32,
remaining_quantity: i32,
) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare(
"add_transport_waiting_notification",
QUERY_ADD_TRANSPORT_WAITING_NOTIFICATION,
)?;
let notification = format!(
r#"{{"tr":"transport.waiting","productId":{},"value":{}}}"#,
product_id, remaining_quantity
);
conn.execute(
"add_transport_waiting_notification",
&[&user_id, &notification],
)?;
Ok(())
}
} }
impl Worker for TransportWorker { impl Worker for TransportWorker {

View File

@@ -11,6 +11,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::QUERY_UPDATE_MONEY;
pub struct UndergroundWorker { pub struct UndergroundWorker {
base: BaseWorker, base: BaseWorker,
@@ -118,10 +119,7 @@ const Q_SELECT_FALUKANT_USER: &str = r#"
LIMIT 1; LIMIT 1;
"#; "#;
// Query für Geldänderungen (lokale Variante von BaseWorker::change_falukant_user_money) // Use centralized QUERY_UPDATE_MONEY from src/worker/sql.rs
const QUERY_UPDATE_MONEY: &str = r#"
SELECT falukant_data.update_money($1, $2, $3);
"#;
impl UndergroundWorker { impl UndergroundWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -192,7 +190,7 @@ impl UndergroundWorker {
let task_type = r.get("underground_type").cloned().unwrap_or_default(); let task_type = r.get("underground_type").cloned().unwrap_or_default();
let params = r.get("parameters").cloned().unwrap_or_else(|| "{}".into()); let params = r.get("parameters").cloned().unwrap_or_else(|| "{}".into());
Ok(Self::handle_task(pool, &task_type, performer_id, victim_id, &params)?) Self::handle_task(pool, &task_type, performer_id, victim_id, &params)
} }
fn handle_task( fn handle_task(
@@ -350,7 +348,7 @@ impl UndergroundWorker {
let rows = conn.execute("ug_select_char_user", &[&character_id])?; let rows = conn.execute("ug_select_char_user", &[&character_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("user_id")) .and_then(|r| r.get("user_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1)) .unwrap_or(-1))
@@ -515,12 +513,12 @@ impl UndergroundWorker {
let mut out = Vec::new(); let mut out = Vec::new();
for r in rows { for r in rows {
if let Some(t) = r.get("stock_type_id").and_then(|v| v.parse::<i32>().ok()) { if let Some(t) = r.get("stock_type_id").and_then(|v| v.parse::<i32>().ok())
if allowed.contains(&t) { && allowed.contains(&t)
{
out.push(r.clone()); out.push(r.clone());
} }
} }
}
out out
} }

View File

@@ -8,6 +8,41 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_GET_USERS_TO_UPDATE,
QUERY_UPDATE_CHARACTERS_HEALTH,
QUERY_UPDATE_MOOD,
QUERY_UPDATE_GET_ITEMS_TO_UPDATE,
QUERY_UPDATE_GET_CHARACTER_IDS,
QUERY_UPDATE_KNOWLEDGE,
QUERY_DELETE_LOG_ENTRY,
QUERY_GET_OPEN_CREDITS,
QUERY_UPDATE_CREDIT,
QUERY_CLEANUP_CREDITS,
QUERY_ADD_CHARACTER_TO_DEBTORS_PRISM,
QUERY_GET_CURRENT_MONEY,
QUERY_GET_HOUSE_VALUE,
QUERY_GET_SETTLEMENT_VALUE,
QUERY_GET_INVENTORY_VALUE,
QUERY_GET_CREDIT_DEBT,
QUERY_COUNT_CHILDREN,
QUERY_GET_HEIR,
QUERY_RANDOM_HEIR,
QUERY_SET_CHARACTER_USER,
QUERY_UPDATE_USER_MONEY,
QUERY_GET_FALUKANT_USER_ID,
QUERY_AUTOBATISM,
QUERY_GET_PREGNANCY_CANDIDATES,
QUERY_INSERT_CHILD,
QUERY_INSERT_CHILD_RELATION,
QUERY_DELETE_DIRECTOR,
QUERY_DELETE_RELATIONSHIP,
QUERY_DELETE_CHILD_RELATION,
QUERY_DELETE_KNOWLEDGE,
QUERY_DELETE_DEBTORS_PRISM,
QUERY_DELETE_POLITICAL_OFFICE,
QUERY_DELETE_ELECTION_CANDIDATE,
};
/// Vereinfachtes Abbild eines Characters aus `QUERY_GET_USERS_TO_UPDATE`. /// Vereinfachtes Abbild eines Characters aus `QUERY_GET_USERS_TO_UPDATE`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -26,418 +61,7 @@ pub struct UserCharacterWorker {
last_mood_run: Option<Instant>, last_mood_run: Option<Instant>,
} }
// SQL-Queries (1:1 aus der C++-Implementierung übernommen, gruppiert nach Themen) // SQL moved to `src/worker/sql.rs`
const QUERY_GET_USERS_TO_UPDATE: &str = r#"
SELECT id, CURRENT_DATE - birthdate::date AS age, health
FROM falukant_data."character"
WHERE user_id IS NOT NULL;
"#;
const QUERY_UPDATE_CHARACTERS_HEALTH: &str = r#"
UPDATE falukant_data."character"
SET health = $1
WHERE id = $2;
"#;
// Mood-Update mit zufälliger Auswahl pro Charakter:
// Jeder lebende Charakter hat pro Aufruf (ca. 1x pro Minute)
// eine kleine Chance auf Mood-Wechsel. Die Bedingung `random() < 1.0 / 50.0`
// ergibt im Erwartungswert ca. alle 50 Minuten einen Wechsel, verteilt
// individuell und zufällig.
const QUERY_UPDATE_MOOD: &str = r#"
UPDATE falukant_data."character" AS c
SET mood_id = falukant_data.get_random_mood_id()
WHERE c.health > 0
AND random() < (1.0 / 50.0);
"#;
const QUERY_UPDATE_GET_ITEMS_TO_UPDATE: &str = r#"
SELECT id, product_id, producer_id, quantity
FROM falukant_log.production p
WHERE p.production_timestamp::date < current_date;
"#;
const QUERY_UPDATE_GET_CHARACTER_IDS: &str = r#"
SELECT fu.id AS user_id,
c.id AS character_id,
c2.id AS director_id
FROM falukant_data.falukant_user fu
JOIN falukant_data.character c
ON c.user_id = fu.id
LEFT JOIN falukant_data.director d
ON d.employer_user_id = fu.id
LEFT JOIN falukant_data.character c2
ON c2.id = d.director_character_id
WHERE fu.id = $1;
"#;
const QUERY_UPDATE_KNOWLEDGE: &str = r#"
UPDATE falukant_data.knowledge
SET knowledge = LEAST(knowledge + $3, 100)
WHERE character_id = $1
AND product_id = $2;
"#;
const QUERY_DELETE_LOG_ENTRY: &str = r#"
DELETE FROM falukant_log.production
WHERE id = $1;
"#;
// Kredit- und Vermögens-Queries
const QUERY_GET_OPEN_CREDITS: &str = r#"
SELECT
c.id AS credit_id,
c.amount,
c.remaining_amount,
c.interest_rate,
fu.id AS user_id,
fu.money,
c2.id AS character_id,
dp.created_at AS debitor_prism_start,
dp.created_at::date < current_date AS prism_started_previously
FROM falukant_data.credit c
JOIN falukant_data.falukant_user fu
ON fu.id = c.id
JOIN falukant_data.character c2
ON c2.user_id = c.falukant_user_id
LEFT JOIN falukant_data.debtors_prism dp
ON dp.character_id = c2.id
WHERE c.remaining_amount > 0
AND c.updated_at::date < current_date;
"#;
const QUERY_UPDATE_CREDIT: &str = r#"
UPDATE falukant_data.credit c
SET remaining_amount = $1
WHERE falukant_user_id = $2;
"#;
const QUERY_CLEANUP_CREDITS: &str = r#"
DELETE FROM falukant_data.credit
WHERE remaining_amount >= 0.01;
"#;
const QUERY_ADD_CHARACTER_TO_DEBTORS_PRISM: &str = r#"
INSERT INTO falukant_data.debtors_prism (character_id)
VALUES ($1);
"#;
const QUERY_GET_CURRENT_MONEY: &str = r#"
SELECT COALESCE(money, 0) AS sum
FROM falukant_data.falukant_user
WHERE user_id = $1;
"#;
const QUERY_HOUSE_VALUE: &str = r#"
SELECT COALESCE(SUM(h.cost), 0) AS sum
FROM falukant_data.user_house AS uh
JOIN falukant_type.house AS h
ON uh.house_type_id = h.id
WHERE uh.user_id = $1;
"#;
const QUERY_SETTLEMENT_VALUE: &str = r#"
SELECT COALESCE(SUM(b.base_cost), 0) AS sum
FROM falukant_data.branch AS br
JOIN falukant_type.branch AS b
ON br.branch_type_id = b.id
WHERE br.falukant_user_id = $1;
"#;
const QUERY_INVENTORY_VALUE: &str = r#"
SELECT COALESCE(SUM(i.quantity * p.sell_cost), 0) AS sum
FROM falukant_data.inventory AS i
JOIN falukant_type.product AS p
ON i.product_id = p.id
JOIN falukant_data.branch AS br
ON i.stock_id = br.id
WHERE br.falukant_user_id = $1;
"#;
const QUERY_CREDIT_DEBT: &str = r#"
SELECT COALESCE(SUM(remaining_amount), 0) AS sum
FROM falukant_data.credit
WHERE falukant_user_id = $1;
"#;
const QUERY_COUNT_CHILDREN: &str = r#"
SELECT COUNT(*) AS cnt
FROM falukant_data.child_relation
WHERE father_character_id = $1
OR mother_character_id = $1;
"#;
// Vererbungs-Queries
const QUERY_GET_HEIR: &str = r#"
SELECT child_character_id
FROM falukant_data.child_relation
WHERE father_character_id = $1
OR mother_character_id = $1
ORDER BY (is_heir IS TRUE) DESC,
updated_at DESC
LIMIT 1;
"#;
const QUERY_RANDOM_HEIR: &str = r#"
WITH chosen AS (
SELECT
cr.id AS relation_id,
cr.child_character_id
FROM
falukant_data.child_relation AS cr
JOIN
falukant_data.character AS ch
ON ch.id = cr.child_character_id
WHERE
(cr.father_character_id = $1 OR cr.mother_character_id = $1)
AND ch.region_id = (
SELECT region_id
FROM falukant_data.character
WHERE id = $1
)
AND ch.birthdate >= NOW() - INTERVAL '10 days'
AND ch.title_of_nobility = (
SELECT id
FROM falukant_type.title
WHERE label_tr = 'noncivil'
)
ORDER BY RANDOM()
LIMIT 1
)
UPDATE
falukant_data.child_relation AS cr2
SET
is_heir = TRUE,
updated_at = NOW()
FROM
chosen
WHERE
cr2.id = chosen.relation_id
RETURNING
chosen.child_character_id;
"#;
const QUERY_SET_CHARACTER_USER: &str = r#"
UPDATE falukant_data.character
SET user_id = $1,
updated_at = NOW()
WHERE id = $2;
"#;
const QUERY_UPDATE_USER_MONEY: &str = r#"
UPDATE falukant_data.falukant_user
SET money = $1,
updated_at = NOW()
WHERE user_id = $2;
"#;
const QUERY_GET_FALUKANT_USER_ID: &str = r#"
SELECT user_id
FROM falukant_data.character
WHERE id = $1
LIMIT 1;
"#;
// Schwangerschafts-Queries
const QUERY_AUTOBATISM: &str = r#"
UPDATE falukant_data.child_relation
SET name_set = TRUE
WHERE id IN (
SELECT cr.id
FROM falukant_data.child_relation cr
JOIN falukant_data.character c
ON c.id = cr.child_character_id
WHERE cr.name_set = FALSE
AND c.birthdate < current_date - INTERVAL '5 days'
);
"#;
const QUERY_GET_PREGNANCY_CANDIDATES: &str = r#"
SELECT
r.character1_id AS father_cid,
r.character2_id AS mother_cid,
c1.title_of_nobility,
c1.last_name,
c1.region_id,
fu1.id AS father_uid,
fu2.id AS mother_uid,
((CURRENT_DATE - c1.birthdate::date)
+ (CURRENT_DATE - c2.birthdate::date)) / 2 AS avg_age_days,
100.0 /
(1 + EXP(
0.0647 * (
((CURRENT_DATE - c1.birthdate::date)
+ (CURRENT_DATE - c2.birthdate::date)) / 2
) - 0.0591
)) AS prob_pct
FROM falukant_data.relationship r
JOIN falukant_type.relationship r2
ON r2.id = r.relationship_type_id
AND r2.tr = 'married'
JOIN falukant_data.character c1
ON c1.id = r.character1_id
JOIN falukant_data.character c2
ON c2.id = r.character2_id
LEFT JOIN falukant_data.falukant_user fu1
ON fu1.id = c1.user_id
LEFT JOIN falukant_data.falukant_user fu2
ON fu2.id = c2.user_id
WHERE random() * 100 < (
100.0 /
(1 + EXP(
0.11166347 * (
((CURRENT_DATE - c1.birthdate::date)
+ (CURRENT_DATE - c2.birthdate::date)) / 2
) - 2.638267
))
) / 2;
-- Hinweis: Der Divisor `/ 2` halbiert die Wahrscheinlichkeit und ist Teil der
-- ursprünglichen Formel. Wurde vorübergehend entfernt, um die Geburtenrate zu erhöhen,
-- wurde aber wiederhergestellt, um die mathematische Korrektheit der Formel zu gewährleisten.
-- Um die Geburtenrate anzupassen, sollte stattdessen die Formel selbst angepasst werden.
"#;
const QUERY_INSERT_CHILD: &str = r#"
INSERT INTO falukant_data.character (
user_id,
region_id,
first_name,
last_name,
birthdate,
gender,
title_of_nobility,
mood_id,
created_at,
updated_at
) VALUES (
NULL,
$1::int,
(
SELECT id
FROM falukant_predefine.firstname
WHERE gender = $2
ORDER BY RANDOM()
LIMIT 1
),
$3::int,
NOW(),
$2::varchar,
$4::int,
(
SELECT id
FROM falukant_type.mood
ORDER BY RANDOM()
LIMIT 1
),
NOW(),
NOW()
)
RETURNING id AS child_cid;
"#;
const QUERY_INSERT_CHILD_RELATION: &str = r#"
INSERT INTO falukant_data.child_relation (
father_character_id,
mother_character_id,
child_character_id,
name_set,
created_at,
updated_at
)
VALUES (
$1::int,
$2::int,
$3::int,
FALSE,
NOW(), NOW()
);
"#;
// Aufräum-Queries beim Tod eines Characters
const QUERY_DELETE_DIRECTOR: &str = r#"
DELETE FROM falukant_data.director
WHERE director_character_id = $1;
"#;
const QUERY_DELETE_RELATIONSHIP: &str = r#"
DELETE FROM falukant_data.relationship
WHERE character1_id = $1
OR character2_id = $1;
"#;
const QUERY_DELETE_CHILD_RELATION: &str = r#"
DELETE FROM falukant_data.child_relation
WHERE child_character_id = $1
OR father_character_id = $1
OR mother_character_id = $1;
"#;
const QUERY_DELETE_KNOWLEDGE: &str = r#"
DELETE FROM falukant_data.knowledge
WHERE character_id = $1;
"#;
const QUERY_DELETE_DEBTORS_PRISM: &str = r#"
DELETE FROM falukant_data.debtors_prism
WHERE character_id = $1;
"#;
/// Löscht alle Ämter eines Charakters und stellt anschließend sicher, dass
/// für die betroffenen Amtstyp/Region-Kombinationen nicht mehr Ämter
/// besetzt sind als durch `seats_per_region` vorgesehen.
///
/// Die überzähligen Ämter werden deterministisch nach `created_at DESC`
/// gekappt, d.h. neuere Amtsinhaber bleiben bevorzugt im Amt.
const QUERY_DELETE_POLITICAL_OFFICE: &str = r#"
WITH removed AS (
DELETE FROM falukant_data.political_office
WHERE character_id = $1
RETURNING office_type_id, region_id
),
affected AS (
SELECT DISTINCT office_type_id, region_id
FROM removed
),
seats AS (
SELECT
pot.id AS office_type_id,
rt.id AS region_id,
pot.seats_per_region AS seats_total
FROM falukant_type.political_office_type AS pot
JOIN falukant_type.region AS rt
ON pot.region_type = rt.label_tr
JOIN affected AS a
ON a.office_type_id = pot.id
AND a.region_id = rt.id
),
ranked AS (
SELECT
po.id,
po.office_type_id,
po.region_id,
s.seats_total,
ROW_NUMBER() OVER (
PARTITION BY po.office_type_id, po.region_id
ORDER BY po.created_at DESC
) AS rn
FROM falukant_data.political_office AS po
JOIN seats AS s
ON s.office_type_id = po.office_type_id
AND s.region_id = po.region_id
),
to_delete AS (
SELECT id
FROM ranked
WHERE rn > seats_total
)
DELETE FROM falukant_data.political_office
WHERE id IN (SELECT id FROM to_delete);
"#;
const QUERY_DELETE_ELECTION_CANDIDATE: &str = r#"
DELETE FROM falukant_data.election_candidate
WHERE character_id = $1;
"#;
impl UserCharacterWorker { impl UserCharacterWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -470,7 +94,7 @@ impl UserCharacterWorker {
} }
if !state.running_worker.load(Ordering::Relaxed) { if !state.running_worker.load(Ordering::Relaxed) {
return; // worker stopping
} }
} }
@@ -894,7 +518,7 @@ impl UserCharacterWorker {
let inserted = let inserted =
conn.execute("insert_child", &[&region_id, &gender, &last_name, &title_of_nobility])?; conn.execute("insert_child", &[&region_id, &gender, &last_name, &title_of_nobility])?;
let child_cid = inserted let child_cid = inserted
.get(0) .first()
.and_then(|r| r.get("child_cid")) .and_then(|r| r.get("child_cid"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1); .unwrap_or(-1);
@@ -1002,7 +626,7 @@ impl UserCharacterWorker {
let rows = conn.execute("get_falukant_user_id", &[&character_id])?; let rows = conn.execute("get_falukant_user_id", &[&character_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("user_id")) .and_then(|r| r.get("user_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1)) .unwrap_or(-1))
@@ -1019,7 +643,7 @@ impl UserCharacterWorker {
let rows = conn.execute("get_heir", &[&deceased_character_id])?; let rows = conn.execute("get_heir", &[&deceased_character_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("child_character_id")) .and_then(|r| r.get("child_character_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1)) .unwrap_or(-1))
@@ -1036,7 +660,7 @@ impl UserCharacterWorker {
let rows = conn.execute("random_heir", &[&deceased_character_id])?; let rows = conn.execute("random_heir", &[&deceased_character_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("child_character_id")) .and_then(|r| r.get("child_character_id"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1)) .unwrap_or(-1))
@@ -1084,7 +708,7 @@ impl UserCharacterWorker {
let rows = conn.execute("get_current_money", &[&falukant_user_id])?; let rows = conn.execute("get_current_money", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1097,11 +721,11 @@ impl UserCharacterWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("house_value", QUERY_HOUSE_VALUE)?; conn.prepare("house_value", QUERY_GET_HOUSE_VALUE)?;
let rows = conn.execute("house_value", &[&falukant_user_id])?; let rows = conn.execute("house_value", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1114,11 +738,11 @@ impl UserCharacterWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("settlement_value", QUERY_SETTLEMENT_VALUE)?; conn.prepare("settlement_value", QUERY_GET_SETTLEMENT_VALUE)?;
let rows = conn.execute("settlement_value", &[&falukant_user_id])?; let rows = conn.execute("settlement_value", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1131,11 +755,11 @@ impl UserCharacterWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("inventory_value", QUERY_INVENTORY_VALUE)?; conn.prepare("inventory_value", QUERY_GET_INVENTORY_VALUE)?;
let rows = conn.execute("inventory_value", &[&falukant_user_id])?; let rows = conn.execute("inventory_value", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1148,11 +772,11 @@ impl UserCharacterWorker {
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("credit_debt", QUERY_CREDIT_DEBT)?; conn.prepare("credit_debt", QUERY_GET_CREDIT_DEBT)?;
let rows = conn.execute("credit_debt", &[&falukant_user_id])?; let rows = conn.execute("credit_debt", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("sum")) .and_then(|r| r.get("sum"))
.and_then(|v| v.parse::<f64>().ok()) .and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0)) .unwrap_or(0.0))
@@ -1169,7 +793,7 @@ impl UserCharacterWorker {
let rows = conn.execute("count_children", &[&deceased_user_id])?; let rows = conn.execute("count_children", &[&deceased_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("cnt")) .and_then(|r| r.get("cnt"))
.and_then(|v| v.parse::<i32>().ok()) .and_then(|v| v.parse::<i32>().ok())
.unwrap_or(0)) .unwrap_or(0))

View File

@@ -6,291 +6,27 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState}; use super::base::{BaseWorker, Worker, WorkerState};
use crate::worker::sql::{
QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER,
QUERY_DELETE_OLD_PRODUCTIONS,
QUERY_GET_PRODUCERS_LAST_DAY,
QUERY_UPDATE_REGION_SELL_PRICE,
QUERY_DELETE_REGION_SELL_PRICE,
QUERY_GET_SELL_REGIONS,
QUERY_HOURLY_PRICE_RECALCULATION,
QUERY_SET_MARRIAGES_BY_PARTY,
QUERY_GET_STUDYINGS_TO_EXECUTE,
QUERY_GET_OWN_CHARACTER_ID,
QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE,
QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE,
QUERY_SET_LEARNING_DONE,
};
pub struct ValueRecalculationWorker { pub struct ValueRecalculationWorker {
base: BaseWorker, base: BaseWorker,
} }
// Produktwissen / Produktions-Logs
const QUERY_UPDATE_PRODUCT_KNOWLEDGE_USER: &str = r#"
UPDATE falukant_data.knowledge k
SET knowledge = LEAST(100, k.knowledge + 1)
FROM falukant_data.character c
JOIN falukant_log.production p
ON DATE(p.production_timestamp) = CURRENT_DATE - INTERVAL '1 day'
WHERE c.id = k.character_id
AND c.user_id = 18
AND k.product_id = 10;
"#;
const QUERY_DELETE_OLD_PRODUCTIONS: &str = r#"
DELETE FROM falukant_log.production flp
WHERE DATE(flp.production_timestamp) < CURRENT_DATE;
"#;
const QUERY_GET_PRODUCERS_LAST_DAY: &str = r#"
SELECT p.producer_id
FROM falukant_log.production p
WHERE DATE(p.production_timestamp) = CURRENT_DATE - INTERVAL '1 day'
GROUP BY producer_id;
"#;
// Regionale Verkaufspreise
const QUERY_UPDATE_REGION_SELL_PRICE: &str = r#"
UPDATE falukant_data.town_product_worth tpw
SET worth_percent =
GREATEST(
0,
LEAST(
CASE
WHEN s.quantity > avg_sells THEN tpw.worth_percent - 1
WHEN s.quantity < avg_sells THEN tpw.worth_percent + 1
ELSE tpw.worth_percent
END,
100
)
)
FROM (
SELECT region_id,
product_id,
quantity,
(SELECT AVG(quantity)
FROM falukant_log.sell avs
WHERE avs.product_id = s.product_id) AS avg_sells
FROM falukant_log.sell s
WHERE DATE(s.sell_timestamp) = CURRENT_DATE - INTERVAL '1 day'
) s
WHERE tpw.region_id = s.region_id
AND tpw.product_id = s.product_id;
"#;
const QUERY_DELETE_REGION_SELL_PRICE: &str = r#"
DELETE FROM falukant_log.sell s
WHERE DATE(s.sell_timestamp) < CURRENT_DATE;
"#;
const QUERY_GET_SELL_REGIONS: &str = r#"
SELECT s.region_id
FROM falukant_log.sell s
WHERE DATE(s.sell_timestamp) = CURRENT_DATE - INTERVAL '1 day'
GROUP BY region_id;
"#;
// Stündliche Preisneuberechnung basierend auf Verkäufen der letzten Stunde
// Zwei Ebenen der Preisberechnung:
// 1. Weltweit: Vergleich Stadt-Verkäufe vs. weltweiter Durchschnitt
// - ±5% Toleranz: Preis bleibt gleich
// - Mehr Verkäufe (>5% über Durchschnitt): Preis +10%
// - Weniger Verkäufe (<5% unter Durchschnitt): Preis -10%
// 2. Parent-Region: Vergleich Stadt-Verkäufe vs. Durchschnitt der parent-region
// - ±5% Toleranz: Preis bleibt gleich
// - Abweichung >±5%: Preis ±5%
const QUERY_HOURLY_PRICE_RECALCULATION: &str = r#"
WITH city_sales AS (
SELECT
s.region_id,
s.product_id,
SUM(s.quantity) AS total_sold
FROM falukant_log.sell s
WHERE s.sell_timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY s.region_id, s.product_id
),
world_avg_sales AS (
SELECT
product_id,
AVG(total_sold) AS avg_sold
FROM city_sales
GROUP BY product_id
),
parent_region_sales AS (
SELECT
r.parent_region_id,
cs.product_id,
AVG(cs.total_sold) AS avg_sold
FROM city_sales cs
JOIN falukant_data.region r ON r.id = cs.region_id
WHERE r.parent_region_id IS NOT NULL
GROUP BY r.parent_region_id, cs.product_id
),
price_updates_world AS (
SELECT
cs.region_id,
cs.product_id,
cs.total_sold,
COALESCE(wa.avg_sold, 0) AS world_avg,
tpw.worth_percent AS current_price,
CASE
-- Mehr als 5% über dem weltweiten Durchschnitt: 10% teurer
WHEN cs.total_sold > COALESCE(wa.avg_sold, 0) * 1.05
THEN tpw.worth_percent * 1.1
-- Weniger als 5% unter dem weltweiten Durchschnitt: 10% billiger
WHEN cs.total_sold < COALESCE(wa.avg_sold, 0) * 0.95
THEN tpw.worth_percent * 0.9
-- Innerhalb ±5%: Preis bleibt gleich
ELSE tpw.worth_percent
END AS price_after_world
FROM city_sales cs
JOIN world_avg_sales wa ON wa.product_id = cs.product_id
JOIN falukant_data.town_product_worth tpw
ON tpw.region_id = cs.region_id
AND tpw.product_id = cs.product_id
-- Nur updaten wenn es eine Änderung gibt (außerhalb der ±5% Toleranz)
WHERE cs.total_sold > COALESCE(wa.avg_sold, 0) * 1.05
OR cs.total_sold < COALESCE(wa.avg_sold, 0) * 0.95
),
all_cities_with_prices AS (
SELECT
cs.region_id,
cs.product_id,
cs.total_sold,
r.parent_region_id,
tpw.worth_percent AS original_price,
COALESCE(puw.price_after_world, tpw.worth_percent) AS price_after_world
FROM city_sales cs
JOIN falukant_data.region r ON r.id = cs.region_id
JOIN falukant_data.town_product_worth tpw
ON tpw.region_id = cs.region_id
AND tpw.product_id = cs.product_id
LEFT JOIN price_updates_world puw
ON puw.region_id = cs.region_id
AND puw.product_id = cs.product_id
),
price_updates_parent AS (
SELECT
acwp.region_id,
acwp.product_id,
acwp.total_sold,
acwp.parent_region_id,
COALESCE(prs.avg_sold, 0) AS parent_avg,
acwp.price_after_world AS current_price,
CASE
-- Mehr als 5% über dem parent-region Durchschnitt: 5% teurer
WHEN acwp.total_sold > COALESCE(prs.avg_sold, 0) * 1.05
THEN acwp.price_after_world * 1.05
-- Weniger als 5% unter dem parent-region Durchschnitt: 5% billiger
WHEN acwp.total_sold < COALESCE(prs.avg_sold, 0) * 0.95
THEN acwp.price_after_world * 0.95
-- Innerhalb ±5%: Preis bleibt gleich (vom world-update)
ELSE acwp.price_after_world
END AS new_price
FROM all_cities_with_prices acwp
LEFT JOIN parent_region_sales prs
ON prs.parent_region_id = acwp.parent_region_id
AND prs.product_id = acwp.product_id
WHERE acwp.parent_region_id IS NOT NULL
AND (
acwp.total_sold > COALESCE(prs.avg_sold, 0) * 1.05
OR acwp.total_sold < COALESCE(prs.avg_sold, 0) * 0.95
)
),
final_price_updates AS (
SELECT
COALESCE(pup.region_id, puw.region_id) AS region_id,
COALESCE(pup.product_id, puw.product_id) AS product_id,
COALESCE(pup.new_price, puw.price_after_world, acwp.original_price) AS final_price
FROM all_cities_with_prices acwp
LEFT JOIN price_updates_world puw
ON puw.region_id = acwp.region_id
AND puw.product_id = acwp.product_id
LEFT JOIN price_updates_parent pup
ON pup.region_id = acwp.region_id
AND pup.product_id = acwp.product_id
WHERE puw.region_id IS NOT NULL
OR pup.region_id IS NOT NULL
)
UPDATE falukant_data.town_product_worth tpw
SET worth_percent = GREATEST(
0,
LEAST(
100,
fpu.final_price
)
)
FROM final_price_updates fpu
WHERE tpw.region_id = fpu.region_id
AND tpw.product_id = fpu.product_id;
"#;
// Ehen / Beziehungen
const QUERY_SET_MARRIAGES_BY_PARTY: &str = r#"
WITH updated_relations AS (
UPDATE falukant_data.relationship AS rel
SET relationship_type_id = (
SELECT id
FROM falukant_type.relationship AS rt
WHERE rt.tr = 'married'
)
WHERE rel.id IN (
SELECT rel2.id
FROM falukant_data.party AS p
JOIN falukant_type.party AS pt
ON pt.id = p.party_type_id
AND pt.tr = 'wedding'
JOIN falukant_data.falukant_user AS fu
ON fu.id = p.falukant_user_id
JOIN falukant_data.character AS c
ON c.user_id = fu.id
JOIN falukant_data.relationship AS rel2
ON rel2.character1_id = c.id
OR rel2.character2_id = c.id
JOIN falukant_type.relationship AS rt2
ON rt2.id = rel2.relationship_type_id
AND rt2.tr = 'engaged'
WHERE p.created_at <= NOW() - INTERVAL '1 day'
)
RETURNING character1_id, character2_id
)
SELECT
c1.user_id AS character1_user,
c2.user_id AS character2_user
FROM updated_relations AS ur
JOIN falukant_data.character AS c1
ON c1.id = ur.character1_id
JOIN falukant_data.character AS c2
ON c2.id = ur.character2_id;
"#;
// Lernen / Studium
const QUERY_GET_STUDYINGS_TO_EXECUTE: &str = r#"
SELECT
l.id,
l.associated_falukant_user_id,
l.associated_learning_character_id,
l.learn_all_products,
l.learning_recipient_id,
l.product_id,
lr.tr
FROM falukant_data.learning l
JOIN falukant_type.learn_recipient lr
ON lr.id = l.learning_recipient_id
WHERE l.learning_is_executed = FALSE
AND l.created_at + INTERVAL '1 day' < NOW();
"#;
const QUERY_GET_OWN_CHARACTER_ID: &str = r#"
SELECT id
FROM falukant_data.character c
WHERE c.user_id = $1;
"#;
const QUERY_INCREASE_ONE_PRODUCT_KNOWLEDGE: &str = r#"
UPDATE falukant_data.knowledge k
SET knowledge = LEAST(100, k.knowledge + $1)
WHERE k.character_id = $2
AND k.product_id = $3;
"#;
const QUERY_INCREASE_ALL_PRODUCTS_KNOWLEDGE: &str = r#"
UPDATE falukant_data.knowledge k
SET knowledge = LEAST(100, k.knowledge + $1)
WHERE k.character_id = $2;
"#;
const QUERY_SET_LEARNING_DONE: &str = r#"
UPDATE falukant_data.learning
SET learning_is_executed = TRUE
WHERE id = $1;
"#;
impl ValueRecalculationWorker { impl ValueRecalculationWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
@@ -601,7 +337,7 @@ impl ValueRecalculationWorker {
let rows = conn.execute("get_own_character_id", &[&falukant_user_id])?; let rows = conn.execute("get_own_character_id", &[&falukant_user_id])?;
Ok(rows Ok(rows
.get(0) .first()
.and_then(|r| r.get("id")) .and_then(|r| r.get("id"))
.and_then(|v| v.parse::<i32>().ok())) .and_then(|v| v.parse::<i32>().ok()))
} }

View File

@@ -12,9 +12,7 @@ pub struct WeatherWorker {
// Query zum Aktualisieren des Wetters für alle Regionen // Query zum Aktualisieren des Wetters für alle Regionen
// Wählt für jede Region ein zufälliges Wetter aus allen verfügbaren Wettertypen aus // Wählt für jede Region ein zufälliges Wetter aus allen verfügbaren Wettertypen aus
// Wichtig: Jede Region bekommt ein individuelles, zufälliges Wetter. Die vorherige // Wichtig: Jede Region bekommt ein individuelles, zufälliges Wetter
// Variante konnte vom Planner unter Umständen die Zufalls-Subquery nur einmal
// auswerten; mit LATERAL wird die Zufallsauswahl pro Region garantiert ausgeführt.
const QUERY_UPDATE_WEATHER: &str = r#" const QUERY_UPDATE_WEATHER: &str = r#"
WITH all_regions AS ( WITH all_regions AS (
SELECT DISTINCT r.id AS region_id SELECT DISTINCT r.id AS region_id
@@ -23,14 +21,15 @@ const QUERY_UPDATE_WEATHER: &str = r#"
WHERE tr.label_tr = 'city' WHERE tr.label_tr = 'city'
), ),
random_weather AS ( random_weather AS (
SELECT ar.region_id, wt.id AS weather_type_id SELECT
FROM all_regions ar ar.region_id,
CROSS JOIN LATERAL ( (
SELECT wt.id SELECT wt.id
FROM falukant_type.weather wt FROM falukant_type.weather wt
ORDER BY RANDOM() ORDER BY RANDOM()
LIMIT 1 LIMIT 1
) wt ) AS weather_type_id
FROM all_regions ar
) )
INSERT INTO falukant_data.weather (region_id, weather_type_id) INSERT INTO falukant_data.weather (region_id, weather_type_id)
SELECT rw.region_id, rw.weather_type_id SELECT rw.region_id, rw.weather_type_id