116 lines
3.6 KiB
Rust
116 lines
3.6 KiB
Rust
use std::sync::atomic::{AtomicBool, Ordering};
|
||
use std::sync::{mpsc, Arc, Mutex};
|
||
use std::thread;
|
||
use std::time::Duration;
|
||
|
||
// Placeholder implementation, modelled on the C++ version.
// Later we can switch this over to channels and, if needed, async (Tokio).
|
||
|
||
/// Shared, thread-safe subscriber callback. Each invocation receives its own
/// owned copy of the dispatched message.
type Callback = Arc<dyn Fn(String) + Send + Sync + 'static>;

/// In-process publish/subscribe broker.
///
/// Messages passed to [`MessageBroker::publish`] are queued on an internal
/// channel. After [`MessageBroker::start`] has been called, one background
/// thread takes them off the queue in publish order and hands each one to
/// every registered subscriber.
///
/// Cloning is cheap: all clones are handles to the same queue, subscriber
/// list and dispatcher thread.
#[derive(Clone)]
pub struct MessageBroker {
    inner: Arc<Inner>,
}

/// State shared between all broker handles and the dispatcher thread.
struct Inner {
    /// Registered callbacks, in subscription order.
    subscribers: Mutex<Vec<Callback>>,
    /// Producer side of the message queue, used by `publish`.
    sender: mpsc::Sender<String>,
    /// Consumer side of the queue; taken (left as `None`) by the first `start`.
    receiver: Mutex<Option<mpsc::Receiver<String>>>,
    /// Cleared by `stop` to ask the dispatcher to drain the queue and exit.
    running: AtomicBool,
    /// Set by the first `start` so later calls become no-ops.
    started: AtomicBool,
}

impl MessageBroker {
    /// How long the dispatcher blocks waiting for a message before it
    /// re-checks the `running` flag and whether any handle is still alive.
    const IDLE_POLL_INTERVAL: Duration = Duration::from_millis(100);

    /// After `stop`, how long the queue must stay empty before the dispatcher
    /// treats it as drained and exits.
    const DRAIN_GRACE_PERIOD: Duration = Duration::from_millis(50);

    /// Creates a broker with an empty subscriber list and an empty queue.
    ///
    /// No thread is spawned here; messages published before
    /// [`MessageBroker::start`] simply wait in the queue.
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel::<String>();
        Self {
            inner: Arc::new(Inner {
                subscribers: Mutex::new(Vec::new()),
                sender,
                receiver: Mutex::new(Some(receiver)),
                running: AtomicBool::new(true),
                started: AtomicBool::new(false),
            }),
        }
    }

    /// Queues `message` for delivery to all subscribers.
    ///
    /// This never blocks on subscribers; delivery happens on the dispatcher
    /// thread. If the dispatcher has already exited, its receiver is gone and
    /// the send fails. That error is ignored on purpose: the broker is shut
    /// down and the message is dropped.
    pub fn publish(&self, message: String) {
        let _ = self.inner.sender.send(message);
    }

    /// Registers `f` to be called with a copy of every message dispatched
    /// from now on. Callbacks are invoked in subscription order.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber list mutex is poisoned.
    pub fn subscribe<F>(&self, f: F)
    where
        F: Fn(String) + Send + Sync + 'static,
    {
        self.inner
            .subscribers
            .lock()
            .expect("subscriber list mutex poisoned")
            .push(Arc::new(f));
    }

    /// Spawns the dispatcher thread.
    ///
    /// Idempotent: only the first call (across all clones) spawns a thread.
    /// Once the dispatcher has exited it cannot be restarted, because the
    /// `started` flag stays set and the receiver has been consumed.
    ///
    /// # Panics
    ///
    /// Panics if the receiver mutex is poisoned.
    pub fn start(&self) {
        if self.inner.started.swap(true, Ordering::SeqCst) {
            return;
        }

        let receiver = self
            .inner
            .receiver
            .lock()
            .expect("receiver mutex poisoned")
            .take();

        if let Some(rx) = receiver {
            let inner = Arc::clone(&self.inner);
            thread::spawn(move || Self::run_dispatcher(inner, rx));
        }
    }

    /// Asks the dispatcher to finish.
    ///
    /// Returns immediately without joining the thread. The dispatcher keeps
    /// delivering queued messages and exits once the queue has stayed empty
    /// for a short grace period.
    pub fn stop(&self) {
        self.inner.running.store(false, Ordering::SeqCst);
    }

    /// Body of the dispatcher thread: delivers messages until the broker is
    /// stopped or no handle to it remains.
    fn run_dispatcher(inner: Arc<Inner>, rx: mpsc::Receiver<String>) {
        loop {
            if inner.running.load(Ordering::SeqCst) {
                match rx.recv_timeout(Self::IDLE_POLL_INTERVAL) {
                    Ok(message) => dispatch_to_subscribers(&inner, &message),
                    Err(mpsc::RecvTimeoutError::Timeout) => {
                        // `Inner` owns the only `Sender`, and this thread keeps
                        // `Inner` alive, so the channel never disconnects by
                        // itself. If this thread holds the last strong
                        // reference, no handle exists that could publish,
                        // subscribe or stop: exit instead of polling forever.
                        // A subscriber that captured a broker clone keeps the
                        // count above one, so the thread then stays alive.
                        if Arc::strong_count(&inner) == 1 {
                            // Pair with the release decrement performed when
                            // the last handle was dropped, so that anything it
                            // published beforehand is visible to the drain.
                            std::sync::atomic::fence(Ordering::Acquire);
                            while let Ok(message) = rx.try_recv() {
                                dispatch_to_subscribers(&inner, &message);
                            }
                            break;
                        }
                    }
                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
                }
            } else {
                // Stop was requested: keep delivering what is still queued and
                // leave once nothing arrives within the grace period.
                match rx.recv_timeout(Self::DRAIN_GRACE_PERIOD) {
                    Ok(message) => dispatch_to_subscribers(&inner, &message),
                    Err(mpsc::RecvTimeoutError::Timeout)
                    | Err(mpsc::RecvTimeoutError::Disconnected) => break,
                }
            }
        }
    }
}

impl Default for MessageBroker {
    /// Equivalent to [`MessageBroker::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Calls every currently registered subscriber with its own copy of `message`.
///
/// The subscriber list is copied first and the lock is released before any
/// callback runs, so a callback may call `subscribe` without deadlocking.
fn dispatch_to_subscribers(inner: &Inner, message: &str) {
    let snapshot: Vec<Callback> = inner
        .subscribers
        .lock()
        .expect("subscriber list mutex poisoned")
        .clone();

    for callback in snapshot {
        callback(message.to_string());
    }
}
|
||
|
||
|