Add WebSocket logging functionality: Implemented in-memory logging for WebSocket events, allowing retrieval of logs for the last 24 hours. Enhanced connection handling to log message direction and user information.

This commit is contained in:
Torsten Schulz (local)
2025-11-22 09:44:44 +01:00
parent f9d869ee23
commit 071c05629e

View File

@@ -2,7 +2,7 @@ use crate::db::ConnectionPool;
use crate::message_broker::MessageBroker; use crate::message_broker::MessageBroker;
use crate::worker::Worker; use crate::worker::Worker;
use futures_util::{FutureExt, SinkExt, StreamExt}; use futures_util::{FutureExt, SinkExt, StreamExt};
use serde::Deserialize; use serde::{Deserialize, Serialize};
use serde_json::Value as Json; use serde_json::Value as Json;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
@@ -10,6 +10,7 @@ use std::io::BufReader;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::runtime::{Builder, Runtime}; use tokio::runtime::{Builder, Runtime};
@@ -47,6 +48,74 @@ struct ConnectionRegistry {
by_user: HashMap<String, usize>, by_user: HashMap<String, usize>,
} }
/// Eintrag für das WebSocket-Log, abrufbar über ein eigenes Event.
#[derive(Debug, Clone, Serialize)]
struct WebSocketLogEntry {
timestamp: u64, // Sekunden seit UNIX_EPOCH
direction: String, // z.B. "broker->client"
peer: String, // "ip:port"
conn_user: Option<String>, // per setUserId gesetzte User-ID
target_user: Option<String>, // user_id aus der Nachricht (falls vorhanden)
event: Option<String>, // event-Feld aus der Nachricht (falls JSON)
}
#[derive(Default)]
struct WebSocketLog {
entries: Vec<WebSocketLogEntry>,
}
const WS_LOG_MAX_ENTRIES: usize = 50_000;
async fn append_ws_log(
log: &Arc<Mutex<WebSocketLog>>,
direction: &str,
peer_addr: &SocketAddr,
conn_user: &Option<String>,
msg: &str,
) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let (event, target_user) = if let Ok(json) = serde_json::from_str::<Json>(msg) {
let event = json
.get("event")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let target_user = json
.get("user_id")
.and_then(|v| {
if let Some(s) = v.as_str() {
Some(s.to_string())
} else if let Some(n) = v.as_i64() {
Some(n.to_string())
} else {
None
}
});
(event, target_user)
} else {
(None, None)
};
let entry = WebSocketLogEntry {
timestamp: now,
direction: direction.to_string(),
peer: peer_addr.to_string(),
conn_user: conn_user.clone(),
target_user,
event,
};
let mut guard = log.lock().await;
guard.entries.push(entry);
if guard.entries.len() > WS_LOG_MAX_ENTRIES {
let overflow = guard.entries.len() - WS_LOG_MAX_ENTRIES;
guard.entries.drain(0..overflow);
}
}
fn create_tls_acceptor( fn create_tls_acceptor(
cert_path: Option<&str>, cert_path: Option<&str>,
key_path: Option<&str>, key_path: Option<&str>,
@@ -148,6 +217,8 @@ impl WebSocketServer {
// Gemeinsame Registry für alle Verbindungen // Gemeinsame Registry für alle Verbindungen
let registry = Arc::new(Mutex::new(ConnectionRegistry::default())); let registry = Arc::new(Mutex::new(ConnectionRegistry::default()));
// Gemeinsames WebSocket-Log (für max. 24h, begrenzt über WS_LOG_MAX_ENTRIES)
let ws_log = Arc::new(Mutex::new(WebSocketLog::default()));
// Broadcast-Kanal für Broker-Nachrichten // Broadcast-Kanal für Broker-Nachrichten
let (tx, _) = broadcast::channel::<String>(1024); let (tx, _) = broadcast::channel::<String>(1024);
@@ -187,6 +258,7 @@ impl WebSocketServer {
tx, tx,
self.pool.clone(), self.pool.clone(),
registry, registry,
ws_log,
tls_acceptor, tls_acceptor,
)); ));
@@ -218,6 +290,7 @@ async fn run_accept_loop(
tx: broadcast::Sender<String>, tx: broadcast::Sender<String>,
_pool: ConnectionPool, _pool: ConnectionPool,
registry: Arc<Mutex<ConnectionRegistry>>, registry: Arc<Mutex<ConnectionRegistry>>,
ws_log: Arc<Mutex<WebSocketLog>>,
tls_acceptor: Option<TlsAcceptor>, tls_acceptor: Option<TlsAcceptor>,
) { ) {
let listener = match TcpListener::bind(&addr).await { let listener = match TcpListener::bind(&addr).await {
@@ -243,13 +316,15 @@ async fn run_accept_loop(
let peer_addr = peer; let peer_addr = peer;
let rx = tx.subscribe(); let rx = tx.subscribe();
let registry_clone = registry.clone(); let registry_clone = registry.clone();
let ws_log_clone = ws_log.clone();
let tls_acceptor_clone = tls_acceptor.clone(); let tls_acceptor_clone = tls_acceptor.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Some(acc) = tls_acceptor_clone { if let Some(acc) = tls_acceptor_clone {
match acc.accept(stream).await { match acc.accept(stream).await {
Ok(tls_stream) => { Ok(tls_stream) => {
handle_connection(tls_stream, peer_addr, rx, registry_clone).await handle_connection(tls_stream, peer_addr, rx, registry_clone, ws_log_clone)
.await
} }
Err(err) => { Err(err) => {
eprintln!( eprintln!(
@@ -258,7 +333,7 @@ async fn run_accept_loop(
} }
} }
} else { } else {
handle_connection(stream, peer_addr, rx, registry_clone).await; handle_connection(stream, peer_addr, rx, registry_clone, ws_log_clone).await;
} }
}); });
} }
@@ -269,6 +344,7 @@ async fn handle_connection<S>(
peer_addr: SocketAddr, peer_addr: SocketAddr,
mut broker_rx: broadcast::Receiver<String>, mut broker_rx: broadcast::Receiver<String>,
registry: Arc<Mutex<ConnectionRegistry>>, registry: Arc<Mutex<ConnectionRegistry>>,
ws_log: Arc<Mutex<WebSocketLog>>,
) where ) where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
@@ -300,6 +376,8 @@ async fn handle_connection<S>(
let user_id_for_broker = user_id.clone(); let user_id_for_broker = user_id.clone();
let registry_for_incoming = registry.clone(); let registry_for_incoming = registry.clone();
let client_tx_incoming = client_tx.clone(); let client_tx_incoming = client_tx.clone();
let ws_log_for_incoming = ws_log.clone();
let ws_log_for_outgoing = ws_log.clone();
// Eingehende Nachrichten vom Client // Eingehende Nachrichten vom Client
let incoming = async move { let incoming = async move {
@@ -343,6 +421,38 @@ async fn handle_connection<S>(
}; };
let _ = client_tx_incoming.send(snapshot).await; let _ = client_tx_incoming.send(snapshot).await;
} }
"getWebsocketLog" => {
// Liefert die letzten 24h (oder weniger) aus dem In-Memory-Log.
let entries = {
let guard = ws_log_for_incoming.lock().await;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let cutoff = now.saturating_sub(24 * 3600);
let mut filtered: Vec<WebSocketLogEntry> = guard
.entries
.iter()
.filter(|e| e.timestamp >= cutoff)
.cloned()
.collect();
// Zur Sicherheit begrenzen wir die Antwortgröße
if filtered.len() > 1000 {
let len = filtered.len();
filtered = filtered[len - 1000..].to_vec();
}
filtered
};
let payload = serde_json::json!({
"event": "getWebsocketLogResponse",
"entries": entries,
})
.to_string();
let _ = client_tx_incoming.send(payload).await;
}
_ => { _ => {
// Unbekannte Events ignorieren // Unbekannte Events ignorieren
} }
@@ -402,6 +512,13 @@ async fn handle_connection<S>(
} }
} }
// Logging für den 24h-Überblick, was an welchen User/Peer geht
let conn_user = {
let guard = user_id_for_broker.lock().await;
guard.clone()
};
append_ws_log(&ws_log_for_outgoing, "broker->client", &peer_addr, &conn_user, &msg).await;
if let Err(e) = ws_sender.send(Message::Text(msg)).await { if let Err(e) = ws_sender.send(Message::Text(msg)).await {
eprintln!( eprintln!(
"[WebSocketServer] Fehler beim Senden an {}: {}", "[WebSocketServer] Fehler beim Senden an {}: {}",
@@ -414,6 +531,12 @@ async fn handle_connection<S>(
client_msg = client_rx.recv() => { client_msg = client_rx.recv() => {
match client_msg { match client_msg {
Some(msg) => { Some(msg) => {
let conn_user = {
let guard = user_id_for_broker.lock().await;
guard.clone()
};
append_ws_log(&ws_log_for_outgoing, "local->client", &peer_addr, &conn_user, &msg).await;
if let Err(e) = ws_sender.send(Message::Text(msg)).await { if let Err(e) = ws_sender.send(Message::Text(msg)).await {
eprintln!( eprintln!(
"[WebSocketServer] Fehler beim Senden an {}: {}", "[WebSocketServer] Fehler beim Senden an {}: {}",