From 4fca4b4d7528c03fb04486d2a1c3eea704e07db9 Mon Sep 17 00:00:00 2001 From: "Torsten Schulz (local)" Date: Sat, 20 Dec 2025 11:15:45 +0100 Subject: [PATCH] Enhance SQL queries and logging in TransportWorker and WeatherWorker: Update weather assignment logic to ensure unique weather types per region, improve distance calculations in transport queries, and refine logging for transport processing. Additionally, adjust notification insert queries to include character_id. --- src/worker/sql.rs | 39 +++++++++++++++++++++++++++++++-------- src/worker/transport.rs | 28 ++++++++++++++++++++++++++-- src/worker/weather.rs | 29 +---------------------------- 3 files changed, 58 insertions(+), 38 deletions(-) diff --git a/src/worker/sql.rs b/src/worker/sql.rs index 8b7f79b..90546a0 100644 --- a/src/worker/sql.rs +++ b/src/worker/sql.rs @@ -30,11 +30,27 @@ SELECT DISTINCT b.falukant_user_id AS user_id FROM falukant_data.branch b WHERE pub const QUERY_UPDATE_WEATHER: &str = r#" WITH all_regions AS ( - SELECT DISTINCT r.id AS region_id FROM falukant_data.region r JOIN falukant_type.region tr ON r.region_type_id = tr.id WHERE tr.label_tr = 'city' + SELECT DISTINCT r.id AS region_id + FROM falukant_data.region r + JOIN falukant_type.region tr ON r.region_type_id = tr.id + WHERE tr.label_tr = 'city' +), +random_weather AS ( + SELECT + ar.region_id, + ( + SELECT wt.id + FROM falukant_type.weather wt + ORDER BY RANDOM() + LIMIT 1 + ) AS weather_type_id + FROM all_regions ar ) INSERT INTO falukant_data.weather (region_id, weather_type_id) -SELECT ar.region_id, (SELECT wt.id FROM falukant_type.weather wt ORDER BY random() + ar.region_id * 0 LIMIT 1) FROM all_regions ar -ON CONFLICT (region_id) DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id; +SELECT rw.region_id, rw.weather_type_id +FROM random_weather rw +ON CONFLICT (region_id) +DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id; "#; pub const QUERY_INSERT_NOTIFICATION: &str = r#" @@ -44,7 +60,7 @@ VALUES ($1, $2, FALSE, NOW(), NOW()); // Product pricing pub const QUERY_GET_PRODUCT_COST: &str = r#" -SELECT original_sell_cost, sell_cost FROM falukant_type.product WHERE id = $1; +SELECT sell_cost, sell_cost AS original_sell_cost FROM falukant_type.product WHERE id = $1; "#; pub const QUERY_GET_DIRECTORS: &str = r#" @@ -203,15 +219,21 @@ SELECT t.target_region_id, b_target.id AS target_branch_id, b_source.id AS source_branch_id, - rd.distance AS distance, + COALESCE(rd.distance, 0.0) AS distance, v.falukant_user_id AS user_id FROM falukant_data.transport AS t JOIN falukant_data.vehicle AS v ON v.id = t.vehicle_id JOIN falukant_type.vehicle AS vt ON vt.id = v.vehicle_type_id -JOIN falukant_data.region_distance AS rd ON ((rd.source_region_id = t.source_region_id AND rd.target_region_id = t.target_region_id) OR (rd.source_region_id = t.target_region_id AND rd.target_region_id = t.source_region_id)) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL) +LEFT JOIN falukant_data.region_distance AS rd ON ((rd.source_region_id = t.source_region_id AND rd.target_region_id = t.target_region_id) OR (rd.source_region_id = t.target_region_id AND rd.target_region_id = t.source_region_id)) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL) LEFT JOIN falukant_data.branch AS b_target ON b_target.region_id = t.target_region_id AND b_target.falukant_user_id = v.falukant_user_id LEFT JOIN falukant_data.branch AS b_source ON b_source.region_id = t.source_region_id AND b_source.falukant_user_id = v.falukant_user_id -WHERE vt.speed > 0 AND t.created_at + (rd.distance / vt.speed::double precision) * INTERVAL '1 minute' <= NOW(); +WHERE ( + -- Transport ist angekommen basierend auf Distanz und Geschwindigkeit (wenn speed > 0) + (vt.speed > 0 AND rd.distance IS NOT NULL AND t.created_at + (rd.distance / vt.speed::double precision) * INTERVAL '1 minute' <= NOW()) + OR + -- Fallback: Transport ist länger als 24 Stunden unterwegs (falls keine Distanz gefunden wurde oder speed = 0/NULL) + (t.created_at + INTERVAL '24 hours' <= NOW()) + ); "#; pub const QUERY_GET_AVAILABLE_STOCKS: &str = r#" @@ -1440,10 +1462,11 @@ pub const QUERY_ADD_OVERPRODUCTION_NOTIFICATION: &str = r#" INSERT INTO falukant_log.notification ( user_id, tr, + character_id, shown, created_at, updated_at - ) VALUES ($1, $2, FALSE, NOW(), NOW()); + ) VALUES ($1, $2, NULL, FALSE, NOW(), NOW()); "#; // Aliases for personal variants (keeps original prepared statement names used in events.worker) diff --git a/src/worker/transport.rs b/src/worker/transport.rs index 291dbab..a38d507 100644 --- a/src/worker/transport.rs +++ b/src/worker/transport.rs @@ -2,8 +2,8 @@ use crate::db::{ConnectionPool, DbError}; use crate::message_broker::MessageBroker; use std::cmp::min; use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::time::Duration; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; use crate::worker::sql::{ @@ -67,7 +67,18 @@ impl TransportWorker { ) -> Result<(), DbError> { let transports = Self::load_arrived_transports(pool)?; + if !transports.is_empty() { + eprintln!( + "[TransportWorker] {} angekommene Transport(e) gefunden", + transports.len() + ); + } + for t in transports { + eprintln!( + "[TransportWorker] Verarbeite Transport {} (vehicle_id={}, product_id={:?}, size={})", + t.id, t.vehicle_id, t.product_id, t.size + ); if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) { eprintln!( "[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}", @@ -87,6 +98,19 @@ impl TransportWorker { conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?; let rows = conn.execute("get_arrived_transports", &[])?; + if rows.is_empty() { + // Nur alle 60 Sekunden loggen, um Log-Flut zu vermeiden + static LAST_LOG: Mutex> = Mutex::new(None); + let mut last_log = LAST_LOG.lock().unwrap(); + let should_log = last_log + .map(|t| t.elapsed().as_secs() >= 60) + .unwrap_or(true); + if should_log { + eprintln!("[TransportWorker] Keine angekommenen Transporte gefunden"); + *last_log = Some(Instant::now()); + } + } + let mut result = Vec::with_capacity(rows.len()); for row in rows { let id = parse_i32(&row, "id", -1); diff --git a/src/worker/weather.rs b/src/worker/weather.rs index e9a7dc8..ea24a58 100644 --- a/src/worker/weather.rs +++ b/src/worker/weather.rs @@ -5,39 +5,12 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; +use crate::worker::sql::QUERY_UPDATE_WEATHER; pub struct WeatherWorker { base: BaseWorker, } -// Query zum Aktualisieren des Wetters für alle Regionen -// Wählt für jede Region ein zufälliges Wetter aus allen verfügbaren Wettertypen aus -// Wichtig: Jede Region bekommt ein individuelles, zufälliges Wetter -const QUERY_UPDATE_WEATHER: &str = r#" - WITH all_regions AS ( - SELECT DISTINCT r.id AS region_id - FROM falukant_data.region r - JOIN falukant_type.region tr ON r.region_type_id = tr.id - WHERE tr.label_tr = 'city' - ), - random_weather AS ( - SELECT - ar.region_id, - ( - SELECT wt.id - FROM falukant_type.weather wt - ORDER BY RANDOM() - LIMIT 1 - ) AS weather_type_id - FROM all_regions ar - ) - INSERT INTO falukant_data.weather (region_id, weather_type_id) - SELECT rw.region_id, rw.weather_type_id - FROM random_weather rw - ON CONFLICT (region_id) - DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id; -"#; - impl WeatherWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self {