Enhance WebSocket server connection management and error handling
- Introduce connection time tracking for WebSocket users to monitor connection duration. - Implement user ID management to allow dynamic updates and removal of connections based on user ID changes. - Add functionality to retrieve active connections, including unauthenticated ones, for administrative purposes. - Improve error handling during connection closure and ensure proper cleanup of connection entries.
This commit is contained in:
committed by
Torsten (PC)
parent
753c5929e1
commit
5ac8e9b484
@@ -36,8 +36,11 @@ if(NOT EXISTS "/etc/yourpart")
|
||||
message(STATUS "Erstelle Verzeichnis /etc/yourpart...")
|
||||
execute_process(
|
||||
COMMAND ${CMAKE_COMMAND} -E make_directory "/etc/yourpart"
|
||||
COMMAND_ERROR_IS_FATAL ANY
|
||||
RESULT_VARIABLE MKDIR_RESULT
|
||||
)
|
||||
if(NOT MKDIR_RESULT EQUAL 0)
|
||||
message(FATAL_ERROR "Konnte Verzeichnis /etc/yourpart nicht erstellen")
|
||||
endif()
|
||||
endif()
|
||||
|
||||
# Prüfe ob Config-Datei existiert
|
||||
@@ -45,8 +48,11 @@ if(NOT EXISTS "${CONFIG_FILE}")
|
||||
message(STATUS "Konfigurationsdatei existiert nicht, erstelle neue...")
|
||||
execute_process(
|
||||
COMMAND ${CMAKE_COMMAND} -E copy "${TEMPLATE_FILE}" "${CONFIG_FILE}"
|
||||
COMMAND_ERROR_IS_FATAL ANY
|
||||
RESULT_VARIABLE COPY_RESULT
|
||||
)
|
||||
if(NOT COPY_RESULT EQUAL 0)
|
||||
message(FATAL_ERROR "Konnte Konfigurationsdatei nicht erstellen: ${CONFIG_FILE}")
|
||||
endif()
|
||||
message(STATUS "Neue Konfigurationsdatei erstellt: ${CONFIG_FILE}")
|
||||
else()
|
||||
message(STATUS "Konfigurationsdatei existiert bereits, prüfe auf fehlende Keys...")
|
||||
@@ -144,16 +150,26 @@ if __name__ == '__main__':
|
||||
endif()
|
||||
endif()
|
||||
|
||||
# Setze korrekte Berechtigungen
|
||||
# Setze korrekte Berechtigungen (Fehler werden ignoriert, da Berechtigungen optional sind)
|
||||
execute_process(
|
||||
COMMAND chown yourpart:yourpart "${CONFIG_FILE}"
|
||||
COMMAND_ERROR_IS_FATAL FALSE
|
||||
RESULT_VARIABLE CHOWN_RESULT
|
||||
ERROR_QUIET
|
||||
)
|
||||
|
||||
if(NOT CHOWN_RESULT EQUAL 0)
|
||||
message(WARNING "Konnte Besitzer von ${CONFIG_FILE} nicht ändern (möglicherweise kein Root oder User existiert nicht)")
|
||||
endif()
|
||||
|
||||
execute_process(
|
||||
COMMAND chmod 600 "${CONFIG_FILE}"
|
||||
COMMAND_ERROR_IS_FATAL FALSE
|
||||
RESULT_VARIABLE CHMOD_RESULT
|
||||
ERROR_QUIET
|
||||
)
|
||||
|
||||
if(NOT CHMOD_RESULT EQUAL 0)
|
||||
message(WARNING "Konnte Berechtigungen von ${CONFIG_FILE} nicht ändern")
|
||||
endif()
|
||||
|
||||
message(STATUS "Konfigurationsdatei-Verwaltung abgeschlossen: ${CONFIG_FILE}")
|
||||
|
||||
|
||||
@@ -240,9 +240,17 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_ESTABLISHED: {
|
||||
ud->pongReceived = true;
|
||||
ud->connectionTime = std::chrono::steady_clock::now();
|
||||
ud->lastPingTime = std::chrono::steady_clock::now();
|
||||
ud->lastPongTime = std::chrono::steady_clock::now();
|
||||
ud->pingTimeoutCount = 0;
|
||||
|
||||
// Füge Verbindung zur Liste aller Verbindungen hinzu
|
||||
{
|
||||
std::unique_lock<std::shared_mutex> lock(instance->connectionsMutex);
|
||||
instance->allConnections.push_back(wsi);
|
||||
}
|
||||
|
||||
const char* protocolName = lws_get_protocol(wsi)->name;
|
||||
std::cout << "WebSocket-Verbindung hergestellt (Protokoll: " << (protocolName ? protocolName : "Standard") << ")" << std::endl;
|
||||
char client_addr[128];
|
||||
@@ -274,9 +282,20 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
||||
json parsed = json::parse(msg);
|
||||
std::cout << "[RECEIVE] Nachricht empfangen: " << msg << std::endl;
|
||||
|
||||
if (parsed.contains("event") && parsed["event"] == "setUserId") {
|
||||
if (parsed.contains("event")) {
|
||||
std::string event = parsed["event"].get<std::string>();
|
||||
|
||||
if (event == "setUserId") {
|
||||
if (parsed.contains("data") && parsed["data"].contains("userId")) {
|
||||
ud->userId = parsed["data"]["userId"].get<std::string>();
|
||||
std::string newUserId = parsed["data"]["userId"].get<std::string>();
|
||||
|
||||
// Wenn die Verbindung bereits unter einer anderen userId registriert ist, entferne die alte Registrierung
|
||||
if (!ud->userId.empty() && ud->userId != newUserId) {
|
||||
std::cout << "[RECEIVE] User-ID ändert sich von " << ud->userId << " zu " << newUserId << ", entferne alte Registrierung" << std::endl;
|
||||
instance->removeConnection(ud->userId, wsi);
|
||||
}
|
||||
|
||||
ud->userId = newUserId;
|
||||
std::cout << "[RECEIVE] User-ID gesetzt: " << ud->userId << std::endl;
|
||||
|
||||
// Verbindung in der Map speichern
|
||||
@@ -285,8 +304,45 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
||||
} else {
|
||||
std::cerr << "[RECEIVE] setUserId-Event ohne data.userId-Feld" << std::endl;
|
||||
}
|
||||
} else if (event == "getConnections") {
|
||||
// Admin-Funktion: Liste aller aktiven Verbindungen
|
||||
if (ud->userId.empty()) {
|
||||
std::cerr << "[RECEIVE] getConnections: User-ID nicht gesetzt" << std::endl;
|
||||
json errorResponse = {
|
||||
{"event", "getConnectionsResponse"},
|
||||
{"success", false},
|
||||
{"error", "User-ID nicht gesetzt"}
|
||||
};
|
||||
instance->sendMessageToConnection(wsi, errorResponse.dump());
|
||||
break;
|
||||
}
|
||||
|
||||
// Prüfe Mainadmin-Rechte
|
||||
if (!instance->isMainAdmin(ud->userId)) {
|
||||
std::cerr << "[RECEIVE] getConnections: Zugriff verweigert für User " << ud->userId << std::endl;
|
||||
json errorResponse = {
|
||||
{"event", "getConnectionsResponse"},
|
||||
{"success", false},
|
||||
{"error", "Zugriff verweigert: Nur Mainadmin-User können Verbindungen abfragen"}
|
||||
};
|
||||
instance->sendMessageToConnection(wsi, errorResponse.dump());
|
||||
break;
|
||||
}
|
||||
|
||||
// Hole aktive Verbindungen
|
||||
json connections = instance->getActiveConnections();
|
||||
json response = {
|
||||
{"event", "getConnectionsResponse"},
|
||||
{"success", true},
|
||||
{"data", connections}
|
||||
};
|
||||
instance->sendMessageToConnection(wsi, response.dump());
|
||||
std::cout << "[RECEIVE] getConnections: Verbindungen an Mainadmin gesendet" << std::endl;
|
||||
} else {
|
||||
std::cout << "[RECEIVE] Ignoriere Nachricht (kein setUserId-Event)" << std::endl;
|
||||
std::cout << "[RECEIVE] Unbekanntes Event: " << event << std::endl;
|
||||
}
|
||||
} else {
|
||||
std::cout << "[RECEIVE] Nachricht ohne event-Feld" << std::endl;
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
std::cerr << "[RECEIVE] Fehler beim Parsen der WebSocket-Nachricht: " << e.what() << std::endl;
|
||||
@@ -332,17 +388,37 @@ int WebSocketServer::wsCallback(struct lws *wsi,
|
||||
}
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
// Verbindung aus der Map entfernen
|
||||
if (ud && !ud->userId.empty()) {
|
||||
if (ud) {
|
||||
if (!ud->userId.empty()) {
|
||||
instance->removeConnection(ud->userId, wsi);
|
||||
std::cout << "WebSocket-Verbindung geschlossen für User: " << ud->userId << std::endl;
|
||||
} else {
|
||||
std::cout << "WebSocket-Verbindung geschlossen (ohne User-ID)" << std::endl;
|
||||
// Falls keine userId gesetzt ist, entferne die Verbindung aus allen möglichen Einträgen
|
||||
// (Fallback für den Fall, dass setUserId nie aufgerufen wurde)
|
||||
instance->removeConnectionByWsi(wsi);
|
||||
std::cout << "WebSocket-Verbindung geschlossen (ohne User-ID, entferne aus allen Einträgen)" << std::endl;
|
||||
}
|
||||
} else {
|
||||
std::cout << "WebSocket-Verbindung geschlossen (ud ist nullptr)" << std::endl;
|
||||
}
|
||||
|
||||
// Entferne aus allConnections
|
||||
{
|
||||
std::unique_lock<std::shared_mutex> lock(instance->connectionsMutex);
|
||||
instance->allConnections.erase(
|
||||
std::remove(instance->allConnections.begin(), instance->allConnections.end(), wsi),
|
||||
instance->allConnections.end()
|
||||
);
|
||||
}
|
||||
break;
|
||||
case LWS_CALLBACK_WSI_DESTROY:
|
||||
// Verbindung wird zerstört - aufräumen falls nötig
|
||||
if (ud && !ud->userId.empty()) {
|
||||
if (ud) {
|
||||
if (!ud->userId.empty()) {
|
||||
instance->removeConnection(ud->userId, wsi);
|
||||
} else {
|
||||
instance->removeConnectionByWsi(wsi);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
||||
@@ -453,6 +529,124 @@ std::string WebSocketServer::getUserIdFromFalukantUserId(int userId) {
|
||||
return (!res.empty()) ? res[0]["hashed_id"] : std::string();
|
||||
}
|
||||
|
||||
bool WebSocketServer::isMainAdmin(const std::string &hashedUserId) {
|
||||
ConnectionGuard guard(pool);
|
||||
auto &db = guard.get();
|
||||
std::string sql = R"(
|
||||
SELECT COUNT(*) as count
|
||||
FROM community.user u
|
||||
JOIN community.user_right ur ON u.id = ur.user_id
|
||||
JOIN "type".user_right tr ON ur.right_type_id = tr.id
|
||||
WHERE u.hashed_id = $1
|
||||
AND tr.title = 'mainadmin'
|
||||
)";
|
||||
db.prepare("check_mainadmin", sql);
|
||||
auto res = db.execute("check_mainadmin", {hashedUserId});
|
||||
if (res.empty()) {
|
||||
return false;
|
||||
}
|
||||
int count = std::stoi(res[0]["count"].c_str());
|
||||
return count > 0;
|
||||
}
|
||||
|
||||
json WebSocketServer::getActiveConnections() {
|
||||
json result = json::array();
|
||||
|
||||
std::shared_lock<std::shared_mutex> lock(connectionsMutex);
|
||||
|
||||
// Zähle Verbindungen ohne userId
|
||||
size_t unauthenticatedCount = 0;
|
||||
for (auto* wsi : allConnections) {
|
||||
auto* ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
||||
if (ud && ud->userId.empty()) {
|
||||
unauthenticatedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
// Iteriere über die Member-Variable this->connections (nicht die lokale Variable)
|
||||
for (const auto& pair : this->connections) {
|
||||
const std::string& userId = pair.first;
|
||||
const auto& connList = pair.second;
|
||||
|
||||
json userConnections = {
|
||||
{"userId", userId},
|
||||
{"connectionCount", connList.size()},
|
||||
{"connections", json::array()}
|
||||
};
|
||||
|
||||
for (auto* wsi : connList) {
|
||||
auto* ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
||||
if (ud) {
|
||||
// Berechne Verbindungsdauer seit ESTABLISHED
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
auto connectionDuration = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
now - ud->connectionTime).count();
|
||||
|
||||
// Berechne Zeit seit letztem Pong
|
||||
auto timeSinceLastPong = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
now - ud->lastPongTime).count();
|
||||
|
||||
json connInfo = {
|
||||
{"connectionDurationSeconds", connectionDuration},
|
||||
{"timeSinceLastPongSeconds", timeSinceLastPong},
|
||||
{"pingTimeoutCount", ud->pingTimeoutCount},
|
||||
{"pongReceived", ud->pongReceived}
|
||||
};
|
||||
userConnections["connections"].push_back(connInfo);
|
||||
}
|
||||
}
|
||||
|
||||
result.push_back(userConnections);
|
||||
}
|
||||
|
||||
// Füge unauthentifizierte Verbindungen hinzu
|
||||
if (unauthenticatedCount > 0) {
|
||||
json unauthenticatedConnections = {
|
||||
{"userId", ""},
|
||||
{"connectionCount", unauthenticatedCount},
|
||||
{"connections", json::array()}
|
||||
};
|
||||
|
||||
for (auto* wsi : allConnections) {
|
||||
auto* ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
||||
if (ud && ud->userId.empty()) {
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
auto connectionDuration = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
now - ud->connectionTime).count();
|
||||
auto timeSinceLastPong = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
now - ud->lastPongTime).count();
|
||||
|
||||
json connInfo = {
|
||||
{"connectionDurationSeconds", connectionDuration},
|
||||
{"timeSinceLastPongSeconds", timeSinceLastPong},
|
||||
{"pingTimeoutCount", ud->pingTimeoutCount},
|
||||
{"pongReceived", ud->pongReceived},
|
||||
{"status", "unauthenticated"}
|
||||
};
|
||||
unauthenticatedConnections["connections"].push_back(connInfo);
|
||||
}
|
||||
}
|
||||
|
||||
result.push_back(unauthenticatedConnections);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void WebSocketServer::sendMessageToConnection(struct lws *wsi, const std::string &message) {
|
||||
if (!wsi) return;
|
||||
|
||||
auto* ud = reinterpret_cast<WebSocketUserData*>(lws_wsi_user(wsi));
|
||||
if (!ud) return;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(ud->messageQueueMutex);
|
||||
ud->messageQueue.push(message);
|
||||
}
|
||||
|
||||
lws_callback_on_writable(wsi);
|
||||
}
|
||||
|
||||
void WebSocketServer::setWorkers(const std::vector<std::unique_ptr<Worker>> &workerList) {
|
||||
workers.clear();
|
||||
workers.reserve(workerList.size());
|
||||
@@ -495,4 +689,45 @@ void WebSocketServer::removeConnection(const std::string &userId, struct lws *ws
|
||||
} else {
|
||||
std::cout << "[removeConnection] Warnung: Keine Verbindungen für User " << userId << " gefunden" << std::endl;
|
||||
}
|
||||
|
||||
// Entferne auch aus allConnections
|
||||
allConnections.erase(
|
||||
std::remove(allConnections.begin(), allConnections.end(), wsi),
|
||||
allConnections.end()
|
||||
);
|
||||
}
|
||||
|
||||
void WebSocketServer::removeConnectionByWsi(struct lws *wsi) {
|
||||
// Entfernt eine Verbindung aus allen Einträgen in der connections-Map
|
||||
// Wird verwendet, wenn die userId nicht bekannt ist (z.B. bei vorzeitigem Schließen)
|
||||
std::unique_lock<std::shared_mutex> lock(connectionsMutex);
|
||||
|
||||
std::vector<std::string> usersToRemove;
|
||||
|
||||
for (auto it = connections.begin(); it != connections.end(); ++it) {
|
||||
auto& connList = it->second;
|
||||
auto wsiIt = std::find(connList.begin(), connList.end(), wsi);
|
||||
|
||||
if (wsiIt != connList.end()) {
|
||||
connList.erase(wsiIt);
|
||||
std::cout << "[removeConnectionByWsi] Verbindung entfernt von User " << it->first << std::endl;
|
||||
|
||||
// Wenn keine Verbindungen mehr vorhanden sind, markiere für Entfernung
|
||||
if (connList.empty()) {
|
||||
usersToRemove.push_back(it->first);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Entferne leere Einträge
|
||||
for (const auto& userId : usersToRemove) {
|
||||
connections.erase(userId);
|
||||
std::cout << "[removeConnectionByWsi] Leeren Eintrag für User " << userId << " entfernt" << std::endl;
|
||||
}
|
||||
|
||||
// Entferne auch aus allConnections
|
||||
allConnections.erase(
|
||||
std::remove(allConnections.begin(), allConnections.end(), wsi),
|
||||
allConnections.end()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ struct WebSocketUserData {
|
||||
bool pongReceived = true;
|
||||
std::queue<std::string> messageQueue;
|
||||
std::mutex messageQueueMutex;
|
||||
std::chrono::steady_clock::time_point connectionTime; // Zeitpunkt der Verbindungsherstellung
|
||||
std::chrono::steady_clock::time_point lastPingTime;
|
||||
std::chrono::steady_clock::time_point lastPongTime;
|
||||
int pingTimeoutCount = 0;
|
||||
@@ -48,8 +49,12 @@ private:
|
||||
void pingClients();
|
||||
void handleBrokerMessage(const std::string &message);
|
||||
std::string getUserIdFromFalukantUserId(int falukantUserId);
|
||||
bool isMainAdmin(const std::string &hashedUserId);
|
||||
json getActiveConnections();
|
||||
void sendMessageToConnection(struct lws *wsi, const std::string &message);
|
||||
void addConnection(const std::string &userId, struct lws *wsi);
|
||||
void removeConnection(const std::string &userId, struct lws *wsi);
|
||||
void removeConnectionByWsi(struct lws *wsi); // Entfernt Verbindung aus allen Einträgen (Fallback)
|
||||
|
||||
static int wsCallback(struct lws *wsi,
|
||||
enum lws_callback_reasons reason,
|
||||
@@ -74,6 +79,7 @@ private:
|
||||
|
||||
std::shared_mutex connectionsMutex;
|
||||
std::unordered_map<std::string, std::vector<struct lws*>> connections;
|
||||
std::vector<struct lws*> allConnections; // Alle aktiven Verbindungen (auch ohne userId)
|
||||
|
||||
std::vector<Worker*> workers;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user