Implement periodic ping frames in WebSocket server for keepalive and update overproduction handling in ProduceWorker to include branch_id. This enhances connection stability and improves notification clarity for overproduction events.

This commit is contained in:
Torsten Schulz (local)
2025-12-01 13:25:23 +01:00
parent 3e9f921f4f
commit b8fa644c97
2 changed files with 28 additions and 6 deletions

View File

@@ -15,6 +15,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::runtime::{Builder, Runtime}; use tokio::runtime::{Builder, Runtime};
use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::sync::{broadcast, mpsc, Mutex};
use tokio::time::{interval, Duration as TokioDuration};
use tokio_rustls::rustls::{self, ServerConfig}; use tokio_rustls::rustls::{self, ServerConfig};
use tokio_rustls::TlsAcceptor; use tokio_rustls::TlsAcceptor;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
@@ -473,8 +474,12 @@ async fn handle_connection<S>(
} }
}; };
// Broker-Nachrichten an den Client // Broker-Nachrichten an den Client + periodische Ping-Frames als Keepalive
let outgoing = async move { let outgoing = async move {
// Regelmäßiges Ping, um inaktiven Verbindungen ein Lebenszeichen zu senden
// und Timeouts auf dem Weg (Proxy/Loadbalancer) zu vermeiden.
let mut ping_interval = interval(TokioDuration::from_secs(240));
loop { loop {
tokio::select! { tokio::select! {
// Nachrichten aus dem MessageBroker // Nachrichten aus dem MessageBroker
@@ -551,6 +556,16 @@ async fn handle_connection<S>(
} }
} }
} }
// Periodisches Ping an den Client
_ = ping_interval.tick() => {
if let Err(e) = ws_sender.send(Message::Ping(Vec::new())).await {
eprintln!(
"[WebSocketServer] Fehler beim Senden von Ping an {}: {}",
peer_addr, e
);
break;
}
}
} }
} }
}; };

View File

@@ -271,7 +271,9 @@ impl ProduceWorker {
self.send_production_ready_event(user_id, product_id, quantity, quality, branch_id); self.send_production_ready_event(user_id, product_id, quantity, quality, branch_id);
true true
} else { } else {
self.handle_overproduction(user_id, remaining_quantity); // Es konnten nicht alle produzierten Einheiten eingelagert werden
// Überproduktion für diesen Branch melden.
self.handle_overproduction(user_id, branch_id, remaining_quantity);
true true
} }
} }
@@ -331,8 +333,10 @@ impl ProduceWorker {
self.base.broker.publish(message); self.base.broker.publish(message);
} }
fn handle_overproduction(&self, user_id: i32, remaining_quantity: i32) { fn handle_overproduction(&self, user_id: i32, branch_id: i32, remaining_quantity: i32) {
if let Err(err) = self.insert_overproduction_notification(user_id, remaining_quantity) { if let Err(err) =
self.insert_overproduction_notification(user_id, branch_id, remaining_quantity)
{
eprintln!( eprintln!(
"[ProduceWorker] Fehler beim Schreiben der Overproduction-Notification: {err}" "[ProduceWorker] Fehler beim Schreiben der Overproduction-Notification: {err}"
); );
@@ -424,6 +428,7 @@ impl ProduceWorker {
fn insert_overproduction_notification( fn insert_overproduction_notification(
&self, &self,
user_id: i32, user_id: i32,
branch_id: i32,
remaining_quantity: i32, remaining_quantity: i32,
) -> Result<(), crate::db::DbError> { ) -> Result<(), crate::db::DbError> {
let mut conn = self let mut conn = self
@@ -437,9 +442,11 @@ impl ProduceWorker {
QUERY_ADD_OVERPRODUCTION_NOTIFICATION, QUERY_ADD_OVERPRODUCTION_NOTIFICATION,
)?; )?;
// Zusätzlich zur Menge die Branch-ID in der Payload mitschicken, damit
// das Frontend die Überproduktion einem konkreten Branch zuordnen kann.
let notification = format!( let notification = format!(
r#"{{"tr":"production.overproduction","value":{}}}"#, r#"{{"tr":"production.overproduction","value":{},"branch_id":{}}}"#,
remaining_quantity remaining_quantity, branch_id
); );
conn.execute( conn.execute(