112 lines
3.6 KiB
JavaScript
112 lines
3.6 KiB
JavaScript
// services/webSocketService.js
|
|
import { Server } from 'socket.io';
|
|
import amqp from 'amqplib/callback_api.js';
|
|
|
|
// Name of the RabbitMQ queue that carries chat messages between publishers and this service.
const QUEUE = 'chat_messages';

// Upper bound on messages buffered in memory while no RabbitMQ channel is available.
const MAX_PENDING_MESSAGES = 500;

// Broker connection string; a missing or empty AMQP_URL falls back to a local broker.
const DEFAULT_RABBITMQ_URL = 'amqp://localhost';
const RABBITMQ_URL = process.env.AMQP_URL || DEFAULT_RABBITMQ_URL;
|
|
|
|
/**
 * Emits a chat message as a `newMessage` event to the narrowest target it names.
 *
 * Targets are checked in this order and the first one present wins:
 * `socketId`, `recipientSocketId`, `roomId`, `room`. A message that names none
 * of them is broadcast to every connected client. Values that are not objects
 * (including `null`) are ignored.
 *
 * @param {object} io - Socket.IO server; only `io.to(target).emit()` and `io.emit()` are used.
 * @param {object} message - Message payload, forwarded unchanged to the receivers.
 * @returns {void}
 */
function routeMessage(io, message) {
  if (!message || typeof message !== 'object') return;

  // Room ids are stringified below, so numeric ids are expected. A plain
  // truthiness test would treat room 0 as "no room" and leak the message to
  // every client via the broadcast fallback.
  const isRoomGiven = (value) => Boolean(value) || value === 0;

  let target = null;
  if (message.socketId) {
    target = message.socketId;
  } else if (message.recipientSocketId) {
    target = message.recipientSocketId;
  } else if (isRoomGiven(message.roomId)) {
    target = String(message.roomId);
  } else if (isRoomGiven(message.room)) {
    target = String(message.room);
  }

  if (target === null) {
    io.emit('newMessage', message);
    return;
  }

  io.to(target).emit('newMessage', message);
}
|
|
|
|
/**
 * Attaches a Socket.IO server to `server` and bridges chat traffic through RabbitMQ.
 *
 * Every `newMessage` event received from a client is published to `QUEUE`;
 * every message consumed from `QUEUE` is handed to `routeMessage`. While no
 * channel is available, outgoing messages are buffered in memory (at most
 * `MAX_PENDING_MESSAGES`, oldest dropped first) and published as soon as a
 * channel has been created.
 *
 * @param {object} server - Passed unchanged to the Socket.IO `Server` constructor.
 * @returns {void}
 */
export function setupWebSocket(server) {
  const io = new Server(server);
  let channel = null;
  // Serialized payloads (Buffers) waiting for a usable channel, oldest first.
  let pendingMessages = [];

  // Turns a client payload into a queue body, or returns null when it cannot
  // be represented as JSON (e.g. an event emitted without any payload).
  const serializeMessage = (message) => {
    let json;
    try {
      json = JSON.stringify(message);
    } catch (err) {
      console.warn('[webSocketService] Nachricht nicht serialisierbar, wird verworfen:', err.message);
      return null;
    }
    if (json === undefined) {
      console.warn('[webSocketService] Leere Nachricht verworfen.');
      return null;
    }
    return Buffer.from(json);
  };

  // Appends a payload to the in-memory fallback, keeping only the newest entries.
  const bufferPendingMessage = (payload) => {
    pendingMessages.push(payload);
    if (pendingMessages.length > MAX_PENDING_MESSAGES) {
      pendingMessages = pendingMessages.slice(-MAX_PENDING_MESSAGES);
    }
  };

  // Publishes everything buffered so far. On the first failure the failed
  // payload AND all payloads behind it go back into the buffer, in order.
  const flushPendingMessages = () => {
    if (!channel || pendingMessages.length === 0) return;

    const queued = pendingMessages;
    pendingMessages = [];

    for (let index = 0; index < queued.length; index += 1) {
      try {
        channel.sendToQueue(QUEUE, queued[index]);
      } catch (err) {
        console.warn('[webSocketService] Flush fehlgeschlagen, Nachricht bleibt im Fallback:', err.message);
        pendingMessages = [...queued.slice(index), ...pendingMessages];
        return;
      }
    }
  };

  amqp.connect(RABBITMQ_URL, (err, connection) => {
    if (err) {
      console.warn(`[webSocketService] RabbitMQ nicht erreichbar (${RABBITMQ_URL}) - WebSocket läuft ohne Queue-Bridge.`);
      return;
    }

    connection.on('error', (connectionError) => {
      console.warn('[webSocketService] RabbitMQ-Verbindung fehlerhaft:', connectionError.message);
      channel = null;
    });

    connection.on('close', () => {
      console.warn('[webSocketService] RabbitMQ-Verbindung geschlossen.');
      channel = null;
    });

    connection.createChannel((channelError, createdChannel) => {
      if (channelError) {
        console.warn('[webSocketService] RabbitMQ-Channel konnte nicht erstellt werden:', channelError.message);
        return;
      }

      // A channel can fail independently of its connection; without a
      // listener its 'error' event would be raised as an uncaught exception.
      createdChannel.on('error', (error) => {
        console.warn('[webSocketService] RabbitMQ-Channel fehlerhaft:', error.message);
        channel = null;
      });

      createdChannel.on('close', () => {
        console.warn('[webSocketService] RabbitMQ-Channel geschlossen.');
        channel = null;
      });

      channel = createdChannel;
      createdChannel.assertQueue(QUEUE, { durable: false });
      createdChannel.consume(QUEUE, (msg) => {
        if (!msg) return;

        // Other producers may publish to the queue; one malformed body must
        // not throw out of the consumer callback.
        let message;
        try {
          message = JSON.parse(msg.content.toString());
        } catch (parseError) {
          console.warn('[webSocketService] Ungültige Nachricht aus der Queue verworfen:', parseError.message);
          return;
        }

        routeMessage(io, message);
      }, { noAck: true });

      flushPendingMessages();
    });
  });

  io.on('connection', (socket) => {
    console.log('Client connected via WebSocket');

    socket.on('newMessage', (message) => {
      // Serialize before touching the channel so that a bad client payload
      // can never be mistaken for a channel failure.
      const payload = serializeMessage(message);
      if (payload === null) return;

      if (channel) {
        try {
          channel.sendToQueue(QUEUE, payload);
          return;
        } catch (err) {
          console.warn('[webSocketService] sendToQueue fehlgeschlagen, nutze In-Memory-Fallback:', err.message);
          channel = null;
        }
      }

      bufferPendingMessage(payload);
    });

    socket.on('disconnect', () => {
      console.log('Client disconnected');
    });
  });
}
|