use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration; // Platzhalter-Implementierung, angelehnt an die C++-Version. // Später können wir hier auf Kanäle und ggf. async (Tokio) umstellen. type Callback = Arc; #[derive(Clone)] pub struct MessageBroker { inner: Arc, } struct Inner { subscribers: Mutex>, sender: mpsc::Sender, receiver: Mutex>>, running: AtomicBool, started: AtomicBool, } impl MessageBroker { pub fn new() -> Self { let (tx, rx) = mpsc::channel::(); Self { inner: Arc::new(Inner { subscribers: Mutex::new(Vec::new()), sender: tx, receiver: Mutex::new(Some(rx)), running: AtomicBool::new(true), started: AtomicBool::new(false), }), } } pub fn publish(&self, message: String) { // Nachrichten werden in eine interne Queue gestellt und von einem // Hintergrund-Thread an alle Subscriber verteilt. // // Falls der Empfänger bereits beendet wurde, ignorieren wir den Fehler // still (Broker fährt gerade herunter). let _ = self.inner.sender.send(message); } pub fn subscribe(&self, f: F) where F: Fn(String) + Send + Sync + 'static, { let mut guard = self.inner.subscribers.lock().unwrap(); guard.push(Arc::new(f)); } pub fn start(&self) { // Idempotent: nur einmal einen Hintergrund-Thread starten, der // Nachrichten aus der Queue liest und an Subscriber verteilt. if self .inner .started .swap(true, Ordering::SeqCst) { return; } let inner = Arc::clone(&self.inner); let rx_opt = { let mut guard = inner.receiver.lock().unwrap(); guard.take() }; if let Some(rx) = rx_opt { thread::spawn(move || { // Arbeite Nachrichten ab, solange `running` true ist oder noch // Nachrichten im Kanal vorhanden sind. loop { if !inner.running.load(Ordering::Relaxed) { // Wir beenden trotzdem erst, wenn der Kanal leer oder // getrennt ist – recv_timeout mit kurzer Wartezeit. match rx.recv_timeout(Duration::from_millis(50)) { Ok(msg) => dispatch_to_subscribers(&inner, &msg), Err(mpsc::RecvTimeoutError::Timeout) => break, Err(mpsc::RecvTimeoutError::Disconnected) => break, } } else { match rx.recv_timeout(Duration::from_millis(100)) { Ok(msg) => dispatch_to_subscribers(&inner, &msg), Err(mpsc::RecvTimeoutError::Timeout) => continue, Err(mpsc::RecvTimeoutError::Disconnected) => break, } } } }); } } pub fn stop(&self) { // Signalisiert dem Hintergrund-Thread, dass er nach Abarbeiten der // aktuellen Nachrichten-Schlange beenden soll. self.inner.running.store(false, Ordering::SeqCst); } } fn dispatch_to_subscribers(inner: &Inner, message: &str) { let subs = { let guard = inner.subscribers.lock().unwrap(); guard.clone() }; for cb in subs { cb(message.to_string()); } }