Füge Unterstützung für die Verwaltung von WebSocket-Verbindungen hinzu. Implementiere Methoden zum Hinzufügen und Entfernen von Verbindungen basierend auf Benutzer-IDs. Aktualisiere die WebSocket-Callback-Logik, um empfangene Nachrichten zu verarbeiten und Benutzer-IDs zu setzen. Verbessere die Ausgabe von Debug-Informationen zur Nachverfolgung von Verbindungen und Nachrichten.
This commit is contained in:
committed by
Torsten (PC)
parent
c9dc891481
commit
e7a8dc86eb
@@ -18,12 +18,18 @@ struct lws_protocols WebSocketServer::protocols[] = {
|
|||||||
{ nullptr, nullptr, 0, 0 }
|
{ nullptr, nullptr, 0, 0 }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Static instance pointer
|
||||||
|
WebSocketServer* WebSocketServer::instance = nullptr;
|
||||||
|
|
||||||
WebSocketServer::WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker,
|
WebSocketServer::WebSocketServer(int port, ConnectionPool &pool, MessageBroker &broker,
|
||||||
bool useSSL, const std::string& certPath, const std::string& keyPath)
|
bool useSSL, const std::string& certPath, const std::string& keyPath)
|
||||||
: port(port), pool(pool), broker(broker), useSSL(useSSL), certPath(certPath), keyPath(keyPath) {}
|
: port(port), pool(pool), broker(broker), useSSL(useSSL), certPath(certPath), keyPath(keyPath) {
|
||||||
|
instance = this;
|
||||||
|
}
|
||||||
|
|
||||||
WebSocketServer::~WebSocketServer() {
|
WebSocketServer::~WebSocketServer() {
|
||||||
stop();
|
stop();
|
||||||
|
instance = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
void WebSocketServer::run() {
|
void WebSocketServer::run() {
|
||||||
@@ -108,24 +114,55 @@ void WebSocketServer::pingClients() {
|
|||||||
int WebSocketServer::wsCallback(struct lws *wsi,
|
int WebSocketServer::wsCallback(struct lws *wsi,
|
||||||
enum lws_callback_reasons reason,
|
enum lws_callback_reasons reason,
|
||||||
void *user, void *in, size_t len) {
|
void *user, void *in, size_t len) {
|
||||||
|
if (!instance) return 0;
|
||||||
|
|
||||||
auto *ud = reinterpret_cast<WebSocketUserData*>(user);
|
auto *ud = reinterpret_cast<WebSocketUserData*>(user);
|
||||||
switch (reason) {
|
switch (reason) {
|
||||||
case LWS_CALLBACK_ESTABLISHED:
|
case LWS_CALLBACK_ESTABLISHED:
|
||||||
ud->pongReceived = true;
|
ud->pongReceived = true;
|
||||||
|
std::cout << "WebSocket-Verbindung hergestellt" << std::endl;
|
||||||
break;
|
break;
|
||||||
case LWS_CALLBACK_RECEIVE: {
|
case LWS_CALLBACK_RECEIVE: {
|
||||||
std::string msg(reinterpret_cast<char*>(in), len);
|
std::string msg(reinterpret_cast<char*>(in), len);
|
||||||
// Here you would dispatch the received message to handleBrokerMessage or handleWebSocketMessage
|
std::cout << "WebSocket-Nachricht empfangen: " << msg << std::endl;
|
||||||
|
|
||||||
|
try {
|
||||||
|
json parsed = json::parse(msg);
|
||||||
|
if (parsed.contains("event") && parsed["event"] == "setUserId") {
|
||||||
|
if (parsed.contains("data") && parsed["data"].contains("userId")) {
|
||||||
|
ud->userId = parsed["data"]["userId"];
|
||||||
|
std::cout << "User-ID gesetzt: " << ud->userId << std::endl;
|
||||||
|
|
||||||
|
// Verbindung in der Map speichern
|
||||||
|
instance->addConnection(ud->userId, wsi);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
std::cerr << "Fehler beim Parsen der WebSocket-Nachricht: " << e.what() << std::endl;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
||||||
unsigned char buf[LWS_PRE + 4];
|
// Prüfe ob es eine Nachricht zum Senden gibt
|
||||||
memcpy(buf + LWS_PRE, "ping", 4);
|
if (ud->pendingMessage.empty()) {
|
||||||
lws_write(wsi, buf + LWS_PRE, 4, LWS_WRITE_TEXT);
|
// Ping senden
|
||||||
|
unsigned char buf[LWS_PRE + 4];
|
||||||
|
memcpy(buf + LWS_PRE, "ping", 4);
|
||||||
|
lws_write(wsi, buf + LWS_PRE, 4, LWS_WRITE_TEXT);
|
||||||
|
} else {
|
||||||
|
// Nachricht senden
|
||||||
|
unsigned char buf[LWS_PRE + ud->pendingMessage.length()];
|
||||||
|
memcpy(buf + LWS_PRE, ud->pendingMessage.c_str(), ud->pendingMessage.length());
|
||||||
|
lws_write(wsi, buf + LWS_PRE, ud->pendingMessage.length(), LWS_WRITE_TEXT);
|
||||||
|
ud->pendingMessage.clear();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case LWS_CALLBACK_CLOSED:
|
case LWS_CALLBACK_CLOSED:
|
||||||
// Remove closed connection if stored
|
// Verbindung aus der Map entfernen
|
||||||
|
if (!ud->userId.empty()) {
|
||||||
|
instance->removeConnection(ud->userId);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
@@ -139,10 +176,22 @@ void WebSocketServer::handleBrokerMessage(const std::string &message) {
|
|||||||
if (parsed.contains("user_id")) {
|
if (parsed.contains("user_id")) {
|
||||||
int fid = parsed["user_id"].get<int>();
|
int fid = parsed["user_id"].get<int>();
|
||||||
auto userId = getUserIdFromFalukantUserId(fid);
|
auto userId = getUserIdFromFalukantUserId(fid);
|
||||||
|
std::cout << "Broker-Nachricht für Falukant-User " << fid << " -> User-ID " << userId << std::endl;
|
||||||
|
|
||||||
std::shared_lock<std::shared_mutex> lock(connectionsMutex);
|
std::shared_lock<std::shared_mutex> lock(connectionsMutex);
|
||||||
auto it = connections.find(userId);
|
auto it = connections.find(userId);
|
||||||
if (it != connections.end()) {
|
if (it != connections.end()) {
|
||||||
|
std::cout << "Sende Nachricht an User " << userId << ": " << message << std::endl;
|
||||||
|
|
||||||
|
// Nachricht in der UserData speichern
|
||||||
|
auto *ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(it->second));
|
||||||
|
if (ud) {
|
||||||
|
ud->pendingMessage = message;
|
||||||
|
}
|
||||||
|
|
||||||
lws_callback_on_writable(it->second);
|
lws_callback_on_writable(it->second);
|
||||||
|
} else {
|
||||||
|
std::cout << "Keine aktive Verbindung für User " << userId << " gefunden" << std::endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (const std::exception &e) {
|
} catch (const std::exception &e) {
|
||||||
@@ -171,3 +220,15 @@ void WebSocketServer::setWorkers(const std::vector<std::unique_ptr<Worker>> &wor
|
|||||||
workers.push_back(wptr.get());
|
workers.push_back(wptr.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void WebSocketServer::addConnection(const std::string &userId, struct lws *wsi) {
|
||||||
|
std::unique_lock<std::shared_mutex> lock(connectionsMutex);
|
||||||
|
connections[userId] = wsi;
|
||||||
|
std::cout << "Verbindung für User " << userId << " gespeichert" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
void WebSocketServer::removeConnection(const std::string &userId) {
|
||||||
|
std::unique_lock<std::shared_mutex> lock(connectionsMutex);
|
||||||
|
connections.erase(userId);
|
||||||
|
std::cout << "Verbindung für User " << userId << " entfernt" << std::endl;
|
||||||
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
struct WebSocketUserData {
|
struct WebSocketUserData {
|
||||||
std::string userId;
|
std::string userId;
|
||||||
bool pongReceived = true;
|
bool pongReceived = true;
|
||||||
|
std::string pendingMessage;
|
||||||
};
|
};
|
||||||
|
|
||||||
class Worker; // forward
|
class Worker; // forward
|
||||||
@@ -39,6 +40,8 @@ private:
|
|||||||
void pingClients();
|
void pingClients();
|
||||||
void handleBrokerMessage(const std::string &message);
|
void handleBrokerMessage(const std::string &message);
|
||||||
std::string getUserIdFromFalukantUserId(int falukantUserId);
|
std::string getUserIdFromFalukantUserId(int falukantUserId);
|
||||||
|
void addConnection(const std::string &userId, struct lws *wsi);
|
||||||
|
void removeConnection(const std::string &userId);
|
||||||
|
|
||||||
static int wsCallback(struct lws *wsi,
|
static int wsCallback(struct lws *wsi,
|
||||||
enum lws_callback_reasons reason,
|
enum lws_callback_reasons reason,
|
||||||
@@ -67,4 +70,5 @@ private:
|
|||||||
std::vector<Worker*> workers;
|
std::vector<Worker*> workers;
|
||||||
|
|
||||||
static struct lws_protocols protocols[];
|
static struct lws_protocols protocols[];
|
||||||
|
static WebSocketServer* instance;
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user