diff --git a/src/main.rs b/src/main.rs index 0c9c3d0..853dce5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use websocket_server::WebSocketServer; use worker::{ CharacterCreationWorker, ConnectionPool, DirectorWorker, HouseWorker, PoliticsWorker, ProduceWorker, StockageManager, TransportWorker, UndergroundWorker, UserCharacterWorker, - ValueRecalculationWorker, Worker, + ValueRecalculationWorker, WeatherWorker, Worker, }; static KEEP_RUNNING: AtomicBool = AtomicBool::new(true); @@ -140,7 +140,8 @@ fn create_workers(pool: ConnectionPool, broker: MessageBroker) -> Vec Self { + Self { + base: BaseWorker::new("WeatherWorker", pool, broker), + } + } + + fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { + let mut last_weather_update = None; + + loop { + if !state.running_worker.load(Ordering::Relaxed) { + break; + } + + let now = Instant::now(); + + // Wetter alle 30 Minuten aktualisieren + if should_run_interval(last_weather_update, now, Duration::from_secs(30 * 60)) { + if let Err(err) = Self::update_weather_inner(&pool, &broker) { + eprintln!("[WeatherWorker] Fehler beim Aktualisieren des Wetters: {err}"); + } + last_weather_update = Some(now); + } + + // 60-Sekunden-Wartezeit in kurze Scheiben aufteilen, damit ein Shutdown + // (running_worker = false) schnell greift. + const SLICE_MS: u64 = 500; + let total_ms = 60_000; + let mut slept = 0; + while slept < total_ms { + if !state.running_worker.load(Ordering::Relaxed) { + break; + } + let remaining = total_ms - slept; + let slice = SLICE_MS.min(remaining); + std::thread::sleep(Duration::from_millis(slice)); + slept += slice; + } + } + } + + fn update_weather_inner( + pool: &ConnectionPool, + broker: &MessageBroker, + ) -> Result<(), DbError> { + let mut conn = pool + .get() + .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; + + conn.prepare("update_weather", QUERY_UPDATE_WEATHER)?; + let updated_rows = conn.execute("update_weather", &[])?; + + eprintln!( + "[WeatherWorker] Wetter aktualisiert. {} Regionen betroffen.", + updated_rows.len() + ); + + // Benachrichtige alle Clients über Wetteränderungen + let message = r#"{"event":"weather_updated"}"#; + broker.publish(message.to_string()); + + Ok(()) + } +} + +impl Worker for WeatherWorker { + fn start_worker_thread(&mut self) { + let pool = self.base.pool.clone(); + let broker = self.base.broker.clone(); + + self.base + .start_worker_with_loop(move |state: Arc| { + Self::run_loop(pool.clone(), broker.clone(), state); + }); + } + + fn stop_worker_thread(&mut self) { + self.base.stop_worker(); + } + + fn enable_watchdog(&mut self) { + self.base.start_watchdog(); + } +} + +// Hilfsfunktion zum Prüfen, ob ein Intervall abgelaufen ist +fn should_run_interval(last: Option, now: Instant, interval: Duration) -> bool { + match last { + None => true, + Some(last_time) => now.saturating_duration_since(last_time) >= interval, + } +} +