use crate::db::{ConnectionPool, DbError}; use crate::message_broker::MessageBroker; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; pub struct WeatherWorker { base: BaseWorker, } // Query zum Aktualisieren des Wetters für alle Regionen // Wählt für jede Region ein zufälliges Wetter aus allen verfügbaren Wettertypen aus // Wichtig: Jede Region bekommt ein individuelles, zufälliges Wetter. Die vorherige // Variante konnte vom Planner unter Umständen die Zufalls-Subquery nur einmal // auswerten; mit LATERAL wird die Zufallsauswahl pro Region garantiert ausgeführt. const QUERY_UPDATE_WEATHER: &str = r#" WITH all_regions AS ( SELECT DISTINCT r.id AS region_id FROM falukant_data.region r JOIN falukant_type.region tr ON r.region_type_id = tr.id WHERE tr.label_tr = 'city' ), random_weather AS ( SELECT ar.region_id, wt.id AS weather_type_id FROM all_regions ar CROSS JOIN LATERAL ( SELECT wt.id FROM falukant_type.weather wt ORDER BY RANDOM() LIMIT 1 ) wt ) INSERT INTO falukant_data.weather (region_id, weather_type_id) SELECT rw.region_id, rw.weather_type_id FROM random_weather rw ON CONFLICT (region_id) DO UPDATE SET weather_type_id = EXCLUDED.weather_type_id; "#; impl WeatherWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("WeatherWorker", pool, broker), } } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { let mut last_weather_update = None; loop { if !state.running_worker.load(Ordering::Relaxed) { break; } let now = Instant::now(); // Wetter alle 30 Minuten aktualisieren if should_run_interval(last_weather_update, now, Duration::from_secs(30 * 60)) { if let Err(err) = Self::update_weather_inner(&pool, &broker) { eprintln!("[WeatherWorker] Fehler beim Aktualisieren des Wetters: {err}"); } last_weather_update = Some(now); } // 60-Sekunden-Wartezeit in kurze Scheiben aufteilen, damit ein Shutdown // (running_worker = false) schnell greift. const SLICE_MS: u64 = 500; let total_ms = 60_000; let mut slept = 0; while slept < total_ms { if !state.running_worker.load(Ordering::Relaxed) { break; } let remaining = total_ms - slept; let slice = SLICE_MS.min(remaining); std::thread::sleep(Duration::from_millis(slice)); slept += slice; } } } fn update_weather_inner( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("update_weather", QUERY_UPDATE_WEATHER)?; let updated_rows = conn.execute("update_weather", &[])?; eprintln!( "[WeatherWorker] Wetter aktualisiert. {} Regionen betroffen.", updated_rows.len() ); // Benachrichtige alle Clients über Wetteränderungen let message = r#"{"event":"weather_updated"}"#; broker.publish(message.to_string()); Ok(()) } } impl Worker for WeatherWorker { fn start_worker_thread(&mut self) { let pool = self.base.pool.clone(); let broker = self.base.broker.clone(); self.base .start_worker_with_loop(move |state: Arc| { Self::run_loop(pool.clone(), broker.clone(), state); }); } fn stop_worker_thread(&mut self) { self.base.stop_worker(); } fn enable_watchdog(&mut self) { self.base.start_watchdog(); } } // Hilfsfunktion zum Prüfen, ob ein Intervall abgelaufen ist fn should_run_interval(last: Option, now: Instant, interval: Duration) -> bool { match last { None => true, Some(last_time) => now.saturating_duration_since(last_time) >= interval, } }