diff --git a/README.md b/README.md index c7fdcc3..95938ad 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,8 @@ Die Kommunikation erfolgt ueber WebSocket (Frontend) und optional TCP/Unix-Socke - In-Memory-Room-Management - Token-basierte Session-Absicherung pro Verbindung - Username-Pruefung (3-32 Zeichen, `[a-zA-Z0-9_]`) -- Verhindert Mehrfach-Login mit identischem Namen (`loggedin`) +- Reconnect-freundlich: bestehende Session wird bei erneutem `init` mit gleichem Namen ersetzt +- WebSocket Keepalive via Ping/Pong zur Erkennung verwaister Verbindungen - Optionale Allowlist fuer User via `CHAT_ALLOWED_USERS` - Optionale DB-basierte User-Pruefung via Postgres (`CHAT_DB_URL`) - Kompatible Antworttypen fuer die bestehende `chatTcpBridge.js` diff --git a/WEB_DIALOG_INTEGRATION.md b/WEB_DIALOG_INTEGRATION.md index e4d18e0..77ede15 100644 --- a/WEB_DIALOG_INTEGRATION.md +++ b/WEB_DIALOG_INTEGRATION.md @@ -47,6 +47,12 @@ Sie ist absichtlich auf das Protokoll und die Client-Integration fokussiert (ohn {"type":5,"message":"room_entered","to":"lobby"} ``` +- Erzwungenes Logout bei Session-Ersatz (Reconnect mit gleichem Namen): + +```json +{"type":5,"message":"logout","reason":"replaced_by_new_login"} +``` + - Chatnachricht: ```json @@ -183,7 +189,6 @@ function sendMessage(text) { - `missing_name`: `init` ohne `name` - `invalid_username`: Username entspricht nicht den Regeln -- `loggedin`: Username ist bereits aktiv - `user_not_allowed`: User nicht erlaubt (DB/Allowlist) - `not_initialized`: Command vor `init` - `missing_token`: Token fehlt @@ -194,7 +199,7 @@ function sendMessage(text) { ## Hinweise für Frontend-Implementierung - Token zentral im Chat-State halten -- Reconnect-Strategie einbauen (neues `init`, neues Token) +- Reconnect-Strategie einbauen (neues `init`, neues Token); alte Session wird dabei serverseitig ersetzt - Vor Senden auth-pflichtiger Commands Token prüfen - UI sollte Fehler vom Typ `error` immer sichtbar machen - Für Slash-Kommandos reicht normales `message`-Senden diff --git a/src/commands.rs b/src/commands.rs index 2eeceaf..2ef2f90 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -211,83 +211,95 @@ async fn handle_init_command( return; } - let (token, user_name, actual_room_name, old_room_name) = { - let mut guard = state.write().await; - if guard.logged_in_names.contains(&requested_user_name) - && guard - .clients - .iter() - .any(|(id, c)| *id != client_id && c.logged_in && c.user_name == requested_user_name) - { - drop(guard); - send_error(client_id, Arc::clone(&state), "loggedin").await; - return; - } - if !guard.room_meta.contains_key(&resolved_room_name) { - drop(guard); - if room_debug_enabled() { - eprintln!( - "[yourchat2][room-debug][init] client_id={client_id} resolved_room='{resolved_room_name}' vanished_before_join" - ); + let (token, user_name, actual_room_name, old_room_name) = loop { + let replacement_needed = { + let mut guard = state.write().await; + if let Some(existing_client_id) = guard.clients.iter().find_map(|(id, c)| { + if *id != client_id && c.logged_in && c.user_name == requested_user_name { + Some(*id) + } else { + None + } + }) { + Some(existing_client_id) + } else { + if !guard.room_meta.contains_key(&resolved_room_name) { + drop(guard); + if room_debug_enabled() { + eprintln!( + "[yourchat2][room-debug][init] client_id={client_id} resolved_room='{resolved_room_name}' vanished_before_join" + ); + } + send_error(client_id, Arc::clone(&state), "room_not_found").await; + return; + } + + let (old_room, old_name, was_logged_in, user_name, token, new_token) = { + let Some(client) = guard.clients.get_mut(&client_id) else { + return; + }; + let old_room = client.room.clone(); + let old_name = client.user_name.clone(); + let was_logged_in = client.logged_in; + + client.user_name = profile.display_name.clone(); + client.color = profile.color.clone(); + client.falukant_user_id = profile.falukant_user_id; + client.chat_user_id = profile.chat_user_id; + client.gender_id = profile.gender_id; + client.age = profile.age; + client.rights = profile.rights.clone(); + client.right_type_ids = profile.right_type_ids.clone(); + client.logged_in = true; + client.room = resolved_room_name.clone(); + + let mut new_token = None; + if client.token.is_none() { + let generated = Uuid::new_v4().to_string(); + client.token = Some(generated.clone()); + new_token = Some(generated); + } + + ( + old_room, + old_name, + was_logged_in, + client.user_name.clone(), + client.token.clone().unwrap_or_default(), + new_token, + ) + }; + + if let Some(generated) = new_token { + guard.tokens.insert(generated, client_id); + } + if was_logged_in { + guard.logged_in_names.remove(&old_name); + } + guard.logged_in_names.insert(user_name.clone()); + if !old_room.is_empty() { + if let Some(members) = guard.rooms.get_mut(&old_room) { + members.remove(&client_id); + } + } + guard + .rooms + .entry(resolved_room_name.clone()) + .or_default() + .insert(client_id); + break (token, user_name, resolved_room_name.clone(), old_room); } - send_error(client_id, Arc::clone(&state), "room_not_found").await; - return; - } - - let (old_room, old_name, was_logged_in, user_name, token, new_token) = { - let Some(client) = guard.clients.get_mut(&client_id) else { - return; - }; - let old_room = client.room.clone(); - let old_name = client.user_name.clone(); - let was_logged_in = client.logged_in; - - client.user_name = profile.display_name.clone(); - client.color = profile.color.clone(); - client.falukant_user_id = profile.falukant_user_id; - client.chat_user_id = profile.chat_user_id; - client.gender_id = profile.gender_id; - client.age = profile.age; - client.rights = profile.rights.clone(); - client.right_type_ids = profile.right_type_ids.clone(); - client.logged_in = true; - client.room = resolved_room_name.clone(); - - let mut new_token = None; - if client.token.is_none() { - let generated = Uuid::new_v4().to_string(); - client.token = Some(generated.clone()); - new_token = Some(generated); - } - - ( - old_room, - old_name, - was_logged_in, - client.user_name.clone(), - client.token.clone().unwrap_or_default(), - new_token, - ) }; - - if let Some(generated) = new_token { - guard.tokens.insert(generated, client_id); + if let Some(old_client_id) = replacement_needed { + state::send_to_client( + old_client_id, + Arc::clone(&state), + json!({"type":5, "message":"logout", "reason":"replaced_by_new_login"}), + ) + .await; + state::disconnect_client(old_client_id, Arc::clone(&state)).await; + continue; } - if was_logged_in { - guard.logged_in_names.remove(&old_name); - } - guard.logged_in_names.insert(user_name.clone()); - if !old_room.is_empty() { - if let Some(members) = guard.rooms.get_mut(&old_room) { - members.remove(&client_id); - } - } - guard - .rooms - .entry(resolved_room_name.clone()) - .or_default() - .insert(client_id); - (token, user_name, resolved_room_name, old_room) }; if !old_room_name.is_empty() { state::mark_room_possibly_empty(&old_room_name, Arc::clone(&state)).await; diff --git a/src/db.rs b/src/db.rs index 86e1c6b..428c425 100644 --- a/src/db.rs +++ b/src/db.rs @@ -421,31 +421,52 @@ async fn load_user_gender(client: Arc, user_id: i32) -> Option { } async fn load_user_gender_from_user_param(client: Arc, user_id: i32) -> Option { - let row = client - .query_opt( - "SELECT - up.value AS raw_value, - tpv_id.value AS mapped_by_id_value, - tpv_order.value AS mapped_by_order_value - FROM community.user_param up - JOIN \"type\".user_param tp - ON up.param_type_id = tp.id - LEFT JOIN \"type\".user_param_value tpv_id - ON tpv_id.user_param_type_id = tp.id - AND tpv_id.id::text = up.value - LEFT JOIN \"type\".user_param_value tpv_order - ON tpv_order.user_param_type_id = tp.id - AND tpv_order.order_id::text = up.value - WHERE up.user_id = $1 - AND tp.description IN ('gender', 'sex', 'geschlecht') - AND up.value IS NOT NULL - AND btrim(up.value) <> '' - ORDER BY up.updated_at DESC, up.id DESC - LIMIT 1", - &[&user_id], - ) - .await - .ok()??; + let mut row = None; + for query in [ + "SELECT + up.value::text AS raw_value, + tpv_id.value AS mapped_by_id_value, + tpv_order.value AS mapped_by_order_value + FROM community.user_param up + JOIN \"type\".user_param tp + ON up.param_type_id = tp.id + LEFT JOIN \"type\".user_param_value tpv_id + ON tpv_id.user_param_type_id = tp.id + AND tpv_id.id::text = up.value::text + LEFT JOIN \"type\".user_param_value tpv_order + ON tpv_order.user_param_type_id = tp.id + AND tpv_order.order_id::text = up.value::text + WHERE up.user_id = $1 + AND lower(btrim(tp.description)) IN ('gender', 'sex', 'geschlecht') + AND up.value IS NOT NULL + AND btrim(up.value::text) <> '' + ORDER BY up.updated_at DESC NULLS LAST, up.id DESC + LIMIT 1", + "SELECT + up.user_param_value_id::text AS raw_value, + tpv_id.value AS mapped_by_id_value, + tpv_order.value AS mapped_by_order_value + FROM community.user_param up + JOIN \"type\".user_param tp + ON up.param_type_id = tp.id + LEFT JOIN \"type\".user_param_value tpv_id + ON tpv_id.user_param_type_id = tp.id + AND tpv_id.id = up.user_param_value_id + LEFT JOIN \"type\".user_param_value tpv_order + ON tpv_order.user_param_type_id = tp.id + AND tpv_order.order_id = up.user_param_value_id + WHERE up.user_id = $1 + AND lower(btrim(tp.description)) IN ('gender', 'sex', 'geschlecht') + AND up.user_param_value_id IS NOT NULL + ORDER BY up.updated_at DESC NULLS LAST, up.id DESC + LIMIT 1", + ] { + row = client.query_opt(query, &[&user_id]).await.ok().flatten(); + if row.is_some() { + break; + } + } + let row = row?; let raw_value = row.get::<_, Option>("raw_value"); let mapped_by_id_value = row.get::<_, Option>("mapped_by_id_value"); diff --git a/src/main.rs b/src/main.rs index d76ea36..d96add1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, UnixListener}; use tokio::sync::{mpsc, watch, RwLock}; -use tokio::time::{Duration, interval}; +use tokio::time::{Duration, Instant, interval}; use tokio_rustls::TlsAcceptor; use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer}; use tokio_rustls::rustls::ServerConfig as RustlsServerConfig; @@ -395,13 +395,8 @@ where ); } - let writer_task = tokio::spawn(async move { - while let Some(msg) = rx.recv().await { - if ws_write.send(Message::Text(msg.into())).await.is_err() { - break; - } - } - }); + let mut ping_tick = interval(Duration::from_secs(20)); + let mut pending_ping_since: Option = None; loop { tokio::select! { @@ -410,17 +405,46 @@ where break; } } + _ = ping_tick.tick() => { + if let Some(sent_at) = pending_ping_since { + if sent_at.elapsed() > Duration::from_secs(10) { + break; + } + } + if ws_write.send(Message::Ping(Vec::new().into())).await.is_err() { + break; + } + pending_ping_since = Some(Instant::now()); + } + outbound = rx.recv() => { + let Some(msg) = outbound else { + break; + }; + if ws_write.send(Message::Text(msg.into())).await.is_err() { + break; + } + } incoming = ws_read.next() => { match incoming { Some(Ok(Message::Text(text))) => { + pending_ping_since = None; commands::process_text_command(client_id, &text, Arc::clone(&state), Arc::clone(&config)).await; } Some(Ok(Message::Binary(bin))) => { + pending_ping_since = None; if let Ok(text) = std::str::from_utf8(&bin) { commands::process_text_command(client_id, text, Arc::clone(&state), Arc::clone(&config)).await; } } - Some(Ok(Message::Ping(_))) => {} + Some(Ok(Message::Ping(payload))) => { + pending_ping_since = None; + if ws_write.send(Message::Pong(payload)).await.is_err() { + break; + } + } + Some(Ok(Message::Pong(_))) => { + pending_ping_since = None; + } Some(Ok(Message::Close(_))) => break, Some(Ok(_)) => {} Some(Err(_)) | None => break, @@ -430,7 +454,6 @@ where } state::disconnect_client(client_id, state).await; - writer_task.abort(); Ok(()) }