Files
yourpart3/src/message_broker.cpp
Torsten Schulz (local) bd961a03d4 Aktualisiere WebSocket-Server und Daemon-Konfiguration
- Ändere die Pfade für SSL-Zertifikate in der Konfigurationsdatei.
- Verbessere die Fehlerbehandlung beim Entfernen alter vorbereiteter Anweisungen in HouseWorker.
- Füge Debug-Ausgaben zur Nachverfolgung von Verbindungen und Nachrichten im WebSocket-Server hinzu.
- Implementiere Timeout-Logik für das Stoppen von Worker- und Watchdog-Threads.
- Optimiere die Signalverarbeitung und Shutdown-Logik in main.cpp für bessere Responsivität.
2026-01-14 14:38:42 +01:00

46 lines
1.3 KiB
C++

#include "message_broker.h"
#include <iostream>
void MessageBroker::publish(const std::string &message) {
std::lock_guard<std::mutex> lock(mutex);
std::cout << "[MessageBroker] Nachricht gepubliziert: " << message << std::endl;
messageQueue.push(message);
cv.notify_all();
}
void MessageBroker::subscribe(const MessageCallback &callback) {
std::lock_guard<std::mutex> lock(mutex);
subscribers.push_back(callback);
}
void MessageBroker::start() {
running = true;
brokerThread = std::thread([this]() { processMessages(); });
}
void MessageBroker::stop() {
running = false;
cv.notify_all();
if (brokerThread.joinable()) {
brokerThread.join();
}
}
void MessageBroker::processMessages() {
while (running) {
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [this]() { return !messageQueue.empty() || !running; });
if (!running) break;
while (!messageQueue.empty()) {
std::string message = std::move(messageQueue.front());
messageQueue.pop();
lock.unlock();
std::cout << "[MessageBroker] Sende Nachricht an " << subscribers.size() << " Subscribers: " << message << std::endl;
for (const auto &callback : subscribers) {
callback(message);
}
lock.lock();
}
}
}