From 071c05629eeaf65e8a8a767daf0d341bc596a845 Mon Sep 17 00:00:00 2001 From: "Torsten Schulz (local)" Date: Sat, 22 Nov 2025 09:44:44 +0100 Subject: [PATCH] Add WebSocket logging functionality: Implemented in-memory logging for WebSocket events, allowing retrieval of logs for the last 24 hours. Enhanced connection handling to log message direction and user information. --- src/websocket_server.rs | 129 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 126 insertions(+), 3 deletions(-) diff --git a/src/websocket_server.rs b/src/websocket_server.rs index 8988e8e..8ec185a 100644 --- a/src/websocket_server.rs +++ b/src/websocket_server.rs @@ -2,7 +2,7 @@ use crate::db::ConnectionPool; use crate::message_broker::MessageBroker; use crate::worker::Worker; use futures_util::{FutureExt, SinkExt, StreamExt}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_json::Value as Json; use std::collections::HashMap; use std::fs::File; @@ -10,6 +10,7 @@ use std::io::BufReader; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio::runtime::{Builder, Runtime}; @@ -47,6 +48,74 @@ struct ConnectionRegistry { by_user: HashMap, } +/// Eintrag für das WebSocket-Log, abrufbar über ein eigenes Event. +#[derive(Debug, Clone, Serialize)] +struct WebSocketLogEntry { + timestamp: u64, // Sekunden seit UNIX_EPOCH + direction: String, // z.B. "broker->client" + peer: String, // "ip:port" + conn_user: Option, // per setUserId gesetzte User-ID + target_user: Option, // user_id aus der Nachricht (falls vorhanden) + event: Option, // event-Feld aus der Nachricht (falls JSON) +} + +#[derive(Default)] +struct WebSocketLog { + entries: Vec, +} + +const WS_LOG_MAX_ENTRIES: usize = 50_000; + +async fn append_ws_log( + log: &Arc>, + direction: &str, + peer_addr: &SocketAddr, + conn_user: &Option, + msg: &str, +) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let (event, target_user) = if let Ok(json) = serde_json::from_str::(msg) { + let event = json + .get("event") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let target_user = json + .get("user_id") + .and_then(|v| { + if let Some(s) = v.as_str() { + Some(s.to_string()) + } else if let Some(n) = v.as_i64() { + Some(n.to_string()) + } else { + None + } + }); + (event, target_user) + } else { + (None, None) + }; + + let entry = WebSocketLogEntry { + timestamp: now, + direction: direction.to_string(), + peer: peer_addr.to_string(), + conn_user: conn_user.clone(), + target_user, + event, + }; + + let mut guard = log.lock().await; + guard.entries.push(entry); + if guard.entries.len() > WS_LOG_MAX_ENTRIES { + let overflow = guard.entries.len() - WS_LOG_MAX_ENTRIES; + guard.entries.drain(0..overflow); + } +} + fn create_tls_acceptor( cert_path: Option<&str>, key_path: Option<&str>, @@ -148,6 +217,8 @@ impl WebSocketServer { // Gemeinsame Registry für alle Verbindungen let registry = Arc::new(Mutex::new(ConnectionRegistry::default())); + // Gemeinsames WebSocket-Log (für max. 24h, begrenzt über WS_LOG_MAX_ENTRIES) + let ws_log = Arc::new(Mutex::new(WebSocketLog::default())); // Broadcast-Kanal für Broker-Nachrichten let (tx, _) = broadcast::channel::(1024); @@ -187,6 +258,7 @@ impl WebSocketServer { tx, self.pool.clone(), registry, + ws_log, tls_acceptor, )); @@ -218,6 +290,7 @@ async fn run_accept_loop( tx: broadcast::Sender, _pool: ConnectionPool, registry: Arc>, + ws_log: Arc>, tls_acceptor: Option, ) { let listener = match TcpListener::bind(&addr).await { @@ -243,13 +316,15 @@ async fn run_accept_loop( let peer_addr = peer; let rx = tx.subscribe(); let registry_clone = registry.clone(); + let ws_log_clone = ws_log.clone(); let tls_acceptor_clone = tls_acceptor.clone(); tokio::spawn(async move { if let Some(acc) = tls_acceptor_clone { match acc.accept(stream).await { Ok(tls_stream) => { - handle_connection(tls_stream, peer_addr, rx, registry_clone).await + handle_connection(tls_stream, peer_addr, rx, registry_clone, ws_log_clone) + .await } Err(err) => { eprintln!( @@ -258,7 +333,7 @@ async fn run_accept_loop( } } } else { - handle_connection(stream, peer_addr, rx, registry_clone).await; + handle_connection(stream, peer_addr, rx, registry_clone, ws_log_clone).await; } }); } @@ -269,6 +344,7 @@ async fn handle_connection( peer_addr: SocketAddr, mut broker_rx: broadcast::Receiver, registry: Arc>, + ws_log: Arc>, ) where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -300,6 +376,8 @@ async fn handle_connection( let user_id_for_broker = user_id.clone(); let registry_for_incoming = registry.clone(); let client_tx_incoming = client_tx.clone(); + let ws_log_for_incoming = ws_log.clone(); + let ws_log_for_outgoing = ws_log.clone(); // Eingehende Nachrichten vom Client let incoming = async move { @@ -343,6 +421,38 @@ async fn handle_connection( }; let _ = client_tx_incoming.send(snapshot).await; } + "getWebsocketLog" => { + // Liefert die letzten 24h (oder weniger) aus dem In-Memory-Log. + let entries = { + let guard = ws_log_for_incoming.lock().await; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let cutoff = now.saturating_sub(24 * 3600); + + let mut filtered: Vec = guard + .entries + .iter() + .filter(|e| e.timestamp >= cutoff) + .cloned() + .collect(); + + // Zur Sicherheit begrenzen wir die Antwortgröße + if filtered.len() > 1000 { + let len = filtered.len(); + filtered = filtered[len - 1000..].to_vec(); + } + filtered + }; + + let payload = serde_json::json!({ + "event": "getWebsocketLogResponse", + "entries": entries, + }) + .to_string(); + let _ = client_tx_incoming.send(payload).await; + } _ => { // Unbekannte Events ignorieren } @@ -402,6 +512,13 @@ async fn handle_connection( } } + // Logging für den 24h-Überblick, was an welchen User/Peer geht + let conn_user = { + let guard = user_id_for_broker.lock().await; + guard.clone() + }; + append_ws_log(&ws_log_for_outgoing, "broker->client", &peer_addr, &conn_user, &msg).await; + if let Err(e) = ws_sender.send(Message::Text(msg)).await { eprintln!( "[WebSocketServer] Fehler beim Senden an {}: {}", @@ -414,6 +531,12 @@ async fn handle_connection( client_msg = client_rx.recv() => { match client_msg { Some(msg) => { + let conn_user = { + let guard = user_id_for_broker.lock().await; + guard.clone() + }; + append_ws_log(&ws_log_for_outgoing, "local->client", &peer_addr, &conn_user, &msg).await; + if let Err(e) = ws_sender.send(Message::Text(msg)).await { eprintln!( "[WebSocketServer] Fehler beim Senden an {}: {}",