use crate::db::ConnectionPool; use crate::message_broker::MessageBroker; use crate::worker::{get_worker_runtime_snapshot, Worker, WorkerRuntimeSnapshot}; use futures_util::{FutureExt, SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use serde_json::Value as Json; use std::collections::HashMap; use std::fs::File; use std::io::BufReader; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio::runtime::{Builder, Runtime}; use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::time::{interval, Duration as TokioDuration}; use tokio_rustls::rustls::{self, ServerConfig}; use tokio_rustls::TlsAcceptor; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::accept_async; use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs1KeyDer, PrivatePkcs8KeyDer}; /// Einfacher WebSocket-Server auf Basis von Tokio + tokio-tungstenite. /// /// Unterstützt: /// - `setUserId`-Event vom Client (`{"event":"setUserId","data":{"userId":"..."}}`) /// - Versenden von Broker-Nachrichten mit `user_id`-Feld an passende Verbindungen /// - Broadcasting von Nachrichten ohne `user_id` an alle pub struct WebSocketServer { port: u16, pool: ConnectionPool, broker: MessageBroker, use_ssl: bool, cert_path: Option, key_path: Option, workers: Vec<*const dyn Worker>, running: Arc, runtime: Option, } /// Einfache Registry, um Verbindungsstatistiken für `getConnections` zu liefern. #[derive(Default)] struct ConnectionRegistry { total: usize, unauthenticated: usize, by_user: HashMap, } /// Eintrag für das WebSocket-Log, abrufbar über ein eigenes Event. #[derive(Debug, Clone, Serialize)] struct WebSocketLogEntry { timestamp: u64, // Sekunden seit UNIX_EPOCH direction: String, // z.B. "broker->client" peer: String, // "ip:port" conn_user: Option, // per setUserId gesetzte User-ID target_user: Option, // user_id aus der Nachricht (falls vorhanden) event: Option, // event-Feld aus der Nachricht (falls JSON) } #[derive(Debug, Clone, Serialize)] struct WorkerTaskSchedule { task: String, cadence_seconds: u64, cadence_label: String, next_run_latest_ts: u64, next_run_latest_in_seconds: u64, } #[derive(Debug, Clone, Serialize)] struct WorkerSchedule { worker: String, tasks: Vec, } #[derive(Default)] struct WebSocketLog { entries: Vec, } const WS_LOG_MAX_ENTRIES: usize = 50_000; const RIGHT_ADMIN: &str = "admin"; const RIGHT_MAINADMIN: &str = "mainadmin"; const RIGHT_WORKER_SCHEDULE: &str = "worker_schedule_read"; async fn append_ws_log( log: &Arc>, direction: &str, peer_addr: &SocketAddr, conn_user: &Option, msg: &str, ) { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); let (event, target_user) = if let Ok(json) = serde_json::from_str::(msg) { let event = json .get("event") .and_then(|v| v.as_str()) .map(|s| s.to_string()); let target_user = json .get("user_id") .and_then(|v| v.as_str().map(|s| s.to_string()).or_else(|| v.as_i64().map(|n| n.to_string()))); (event, target_user) } else { (None, None) }; let entry = WebSocketLogEntry { timestamp: now, direction: direction.to_string(), peer: peer_addr.to_string(), conn_user: conn_user.clone(), target_user, event, }; let mut guard = log.lock().await; guard.entries.push(entry); if guard.entries.len() > WS_LOG_MAX_ENTRIES { let overflow = guard.entries.len() - WS_LOG_MAX_ENTRIES; guard.entries.drain(0..overflow); } } fn create_tls_acceptor( cert_path: Option<&str>, key_path: Option<&str>, ) -> Result> { let cert_path = cert_path.ok_or("SSL aktiviert, aber kein Zertifikatspfad gesetzt")?; let key_path = key_path.ok_or("SSL aktiviert, aber kein Key-Pfad gesetzt")?; let cert_file = File::open(cert_path)?; let mut cert_reader = BufReader::new(cert_file); let mut cert_chain: Vec> = Vec::new(); for cert_result in certs(&mut cert_reader) { let cert: CertificateDer<'static> = cert_result?; cert_chain.push(cert); } if cert_chain.is_empty() { return Err("Zertifikatsdatei enthält keine Zertifikate".into()); } let key_file = File::open(key_path)?; let mut key_reader = BufReader::new(key_file); // Versuche zuerst PKCS8, dann ggf. RSA-Key let mut keys: Vec> = pkcs8_private_keys(&mut key_reader) .map(|res: Result, _>| res.map(PrivateKeyDer::Pkcs8)) .collect::>()?; if keys.is_empty() { // Leser zurücksetzen und RSA-Keys versuchen let key_file = File::open(key_path)?; let mut key_reader = BufReader::new(key_file); keys = rsa_private_keys(&mut key_reader) .map(|res: Result, _>| res.map(PrivateKeyDer::Pkcs1)) .collect::>()?; } if keys.is_empty() { return Err("Key-Datei enthält keinen privaten Schlüssel (PKCS8 oder RSA)".into()); } let private_key = keys.remove(0); let config = ServerConfig::builder() .with_no_client_auth() .with_single_cert(cert_chain, private_key)?; Ok(TlsAcceptor::from(Arc::new(config))) } impl WebSocketServer { pub fn new( port: u16, pool: ConnectionPool, broker: MessageBroker, use_ssl: bool, cert_path: Option, key_path: Option, ) -> Self { Self { port, pool, broker, use_ssl, cert_path, key_path, workers: Vec::new(), running: Arc::new(AtomicBool::new(false)), runtime: None, } } pub fn set_workers(&mut self, workers: &[Box]) { self.workers.clear(); for w in workers { self.workers.push(&**w as *const dyn Worker); } } pub fn run(&mut self) { if self.running.swap(true, Ordering::SeqCst) { eprintln!("[WebSocketServer] Läuft bereits."); return; } if self.use_ssl { println!( "Starte WebSocket-Server auf Port {} mit SSL (cert: {:?}, key: {:?})", self.port, self.cert_path, self.key_path ); // Hinweis: SSL-Unterstützung ist noch nicht implementiert. } else { println!("Starte WebSocket-Server auf Port {} (ohne SSL)", self.port); } let addr = format!("0.0.0.0:{}", self.port); let running_flag = self.running.clone(); let broker = self.broker.clone(); // Gemeinsame Registry für alle Verbindungen let registry = Arc::new(Mutex::new(ConnectionRegistry::default())); // Gemeinsames WebSocket-Log (für max. 24h, begrenzt über WS_LOG_MAX_ENTRIES) let ws_log = Arc::new(Mutex::new(WebSocketLog::default())); // Broadcast-Kanal für Broker-Nachrichten let (tx, _) = broadcast::channel::(1024); let tx_clone = tx.clone(); // Broker-Subscription: jede gepublishte Nachricht geht in den Broadcast-Kanal broker.subscribe(move |msg: String| { let _ = tx_clone.send(msg); }); // Optionalen TLS-Akzeptor laden, falls SSL aktiviert ist let tls_acceptor = if self.use_ssl { match create_tls_acceptor( self.cert_path.as_deref(), self.key_path.as_deref(), ) { Ok(acc) => Some(acc), Err(err) => { eprintln!( "[WebSocketServer] TLS-Initialisierung fehlgeschlagen, starte ohne SSL: {err}" ); None } } } else { None }; let rt = Builder::new_multi_thread() .enable_all() .build() .expect("Tokio Runtime konnte nicht erstellt werden"); rt.spawn(run_accept_loop( addr, running_flag, tx, self.pool.clone(), registry, ws_log, tls_acceptor, )); self.runtime = Some(rt); } pub fn stop(&mut self) { if !self.running.swap(false, Ordering::SeqCst) { return; } println!("WebSocket-Server wird gestoppt."); if let Some(rt) = self.runtime.take() { rt.shutdown_background(); } } } #[derive(Debug, Deserialize)] struct IncomingMessage { #[serde(default)] event: String, #[serde(default)] data: Json, } async fn run_accept_loop( addr: String, running: Arc, tx: broadcast::Sender, pool: ConnectionPool, registry: Arc>, ws_log: Arc>, tls_acceptor: Option, ) { let listener = match TcpListener::bind(&addr).await { Ok(l) => l, Err(e) => { eprintln!("[WebSocketServer] Fehler beim Binden an {}: {}", addr, e); running.store(false, Ordering::SeqCst); return; } }; println!("[WebSocketServer] Lauscht auf {}", addr); while running.load(Ordering::SeqCst) { let (stream, peer) = match listener.accept().await { Ok(v) => v, Err(e) => { eprintln!("[WebSocketServer] accept() fehlgeschlagen: {}", e); continue; } }; let peer_addr = peer; let rx = tx.subscribe(); let registry_clone = registry.clone(); let ws_log_clone = ws_log.clone(); let tls_acceptor_clone = tls_acceptor.clone(); let pool_clone = pool.clone(); tokio::spawn(async move { if let Some(acc) = tls_acceptor_clone { match acc.accept(stream).await { Ok(tls_stream) => { handle_connection( tls_stream, peer_addr, rx, pool_clone, registry_clone, ws_log_clone, ) .await } Err(err) => { eprintln!( "[WebSocketServer] TLS-Handshake fehlgeschlagen ({peer_addr}): {err}" ); } } } else { handle_connection(stream, peer_addr, rx, pool_clone, registry_clone, ws_log_clone) .await; } }); } } async fn handle_connection( stream: S, peer_addr: SocketAddr, mut broker_rx: broadcast::Receiver, pool: ConnectionPool, registry: Arc>, ws_log: Arc>, ) where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { let ws_stream = match accept_async(stream).await { Ok(ws) => ws, Err(e) => { eprintln!("[WebSocketServer] WebSocket-Handshake fehlgeschlagen ({peer_addr}): {e}"); return; } }; println!("[WebSocketServer] Neue Verbindung von {}", peer_addr); let (mut ws_sender, mut ws_receiver) = ws_stream.split(); // Kanal für Antworten direkt an diesen Client (z.B. getConnections) let (client_tx, mut client_rx) = mpsc::channel::(32); // Neue Verbindung in der Registry zählen (zunächst als unauthentifiziert) { let mut reg = registry.lock().await; reg.total += 1; reg.unauthenticated += 1; } // user_id der Verbindung (nach setUserId) let user_id = Arc::new(tokio::sync::Mutex::new(Option::::None)); let user_id_for_incoming = user_id.clone(); let user_id_for_broker = user_id.clone(); let pool_for_incoming = pool.clone(); let registry_for_incoming = registry.clone(); let client_tx_incoming = client_tx.clone(); let ws_log_for_incoming = ws_log.clone(); let ws_log_for_outgoing = ws_log.clone(); // Eingehende Nachrichten vom Client let incoming = async move { while let Some(msg) = ws_receiver.next().await { match msg { Ok(Message::Text(txt)) => { if let Ok(parsed) = serde_json::from_str::(&txt) { match parsed.event.as_str() { "setUserId" => { if let Some(uid) = parsed.data.get("userId").and_then(|v| v.as_str()) { { // Registry aktualisieren: von unauthentifiziert -> Nutzer let mut reg = registry_for_incoming.lock().await; if reg.unauthenticated > 0 { reg.unauthenticated -= 1; } *reg.by_user.entry(uid.to_string()).or_insert(0) += 1; } let mut guard = user_id_for_incoming.lock().await; *guard = Some(uid.to_string()); println!( "[WebSocketServer] User-ID gesetzt für {}: {}", peer_addr, uid ); } } "getConnections" => { // Einfache Übersicht über aktuelle Verbindungen zurückgeben. let snapshot = { let reg = registry_for_incoming.lock().await; serde_json::json!({ "event": "getConnectionsResponse", "total": reg.total, "unauthenticated": reg.unauthenticated, "users": reg.by_user, }) .to_string() }; let _ = client_tx_incoming.send(snapshot).await; } "getWebsocketLog" => { // Liefert die letzten 24h (oder weniger) aus dem In-Memory-Log. let entries = { let guard = ws_log_for_incoming.lock().await; let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); let cutoff = now.saturating_sub(24 * 3600); let mut filtered: Vec = guard .entries .iter() .filter(|e| e.timestamp >= cutoff) .cloned() .collect(); // Zur Sicherheit begrenzen wir die Antwortgröße if filtered.len() > 1000 { let len = filtered.len(); filtered = filtered[len - 1000..].to_vec(); } filtered }; let payload = serde_json::json!({ "event": "getWebsocketLogResponse", "entries": entries, }) .to_string(); let _ = client_tx_incoming.send(payload).await; } "getWorkerSchedules" => { let uid_opt = { let guard = user_id_for_incoming.lock().await; guard.clone() }; let allowed = uid_opt .as_deref() .map(|uid| user_can_read_worker_schedules(&pool_for_incoming, uid)) .unwrap_or(false); if !allowed { let payload = serde_json::json!({ "event": "getWorkerSchedulesResponse", "ok": false, "error": "forbidden" }) .to_string(); let _ = client_tx_incoming.send(payload).await; continue; } let now_secs = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); let schedules = build_worker_schedule_overview(now_secs); let payload = serde_json::json!({ "event": "getWorkerSchedulesResponse", "ok": true, "generated_at": now_secs, "schedules": schedules }) .to_string(); let _ = client_tx_incoming.send(payload).await; } "getWorkerSchedulesDetailed" => { let uid_opt = { let guard = user_id_for_incoming.lock().await; guard.clone() }; let allowed = uid_opt .as_deref() .map(|uid| user_can_read_worker_schedules(&pool_for_incoming, uid)) .unwrap_or(false); if !allowed { let payload = serde_json::json!({ "event": "getWorkerSchedulesDetailedResponse", "ok": false, "error": "forbidden" }) .to_string(); let _ = client_tx_incoming.send(payload).await; continue; } let now_secs = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); let schedules = build_worker_schedule_overview(now_secs); let runtime = get_worker_runtime_snapshot(); let detailed = build_worker_schedule_detailed(schedules, runtime); let payload = serde_json::json!({ "event": "getWorkerSchedulesDetailedResponse", "ok": true, "generated_at": now_secs, "schedules": detailed }) .to_string(); let _ = client_tx_incoming.send(payload).await; } _ => { // Unbekannte Events ignorieren } } } } Ok(Message::Ping(_)) => { // Ping wird aktuell nur geloggt/ignoriert; optional könnte man hier ein eigenes // Ping/Pong-Handling ergänzen. } Ok(Message::Close(_)) => break, Err(e) => { eprintln!("[WebSocketServer] Fehler bei Nachricht von {peer_addr}: {e}"); break; } _ => {} } } }; // Broker-Nachrichten an den Client + periodische Ping-Frames als Keepalive let outgoing = async move { // Regelmäßiges Ping, um inaktiven Verbindungen ein Lebenszeichen zu senden // und Timeouts auf dem Weg (Proxy/Loadbalancer) zu vermeiden. let mut ping_interval = interval(TokioDuration::from_secs(240)); loop { tokio::select! { // Nachrichten aus dem MessageBroker broker_msg = broker_rx.recv() => { let msg = match broker_msg { Ok(m) => m, Err(_) => break, }; // Filter nach user_id, falls gesetzt und numerisch interpretierbar. // Historisch wurde hier der Falukant-User (numerisch) verwendet. // Wenn die gesetzte User-ID kein Integer ist (z.B. Benutzername), // wird *nicht* gefiltert und alle Nachrichten durchgelassen – // das Frontend muss dann selbst selektieren. let target_user = { let guard = user_id_for_broker.lock().await; guard.clone() }; if let Some(uid) = target_user.clone() { // Versuche, die user_id als numerisch zu interpretieren match uid.parse::() { Ok(numeric_uid) => { // Numerische user_id: Filtere explizit nach dieser ID if let Ok(json) = serde_json::from_str::(&msg) { let matches_user = json .get("user_id") .and_then(|v| { if let Some(s) = v.as_str() { s.parse::().ok() } else { v.as_i64() } }) .map(|v| v == numeric_uid) .unwrap_or(false); if !matches_user { continue; } } } Err(_) => { // Nicht-numerische user_id: Explizit alle Nachrichten durchlassen // (keine Filterung, wie im Kommentar dokumentiert) // Dies ermöglicht es dem Frontend, selbst zu filtern } } } // Logging für den 24h-Überblick, was an welchen User/Peer geht let conn_user = { let guard = user_id_for_broker.lock().await; guard.clone() }; append_ws_log(&ws_log_for_outgoing, "broker->client", &peer_addr, &conn_user, &msg).await; if let Err(e) = ws_sender.send(Message::Text(msg)).await { eprintln!( "[WebSocketServer] Fehler beim Senden an {}: {}", peer_addr, e ); break; } } // Antworten aus der Verbindung selbst (z.B. getConnections) client_msg = client_rx.recv() => { match client_msg { Some(msg) => { let conn_user = { let guard = user_id_for_broker.lock().await; guard.clone() }; append_ws_log(&ws_log_for_outgoing, "local->client", &peer_addr, &conn_user, &msg).await; if let Err(e) = ws_sender.send(Message::Text(msg)).await { eprintln!( "[WebSocketServer] Fehler beim Senden an {}: {}", peer_addr, e ); break; } } None => { // Kanal wurde geschlossen break; } } } // Periodisches Ping an den Client _ = ping_interval.tick() => { if let Err(e) = ws_sender.send(Message::Ping(Vec::new())).await { eprintln!( "[WebSocketServer] Fehler beim Senden von Ping an {}: {}", peer_addr, e ); break; } } } } }; futures_util::future::select(incoming.boxed(), outgoing.boxed()).await; // Verbindung aus der Registry entfernen let final_uid = { let guard = user_id.lock().await; guard.clone() }; { let mut reg = registry.lock().await; if reg.total > 0 { reg.total -= 1; } if let Some(uid) = final_uid { if let Some(count) = reg.by_user.get_mut(&uid) { if *count > 0 { *count -= 1; } if *count == 0 { reg.by_user.remove(&uid); } } } else if reg.unauthenticated > 0 { reg.unauthenticated -= 1; } } println!("[WebSocketServer] Verbindung geschlossen: {}", peer_addr); } fn user_can_read_worker_schedules(pool: &ConnectionPool, user_id_raw: &str) -> bool { let mut conn = match pool.get() { Ok(c) => c, Err(_) => return false, }; let uid = if let Ok(v) = user_id_raw.parse::() { if v > 0 { v } else { return false; } } else { // Fallback: setUserId kann auch username/hashed_id aus dem Community-User sein. let resolve_sql = r#" SELECT id FROM community."user" WHERE LOWER(username) = LOWER($1::text) OR LOWER(COALESCE(hashed_id, '')) = LOWER($1::text) LIMIT 1; "#; if conn.prepare("ws_resolve_community_user_id", resolve_sql).is_err() { return false; } let rows = match conn.execute("ws_resolve_community_user_id", &[&user_id_raw]) { Ok(r) => r, Err(_) => return false, }; let Some(row) = rows.first() else { return false; }; row.get("id") .and_then(|v| v.parse::().ok()) .filter(|v| *v > 0) .unwrap_or(-1) }; if uid <= 0 { return false; } let sql = r#" SELECT EXISTS ( SELECT 1 FROM community.user_right ur JOIN "type".user_right tr ON tr.id = ur.right_type_id WHERE ur.user_id = $1::int AND ( LOWER(TRIM(COALESCE(tr.title, ''))) = $2::text OR LOWER(TRIM(COALESCE(tr.title, ''))) = $3::text OR LOWER(TRIM(COALESCE(tr.title, ''))) = $4::text OR LOWER(COALESCE(tr.title, '')) LIKE '%admin%' ) ) AS allowed_direct, EXISTS ( SELECT 1 FROM falukant_data.falukant_user fu JOIN community.user_right ur ON ur.user_id = fu.user_id JOIN "type".user_right tr ON tr.id = ur.right_type_id WHERE fu.id = $1::int AND ( LOWER(TRIM(COALESCE(tr.title, ''))) = $2::text OR LOWER(TRIM(COALESCE(tr.title, ''))) = $3::text OR LOWER(TRIM(COALESCE(tr.title, ''))) = $4::text OR LOWER(COALESCE(tr.title, '')) LIKE '%admin%' ) ) AS allowed_falukant; "#; if conn.prepare("ws_can_read_worker_schedules", sql).is_err() { return false; } let admin = RIGHT_ADMIN.to_string(); let allowed_right = RIGHT_WORKER_SCHEDULE.to_string(); let mainadmin = RIGHT_MAINADMIN.to_string(); let rows = match conn.execute( "ws_can_read_worker_schedules", &[&uid, &admin, &allowed_right, &mainadmin], ) { Ok(r) => r, Err(_) => return false, }; let Some(row) = rows.first() else { return false; }; let direct = row .get("allowed_direct") .map(|v| v == "true" || v == "t" || v == "1") .unwrap_or(false); let falukant = row .get("allowed_falukant") .map(|v| v == "true" || v == "t" || v == "1") .unwrap_or(false); direct || falukant } fn build_worker_schedule_overview(now_secs: u64) -> Vec { fn task(name: &str, cadence_seconds: u64, now_secs: u64) -> WorkerTaskSchedule { WorkerTaskSchedule { task: name.to_string(), cadence_seconds, cadence_label: format!("{}s interval", cadence_seconds), next_run_latest_ts: now_secs.saturating_add(cadence_seconds), next_run_latest_in_seconds: cadence_seconds, } } vec![ WorkerSchedule { worker: "DirectorWorker".to_string(), tasks: vec![ task("salary_payout", 24 * 60 * 60, now_secs), task("satisfaction_or_auto_income_adjust", 24 * 60 * 60, now_secs), task("resignation_check", 24 * 60 * 60, now_secs), ], }, WorkerSchedule { worker: "ValueRecalculationWorker".to_string(), tasks: vec![ task("calculate_product_knowledge", 24 * 60 * 60, now_secs), task("calculate_regional_sell_price", 24 * 60 * 60, now_secs), task("calculate_hourly_price_recalculation", 60 * 60, now_secs), ], }, WorkerSchedule { worker: "FalukantFamilyWorker".to_string(), tasks: vec![ task("family_daily", 24 * 60 * 60, now_secs), task("family_monthly", 24 * 60 * 60, now_secs), task("lover_installment_and_servants_month_slice", 2 * 60 * 60, now_secs), ], }, WorkerSchedule { worker: "PoliticsWorker".to_string(), tasks: vec![ task("daily_politics", 24 * 60 * 60, now_secs), task("auto_approve_church_applications", 60 * 60, now_secs), ], }, WorkerSchedule { worker: "UserCharacterWorker".to_string(), tasks: vec![ task("hourly_tasks", 60 * 60, now_secs), task("hourly_pregnancies", 60 * 60, now_secs), task("daily_marriage_fertility", 24 * 60 * 60, now_secs), ], }, WorkerSchedule { worker: "HouseWorker".to_string(), tasks: vec![task("daily_house_updates", 24 * 60 * 60, now_secs)], }, ] } fn build_worker_schedule_detailed( schedules: Vec, runtime: Vec, ) -> Vec { let mut by_worker: HashMap = HashMap::new(); for r in runtime { by_worker.insert(r.worker.clone(), r); } schedules .into_iter() .map(|ws| { let rt = by_worker.get(&ws.worker); serde_json::json!({ "worker": ws.worker, "running_worker": rt.map(|r| r.running_worker).unwrap_or(false), "running_watchdog": rt.map(|r| r.running_watchdog).unwrap_or(false), "current_step": rt.map(|r| r.current_step.clone()).unwrap_or_else(|| "unknown".to_string()), "last_step_change_ts": rt.map(|r| r.last_step_change_ts).unwrap_or(0), "tasks": ws.tasks }) }) .collect() }