Add WeatherWorker to the worker module and update worker creation logic: Introduced WeatherWorker to manage weather-related tasks. Updated the create_workers function to include WeatherWorker alongside existing workers, ensuring comprehensive handling of various operational aspects.

This commit is contained in:
Torsten Schulz (local)
2025-12-02 12:59:40 +01:00
parent c943e57f80
commit e14272e477
3 changed files with 143 additions and 2 deletions

View File

@@ -15,7 +15,7 @@ use websocket_server::WebSocketServer;
use worker::{ use worker::{
CharacterCreationWorker, ConnectionPool, DirectorWorker, HouseWorker, PoliticsWorker, CharacterCreationWorker, ConnectionPool, DirectorWorker, HouseWorker, PoliticsWorker,
ProduceWorker, StockageManager, TransportWorker, UndergroundWorker, UserCharacterWorker, ProduceWorker, StockageManager, TransportWorker, UndergroundWorker, UserCharacterWorker,
ValueRecalculationWorker, Worker, ValueRecalculationWorker, WeatherWorker, Worker,
}; };
static KEEP_RUNNING: AtomicBool = AtomicBool::new(true); static KEEP_RUNNING: AtomicBool = AtomicBool::new(true);
@@ -140,7 +140,8 @@ fn create_workers(pool: ConnectionPool, broker: MessageBroker) -> Vec<Box<dyn Wo
Box::new(HouseWorker::new(pool.clone(), broker.clone())), Box::new(HouseWorker::new(pool.clone(), broker.clone())),
Box::new(PoliticsWorker::new(pool.clone(), broker.clone())), Box::new(PoliticsWorker::new(pool.clone(), broker.clone())),
Box::new(TransportWorker::new(pool.clone(), broker.clone())), Box::new(TransportWorker::new(pool.clone(), broker.clone())),
Box::new(UndergroundWorker::new(pool, broker)), Box::new(UndergroundWorker::new(pool.clone(), broker.clone())),
Box::new(WeatherWorker::new(pool, broker)),
] ]
} }

View File

@@ -9,6 +9,7 @@ mod underground;
mod value_recalculation; mod value_recalculation;
mod user_character; mod user_character;
mod transport; mod transport;
mod weather;
pub use base::Worker; pub use base::Worker;
pub use crate::db::ConnectionPool; pub use crate::db::ConnectionPool;
@@ -22,4 +23,5 @@ pub use underground::UndergroundWorker;
pub use value_recalculation::ValueRecalculationWorker; pub use value_recalculation::ValueRecalculationWorker;
pub use user_character::UserCharacterWorker; pub use user_character::UserCharacterWorker;
pub use transport::TransportWorker; pub use transport::TransportWorker;
pub use weather::WeatherWorker;

138
src/worker/weather.rs Normal file
View File

@@ -0,0 +1,138 @@
use crate::db::{ConnectionPool, DbError};
use crate::message_broker::MessageBroker;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::base::{BaseWorker, Worker, WorkerState};
pub struct WeatherWorker {
base: BaseWorker,
}
// Query zum Aktualisieren des Wetters für alle Regionen
// Wählt für jede Region ein zufälliges Wetter aus allen verfügbaren Wettertypen aus
const QUERY_UPDATE_WEATHER: &str = r#"
WITH all_regions AS (
SELECT DISTINCT r.id AS region_id
FROM falukant_data.region r
JOIN falukant_type.region tr ON r.region_type_id = tr.id
WHERE tr.label_tr = 'city'
),
all_weather_types AS (
SELECT id AS weather_type_id, ROW_NUMBER() OVER (ORDER BY RANDOM()) AS rn
FROM falukant_type.weather
),
random_weather AS (
SELECT
ar.region_id,
awt.weather_type_id
FROM all_regions ar
CROSS JOIN LATERAL (
SELECT weather_type_id
FROM all_weather_types
ORDER BY RANDOM()
LIMIT 1
) awt
)
INSERT INTO falukant_data.weather (region_id, weather_type_id)
SELECT rw.region_id, rw.weather_type_id
FROM random_weather rw
ON CONFLICT (region_id)
DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id;
"#;
impl WeatherWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
Self {
base: BaseWorker::new("WeatherWorker", pool, broker),
}
}
fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc<WorkerState>) {
let mut last_weather_update = None;
loop {
if !state.running_worker.load(Ordering::Relaxed) {
break;
}
let now = Instant::now();
// Wetter alle 30 Minuten aktualisieren
if should_run_interval(last_weather_update, now, Duration::from_secs(30 * 60)) {
if let Err(err) = Self::update_weather_inner(&pool, &broker) {
eprintln!("[WeatherWorker] Fehler beim Aktualisieren des Wetters: {err}");
}
last_weather_update = Some(now);
}
// 60-Sekunden-Wartezeit in kurze Scheiben aufteilen, damit ein Shutdown
// (running_worker = false) schnell greift.
const SLICE_MS: u64 = 500;
let total_ms = 60_000;
let mut slept = 0;
while slept < total_ms {
if !state.running_worker.load(Ordering::Relaxed) {
break;
}
let remaining = total_ms - slept;
let slice = SLICE_MS.min(remaining);
std::thread::sleep(Duration::from_millis(slice));
slept += slice;
}
}
}
fn update_weather_inner(
pool: &ConnectionPool,
broker: &MessageBroker,
) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("update_weather", QUERY_UPDATE_WEATHER)?;
let updated_rows = conn.execute("update_weather", &[])?;
eprintln!(
"[WeatherWorker] Wetter aktualisiert. {} Regionen betroffen.",
updated_rows.len()
);
// Benachrichtige alle Clients über Wetteränderungen
let message = r#"{"event":"weather_updated"}"#;
broker.publish(message.to_string());
Ok(())
}
}
impl Worker for WeatherWorker {
fn start_worker_thread(&mut self) {
let pool = self.base.pool.clone();
let broker = self.base.broker.clone();
self.base
.start_worker_with_loop(move |state: Arc<WorkerState>| {
Self::run_loop(pool.clone(), broker.clone(), state);
});
}
fn stop_worker_thread(&mut self) {
self.base.stop_worker();
}
fn enable_watchdog(&mut self) {
self.base.start_watchdog();
}
}
// Hilfsfunktion zum Prüfen, ob ein Intervall abgelaufen ist
fn should_run_interval(last: Option<Instant>, now: Instant, interval: Duration) -> bool {
match last {
None => true,
Some(last_time) => now.saturating_duration_since(last_time) >= interval,
}
}