Enhance SQL queries and logging in TransportWorker and WeatherWorker: Update weather assignment logic to ensure unique weather types per region, improve distance calculations in transport queries, and refine logging for transport processing. Additionally, adjust notification insert queries to include character_id.
This commit is contained in:
@@ -30,11 +30,27 @@ SELECT DISTINCT b.falukant_user_id AS user_id FROM falukant_data.branch b WHERE
|
|||||||
|
|
||||||
pub const QUERY_UPDATE_WEATHER: &str = r#"
|
pub const QUERY_UPDATE_WEATHER: &str = r#"
|
||||||
WITH all_regions AS (
|
WITH all_regions AS (
|
||||||
SELECT DISTINCT r.id AS region_id FROM falukant_data.region r JOIN falukant_type.region tr ON r.region_type_id = tr.id WHERE tr.label_tr = 'city'
|
SELECT DISTINCT r.id AS region_id
|
||||||
|
FROM falukant_data.region r
|
||||||
|
JOIN falukant_type.region tr ON r.region_type_id = tr.id
|
||||||
|
WHERE tr.label_tr = 'city'
|
||||||
|
),
|
||||||
|
random_weather AS (
|
||||||
|
SELECT
|
||||||
|
ar.region_id,
|
||||||
|
(
|
||||||
|
SELECT wt.id
|
||||||
|
FROM falukant_type.weather wt
|
||||||
|
ORDER BY RANDOM()
|
||||||
|
LIMIT 1
|
||||||
|
) AS weather_type_id
|
||||||
|
FROM all_regions ar
|
||||||
)
|
)
|
||||||
INSERT INTO falukant_data.weather (region_id, weather_type_id)
|
INSERT INTO falukant_data.weather (region_id, weather_type_id)
|
||||||
SELECT ar.region_id, (SELECT wt.id FROM falukant_type.weather wt ORDER BY random() + ar.region_id * 0 LIMIT 1) FROM all_regions ar
|
SELECT rw.region_id, rw.weather_type_id
|
||||||
ON CONFLICT (region_id) DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id;
|
FROM random_weather rw
|
||||||
|
ON CONFLICT (region_id)
|
||||||
|
DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id;
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
pub const QUERY_INSERT_NOTIFICATION: &str = r#"
|
pub const QUERY_INSERT_NOTIFICATION: &str = r#"
|
||||||
@@ -44,7 +60,7 @@ VALUES ($1, $2, FALSE, NOW(), NOW());
|
|||||||
|
|
||||||
// Product pricing
|
// Product pricing
|
||||||
pub const QUERY_GET_PRODUCT_COST: &str = r#"
|
pub const QUERY_GET_PRODUCT_COST: &str = r#"
|
||||||
SELECT original_sell_cost, sell_cost FROM falukant_type.product WHERE id = $1;
|
SELECT sell_cost, sell_cost AS original_sell_cost FROM falukant_type.product WHERE id = $1;
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
pub const QUERY_GET_DIRECTORS: &str = r#"
|
pub const QUERY_GET_DIRECTORS: &str = r#"
|
||||||
@@ -203,15 +219,21 @@ SELECT
|
|||||||
t.target_region_id,
|
t.target_region_id,
|
||||||
b_target.id AS target_branch_id,
|
b_target.id AS target_branch_id,
|
||||||
b_source.id AS source_branch_id,
|
b_source.id AS source_branch_id,
|
||||||
rd.distance AS distance,
|
COALESCE(rd.distance, 0.0) AS distance,
|
||||||
v.falukant_user_id AS user_id
|
v.falukant_user_id AS user_id
|
||||||
FROM falukant_data.transport AS t
|
FROM falukant_data.transport AS t
|
||||||
JOIN falukant_data.vehicle AS v ON v.id = t.vehicle_id
|
JOIN falukant_data.vehicle AS v ON v.id = t.vehicle_id
|
||||||
JOIN falukant_type.vehicle AS vt ON vt.id = v.vehicle_type_id
|
JOIN falukant_type.vehicle AS vt ON vt.id = v.vehicle_type_id
|
||||||
JOIN falukant_data.region_distance AS rd ON ((rd.source_region_id = t.source_region_id AND rd.target_region_id = t.target_region_id) OR (rd.source_region_id = t.target_region_id AND rd.target_region_id = t.source_region_id)) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
|
LEFT JOIN falukant_data.region_distance AS rd ON ((rd.source_region_id = t.source_region_id AND rd.target_region_id = t.target_region_id) OR (rd.source_region_id = t.target_region_id AND rd.target_region_id = t.source_region_id)) AND (rd.transport_mode = vt.transport_mode OR rd.transport_mode IS NULL)
|
||||||
LEFT JOIN falukant_data.branch AS b_target ON b_target.region_id = t.target_region_id AND b_target.falukant_user_id = v.falukant_user_id
|
LEFT JOIN falukant_data.branch AS b_target ON b_target.region_id = t.target_region_id AND b_target.falukant_user_id = v.falukant_user_id
|
||||||
LEFT JOIN falukant_data.branch AS b_source ON b_source.region_id = t.source_region_id AND b_source.falukant_user_id = v.falukant_user_id
|
LEFT JOIN falukant_data.branch AS b_source ON b_source.region_id = t.source_region_id AND b_source.falukant_user_id = v.falukant_user_id
|
||||||
WHERE vt.speed > 0 AND t.created_at + (rd.distance / vt.speed::double precision) * INTERVAL '1 minute' <= NOW();
|
WHERE (
|
||||||
|
-- Transport ist angekommen basierend auf Distanz und Geschwindigkeit (wenn speed > 0)
|
||||||
|
(vt.speed > 0 AND rd.distance IS NOT NULL AND t.created_at + (rd.distance / vt.speed::double precision) * INTERVAL '1 minute' <= NOW())
|
||||||
|
OR
|
||||||
|
-- Fallback: Transport ist länger als 24 Stunden unterwegs (falls keine Distanz gefunden wurde oder speed = 0/NULL)
|
||||||
|
(t.created_at + INTERVAL '24 hours' <= NOW())
|
||||||
|
);
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
pub const QUERY_GET_AVAILABLE_STOCKS: &str = r#"
|
pub const QUERY_GET_AVAILABLE_STOCKS: &str = r#"
|
||||||
@@ -1440,10 +1462,11 @@ pub const QUERY_ADD_OVERPRODUCTION_NOTIFICATION: &str = r#"
|
|||||||
INSERT INTO falukant_log.notification (
|
INSERT INTO falukant_log.notification (
|
||||||
user_id,
|
user_id,
|
||||||
tr,
|
tr,
|
||||||
|
character_id,
|
||||||
shown,
|
shown,
|
||||||
created_at,
|
created_at,
|
||||||
updated_at
|
updated_at
|
||||||
) VALUES ($1, $2, FALSE, NOW(), NOW());
|
) VALUES ($1, $2, NULL, FALSE, NOW(), NOW());
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
// Aliases for personal variants (keeps original prepared statement names used in events.worker)
|
// Aliases for personal variants (keeps original prepared statement names used in events.worker)
|
||||||
|
|||||||
@@ -2,8 +2,8 @@ use crate::db::{ConnectionPool, DbError};
|
|||||||
use crate::message_broker::MessageBroker;
|
use crate::message_broker::MessageBroker;
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use super::base::{BaseWorker, Worker, WorkerState};
|
use super::base::{BaseWorker, Worker, WorkerState};
|
||||||
use crate::worker::sql::{
|
use crate::worker::sql::{
|
||||||
@@ -67,7 +67,18 @@ impl TransportWorker {
|
|||||||
) -> Result<(), DbError> {
|
) -> Result<(), DbError> {
|
||||||
let transports = Self::load_arrived_transports(pool)?;
|
let transports = Self::load_arrived_transports(pool)?;
|
||||||
|
|
||||||
|
if !transports.is_empty() {
|
||||||
|
eprintln!(
|
||||||
|
"[TransportWorker] {} angekommene Transport(e) gefunden",
|
||||||
|
transports.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
for t in transports {
|
for t in transports {
|
||||||
|
eprintln!(
|
||||||
|
"[TransportWorker] Verarbeite Transport {} (vehicle_id={}, product_id={:?}, size={})",
|
||||||
|
t.id, t.vehicle_id, t.product_id, t.size
|
||||||
|
);
|
||||||
if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) {
|
if let Err(err) = Self::handle_arrived_transport(pool, broker, &t) {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}",
|
"[TransportWorker] Fehler beim Verarbeiten von Transport {}: {err}",
|
||||||
@@ -87,6 +98,19 @@ impl TransportWorker {
|
|||||||
conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?;
|
conn.prepare("get_arrived_transports", QUERY_GET_ARRIVED_TRANSPORTS)?;
|
||||||
let rows = conn.execute("get_arrived_transports", &[])?;
|
let rows = conn.execute("get_arrived_transports", &[])?;
|
||||||
|
|
||||||
|
if rows.is_empty() {
|
||||||
|
// Nur alle 60 Sekunden loggen, um Log-Flut zu vermeiden
|
||||||
|
static LAST_LOG: Mutex<Option<Instant>> = Mutex::new(None);
|
||||||
|
let mut last_log = LAST_LOG.lock().unwrap();
|
||||||
|
let should_log = last_log
|
||||||
|
.map(|t| t.elapsed().as_secs() >= 60)
|
||||||
|
.unwrap_or(true);
|
||||||
|
if should_log {
|
||||||
|
eprintln!("[TransportWorker] Keine angekommenen Transporte gefunden");
|
||||||
|
*last_log = Some(Instant::now());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut result = Vec::with_capacity(rows.len());
|
let mut result = Vec::with_capacity(rows.len());
|
||||||
for row in rows {
|
for row in rows {
|
||||||
let id = parse_i32(&row, "id", -1);
|
let id = parse_i32(&row, "id", -1);
|
||||||
|
|||||||
@@ -5,39 +5,12 @@ use std::sync::Arc;
|
|||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use super::base::{BaseWorker, Worker, WorkerState};
|
use super::base::{BaseWorker, Worker, WorkerState};
|
||||||
|
use crate::worker::sql::QUERY_UPDATE_WEATHER;
|
||||||
|
|
||||||
pub struct WeatherWorker {
|
pub struct WeatherWorker {
|
||||||
base: BaseWorker,
|
base: BaseWorker,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query zum Aktualisieren des Wetters für alle Regionen
|
|
||||||
// Wählt für jede Region ein zufälliges Wetter aus allen verfügbaren Wettertypen aus
|
|
||||||
// Wichtig: Jede Region bekommt ein individuelles, zufälliges Wetter
|
|
||||||
const QUERY_UPDATE_WEATHER: &str = r#"
|
|
||||||
WITH all_regions AS (
|
|
||||||
SELECT DISTINCT r.id AS region_id
|
|
||||||
FROM falukant_data.region r
|
|
||||||
JOIN falukant_type.region tr ON r.region_type_id = tr.id
|
|
||||||
WHERE tr.label_tr = 'city'
|
|
||||||
),
|
|
||||||
random_weather AS (
|
|
||||||
SELECT
|
|
||||||
ar.region_id,
|
|
||||||
(
|
|
||||||
SELECT wt.id
|
|
||||||
FROM falukant_type.weather wt
|
|
||||||
ORDER BY RANDOM()
|
|
||||||
LIMIT 1
|
|
||||||
) AS weather_type_id
|
|
||||||
FROM all_regions ar
|
|
||||||
)
|
|
||||||
INSERT INTO falukant_data.weather (region_id, weather_type_id)
|
|
||||||
SELECT rw.region_id, rw.weather_type_id
|
|
||||||
FROM random_weather rw
|
|
||||||
ON CONFLICT (region_id)
|
|
||||||
DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id;
|
|
||||||
"#;
|
|
||||||
|
|
||||||
impl WeatherWorker {
|
impl WeatherWorker {
|
||||||
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
|
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
|||||||
Reference in New Issue
Block a user