From b8fa644c9773ef7d8eea58bbc03148f730c1536b Mon Sep 17 00:00:00 2001 From: "Torsten Schulz (local)" Date: Mon, 1 Dec 2025 13:25:23 +0100 Subject: [PATCH] Implement periodic ping frames in WebSocket server for keepalive and update overproduction handling in ProduceWorker to include branch_id. This enhances connection stability and improves notification clarity for overproduction events. --- src/websocket_server.rs | 17 ++++++++++++++++- src/worker/produce.rs | 17 ++++++++++++----- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/websocket_server.rs b/src/websocket_server.rs index 8ec185a..e589313 100644 --- a/src/websocket_server.rs +++ b/src/websocket_server.rs @@ -15,6 +15,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio::runtime::{Builder, Runtime}; use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio::time::{interval, Duration as TokioDuration}; use tokio_rustls::rustls::{self, ServerConfig}; use tokio_rustls::TlsAcceptor; use tokio_tungstenite::tungstenite::Message; @@ -473,8 +474,12 @@ async fn handle_connection( } }; - // Broker-Nachrichten an den Client + // Broker-Nachrichten an den Client + periodische Ping-Frames als Keepalive let outgoing = async move { + // Regelmäßiges Ping, um inaktiven Verbindungen ein Lebenszeichen zu senden + // und Timeouts auf dem Weg (Proxy/Loadbalancer) zu vermeiden. + let mut ping_interval = interval(TokioDuration::from_secs(240)); + loop { tokio::select! { // Nachrichten aus dem MessageBroker @@ -551,6 +556,16 @@ async fn handle_connection( } } } + // Periodisches Ping an den Client + _ = ping_interval.tick() => { + if let Err(e) = ws_sender.send(Message::Ping(Vec::new())).await { + eprintln!( + "[WebSocketServer] Fehler beim Senden von Ping an {}: {}", + peer_addr, e + ); + break; + } + } } } }; diff --git a/src/worker/produce.rs b/src/worker/produce.rs index c0150b8..dac4db1 100644 --- a/src/worker/produce.rs +++ b/src/worker/produce.rs @@ -271,7 +271,9 @@ impl ProduceWorker { self.send_production_ready_event(user_id, product_id, quantity, quality, branch_id); true } else { - self.handle_overproduction(user_id, remaining_quantity); + // Es konnten nicht alle produzierten Einheiten eingelagert werden – + // Überproduktion für diesen Branch melden. + self.handle_overproduction(user_id, branch_id, remaining_quantity); true } } @@ -331,8 +333,10 @@ impl ProduceWorker { self.base.broker.publish(message); } - fn handle_overproduction(&self, user_id: i32, remaining_quantity: i32) { - if let Err(err) = self.insert_overproduction_notification(user_id, remaining_quantity) { + fn handle_overproduction(&self, user_id: i32, branch_id: i32, remaining_quantity: i32) { + if let Err(err) = + self.insert_overproduction_notification(user_id, branch_id, remaining_quantity) + { eprintln!( "[ProduceWorker] Fehler beim Schreiben der Overproduction-Notification: {err}" ); @@ -424,6 +428,7 @@ impl ProduceWorker { fn insert_overproduction_notification( &self, user_id: i32, + branch_id: i32, remaining_quantity: i32, ) -> Result<(), crate::db::DbError> { let mut conn = self @@ -437,9 +442,11 @@ impl ProduceWorker { QUERY_ADD_OVERPRODUCTION_NOTIFICATION, )?; + // Zusätzlich zur Menge die Branch-ID in der Payload mitschicken, damit + // das Frontend die Überproduktion einem konkreten Branch zuordnen kann. let notification = format!( - r#"{{"tr":"production.overproduction","value":{}}}"#, - remaining_quantity + r#"{{"tr":"production.overproduction","value":{},"branch_id":{}}}"#, + remaining_quantity, branch_id ); conn.execute(