Implement session replacement and WebSocket keepalive features

Enhanced the session management by allowing a reconnect with the same username to replace the existing session, sending a logout message to the previous session. Introduced WebSocket keepalive functionality using Ping/Pong messages to detect stale connections. Updated documentation to reflect these changes and improve user experience during reconnections.
This commit is contained in:
Torsten Schulz (local)
2026-03-05 08:03:15 +01:00
parent 92ae7d614e
commit 8b9947cc03
5 changed files with 174 additions and 112 deletions

View File

@@ -11,7 +11,8 @@ Die Kommunikation erfolgt ueber WebSocket (Frontend) und optional TCP/Unix-Socke
- In-Memory-Room-Management - In-Memory-Room-Management
- Token-basierte Session-Absicherung pro Verbindung - Token-basierte Session-Absicherung pro Verbindung
- Username-Pruefung (3-32 Zeichen, `[a-zA-Z0-9_]`) - Username-Pruefung (3-32 Zeichen, `[a-zA-Z0-9_]`)
- Verhindert Mehrfach-Login mit identischem Namen (`loggedin`) - Reconnect-freundlich: bestehende Session wird bei erneutem `init` mit gleichem Namen ersetzt
- WebSocket Keepalive via Ping/Pong zur Erkennung verwaister Verbindungen
- Optionale Allowlist fuer User via `CHAT_ALLOWED_USERS` - Optionale Allowlist fuer User via `CHAT_ALLOWED_USERS`
- Optionale DB-basierte User-Pruefung via Postgres (`CHAT_DB_URL`) - Optionale DB-basierte User-Pruefung via Postgres (`CHAT_DB_URL`)
- Kompatible Antworttypen fuer die bestehende `chatTcpBridge.js` - Kompatible Antworttypen fuer die bestehende `chatTcpBridge.js`

View File

@@ -47,6 +47,12 @@ Sie ist absichtlich auf das Protokoll und die Client-Integration fokussiert (ohn
{"type":5,"message":"room_entered","to":"lobby"} {"type":5,"message":"room_entered","to":"lobby"}
``` ```
- Erzwungenes Logout bei Session-Ersatz (Reconnect mit gleichem Namen):
```json
{"type":5,"message":"logout","reason":"replaced_by_new_login"}
```
- Chatnachricht: - Chatnachricht:
```json ```json
@@ -183,7 +189,6 @@ function sendMessage(text) {
- `missing_name`: `init` ohne `name` - `missing_name`: `init` ohne `name`
- `invalid_username`: Username entspricht nicht den Regeln - `invalid_username`: Username entspricht nicht den Regeln
- `loggedin`: Username ist bereits aktiv
- `user_not_allowed`: User nicht erlaubt (DB/Allowlist) - `user_not_allowed`: User nicht erlaubt (DB/Allowlist)
- `not_initialized`: Command vor `init` - `not_initialized`: Command vor `init`
- `missing_token`: Token fehlt - `missing_token`: Token fehlt
@@ -194,7 +199,7 @@ function sendMessage(text) {
## Hinweise für Frontend-Implementierung ## Hinweise für Frontend-Implementierung
- Token zentral im Chat-State halten - Token zentral im Chat-State halten
- Reconnect-Strategie einbauen (neues `init`, neues Token) - Reconnect-Strategie einbauen (neues `init`, neues Token); alte Session wird dabei serverseitig ersetzt
- Vor Senden auth-pflichtiger Commands Token prüfen - Vor Senden auth-pflichtiger Commands Token prüfen
- UI sollte Fehler vom Typ `error` immer sichtbar machen - UI sollte Fehler vom Typ `error` immer sichtbar machen
- Für Slash-Kommandos reicht normales `message`-Senden - Für Slash-Kommandos reicht normales `message`-Senden

View File

@@ -211,18 +211,18 @@ async fn handle_init_command(
return; return;
} }
let (token, user_name, actual_room_name, old_room_name) = { let (token, user_name, actual_room_name, old_room_name) = loop {
let replacement_needed = {
let mut guard = state.write().await; let mut guard = state.write().await;
if guard.logged_in_names.contains(&requested_user_name) if let Some(existing_client_id) = guard.clients.iter().find_map(|(id, c)| {
&& guard if *id != client_id && c.logged_in && c.user_name == requested_user_name {
.clients Some(*id)
.iter() } else {
.any(|(id, c)| *id != client_id && c.logged_in && c.user_name == requested_user_name) None
{
drop(guard);
send_error(client_id, Arc::clone(&state), "loggedin").await;
return;
} }
}) {
Some(existing_client_id)
} else {
if !guard.room_meta.contains_key(&resolved_room_name) { if !guard.room_meta.contains_key(&resolved_room_name) {
drop(guard); drop(guard);
if room_debug_enabled() { if room_debug_enabled() {
@@ -287,7 +287,19 @@ async fn handle_init_command(
.entry(resolved_room_name.clone()) .entry(resolved_room_name.clone())
.or_default() .or_default()
.insert(client_id); .insert(client_id);
(token, user_name, resolved_room_name, old_room) break (token, user_name, resolved_room_name.clone(), old_room);
}
};
if let Some(old_client_id) = replacement_needed {
state::send_to_client(
old_client_id,
Arc::clone(&state),
json!({"type":5, "message":"logout", "reason":"replaced_by_new_login"}),
)
.await;
state::disconnect_client(old_client_id, Arc::clone(&state)).await;
continue;
}
}; };
if !old_room_name.is_empty() { if !old_room_name.is_empty() {
state::mark_room_possibly_empty(&old_room_name, Arc::clone(&state)).await; state::mark_room_possibly_empty(&old_room_name, Arc::clone(&state)).await;

View File

@@ -421,10 +421,10 @@ async fn load_user_gender(client: Arc<PgClient>, user_id: i32) -> Option<i32> {
} }
async fn load_user_gender_from_user_param(client: Arc<PgClient>, user_id: i32) -> Option<i32> { async fn load_user_gender_from_user_param(client: Arc<PgClient>, user_id: i32) -> Option<i32> {
let row = client let mut row = None;
.query_opt( for query in [
"SELECT "SELECT
up.value AS raw_value, up.value::text AS raw_value,
tpv_id.value AS mapped_by_id_value, tpv_id.value AS mapped_by_id_value,
tpv_order.value AS mapped_by_order_value tpv_order.value AS mapped_by_order_value
FROM community.user_param up FROM community.user_param up
@@ -432,20 +432,41 @@ async fn load_user_gender_from_user_param(client: Arc<PgClient>, user_id: i32) -
ON up.param_type_id = tp.id ON up.param_type_id = tp.id
LEFT JOIN \"type\".user_param_value tpv_id LEFT JOIN \"type\".user_param_value tpv_id
ON tpv_id.user_param_type_id = tp.id ON tpv_id.user_param_type_id = tp.id
AND tpv_id.id::text = up.value AND tpv_id.id::text = up.value::text
LEFT JOIN \"type\".user_param_value tpv_order LEFT JOIN \"type\".user_param_value tpv_order
ON tpv_order.user_param_type_id = tp.id ON tpv_order.user_param_type_id = tp.id
AND tpv_order.order_id::text = up.value AND tpv_order.order_id::text = up.value::text
WHERE up.user_id = $1 WHERE up.user_id = $1
AND tp.description IN ('gender', 'sex', 'geschlecht') AND lower(btrim(tp.description)) IN ('gender', 'sex', 'geschlecht')
AND up.value IS NOT NULL AND up.value IS NOT NULL
AND btrim(up.value) <> '' AND btrim(up.value::text) <> ''
ORDER BY up.updated_at DESC, up.id DESC ORDER BY up.updated_at DESC NULLS LAST, up.id DESC
LIMIT 1", LIMIT 1",
&[&user_id], "SELECT
) up.user_param_value_id::text AS raw_value,
.await tpv_id.value AS mapped_by_id_value,
.ok()??; tpv_order.value AS mapped_by_order_value
FROM community.user_param up
JOIN \"type\".user_param tp
ON up.param_type_id = tp.id
LEFT JOIN \"type\".user_param_value tpv_id
ON tpv_id.user_param_type_id = tp.id
AND tpv_id.id = up.user_param_value_id
LEFT JOIN \"type\".user_param_value tpv_order
ON tpv_order.user_param_type_id = tp.id
AND tpv_order.order_id = up.user_param_value_id
WHERE up.user_id = $1
AND lower(btrim(tp.description)) IN ('gender', 'sex', 'geschlecht')
AND up.user_param_value_id IS NOT NULL
ORDER BY up.updated_at DESC NULLS LAST, up.id DESC
LIMIT 1",
] {
row = client.query_opt(query, &[&user_id]).await.ok().flatten();
if row.is_some() {
break;
}
}
let row = row?;
let raw_value = row.get::<_, Option<String>>("raw_value"); let raw_value = row.get::<_, Option<String>>("raw_value");
let mapped_by_id_value = row.get::<_, Option<String>>("mapped_by_id_value"); let mapped_by_id_value = row.get::<_, Option<String>>("mapped_by_id_value");

View File

@@ -9,7 +9,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, UnixListener}; use tokio::net::{TcpListener, UnixListener};
use tokio::sync::{mpsc, watch, RwLock}; use tokio::sync::{mpsc, watch, RwLock};
use tokio::time::{Duration, interval}; use tokio::time::{Duration, Instant, interval};
use tokio_rustls::TlsAcceptor; use tokio_rustls::TlsAcceptor;
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer}; use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
use tokio_rustls::rustls::ServerConfig as RustlsServerConfig; use tokio_rustls::rustls::ServerConfig as RustlsServerConfig;
@@ -395,13 +395,8 @@ where
); );
} }
let writer_task = tokio::spawn(async move { let mut ping_tick = interval(Duration::from_secs(20));
while let Some(msg) = rx.recv().await { let mut pending_ping_since: Option<Instant> = None;
if ws_write.send(Message::Text(msg.into())).await.is_err() {
break;
}
}
});
loop { loop {
tokio::select! { tokio::select! {
@@ -410,17 +405,46 @@ where
break; break;
} }
} }
_ = ping_tick.tick() => {
if let Some(sent_at) = pending_ping_since {
if sent_at.elapsed() > Duration::from_secs(10) {
break;
}
}
if ws_write.send(Message::Ping(Vec::new().into())).await.is_err() {
break;
}
pending_ping_since = Some(Instant::now());
}
outbound = rx.recv() => {
let Some(msg) = outbound else {
break;
};
if ws_write.send(Message::Text(msg.into())).await.is_err() {
break;
}
}
incoming = ws_read.next() => { incoming = ws_read.next() => {
match incoming { match incoming {
Some(Ok(Message::Text(text))) => { Some(Ok(Message::Text(text))) => {
pending_ping_since = None;
commands::process_text_command(client_id, &text, Arc::clone(&state), Arc::clone(&config)).await; commands::process_text_command(client_id, &text, Arc::clone(&state), Arc::clone(&config)).await;
} }
Some(Ok(Message::Binary(bin))) => { Some(Ok(Message::Binary(bin))) => {
pending_ping_since = None;
if let Ok(text) = std::str::from_utf8(&bin) { if let Ok(text) = std::str::from_utf8(&bin) {
commands::process_text_command(client_id, text, Arc::clone(&state), Arc::clone(&config)).await; commands::process_text_command(client_id, text, Arc::clone(&state), Arc::clone(&config)).await;
} }
} }
Some(Ok(Message::Ping(_))) => {} Some(Ok(Message::Ping(payload))) => {
pending_ping_since = None;
if ws_write.send(Message::Pong(payload)).await.is_err() {
break;
}
}
Some(Ok(Message::Pong(_))) => {
pending_ping_since = None;
}
Some(Ok(Message::Close(_))) => break, Some(Ok(Message::Close(_))) => break,
Some(Ok(_)) => {} Some(Ok(_)) => {}
Some(Err(_)) | None => break, Some(Err(_)) | None => break,
@@ -430,7 +454,6 @@ where
} }
state::disconnect_client(client_id, state).await; state::disconnect_client(client_id, state).await;
writer_task.abort();
Ok(()) Ok(())
} }