Files
yourpart-daemon/src/message_broker.rs
Torsten Schulz (local) d0ec363f09 Initial commit: Rust YpDaemon
2025-11-21 23:05:34 +01:00

116 lines
3.6 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
// Platzhalter-Implementierung, angelehnt an die C++-Version.
// Später können wir hier auf Kanäle und ggf. async (Tokio) umstellen.
type Callback = Arc<dyn Fn(String) + Send + Sync + 'static>;
#[derive(Clone)]
pub struct MessageBroker {
inner: Arc<Inner>,
}
struct Inner {
subscribers: Mutex<Vec<Callback>>,
sender: mpsc::Sender<String>,
receiver: Mutex<Option<mpsc::Receiver<String>>>,
running: AtomicBool,
started: AtomicBool,
}
impl MessageBroker {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel::<String>();
Self {
inner: Arc::new(Inner {
subscribers: Mutex::new(Vec::new()),
sender: tx,
receiver: Mutex::new(Some(rx)),
running: AtomicBool::new(true),
started: AtomicBool::new(false),
}),
}
}
pub fn publish(&self, message: String) {
// Nachrichten werden in eine interne Queue gestellt und von einem
// Hintergrund-Thread an alle Subscriber verteilt.
//
// Falls der Empfänger bereits beendet wurde, ignorieren wir den Fehler
// still (Broker fährt gerade herunter).
let _ = self.inner.sender.send(message);
}
pub fn subscribe<F>(&self, f: F)
where
F: Fn(String) + Send + Sync + 'static,
{
let mut guard = self.inner.subscribers.lock().unwrap();
guard.push(Arc::new(f));
}
pub fn start(&self) {
// Idempotent: nur einmal einen Hintergrund-Thread starten, der
// Nachrichten aus der Queue liest und an Subscriber verteilt.
if self
.inner
.started
.swap(true, Ordering::SeqCst)
{
return;
}
let inner = Arc::clone(&self.inner);
let rx_opt = {
let mut guard = inner.receiver.lock().unwrap();
guard.take()
};
if let Some(rx) = rx_opt {
thread::spawn(move || {
// Arbeite Nachrichten ab, solange `running` true ist oder noch
// Nachrichten im Kanal vorhanden sind.
loop {
if !inner.running.load(Ordering::Relaxed) {
// Wir beenden trotzdem erst, wenn der Kanal leer oder
// getrennt ist recv_timeout mit kurzer Wartezeit.
match rx.recv_timeout(Duration::from_millis(50)) {
Ok(msg) => dispatch_to_subscribers(&inner, &msg),
Err(mpsc::RecvTimeoutError::Timeout) => break,
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
} else {
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(msg) => dispatch_to_subscribers(&inner, &msg),
Err(mpsc::RecvTimeoutError::Timeout) => continue,
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
}
});
}
}
pub fn stop(&self) {
// Signalisiert dem Hintergrund-Thread, dass er nach Abarbeiten der
// aktuellen Nachrichten-Schlange beenden soll.
self.inner.running.store(false, Ordering::SeqCst);
}
}
fn dispatch_to_subscribers(inner: &Inner, message: &str) {
let subs = {
let guard = inner.subscribers.lock().unwrap();
guard.clone()
};
for cb in subs {
cb(message.to_string());
}
}