// services/webSocketService.js import { Server } from 'socket.io'; import amqp from 'amqplib/callback_api.js'; const RABBITMQ_URL = process.env.AMQP_URL || 'amqp://localhost'; const QUEUE = 'chat_messages'; const MAX_PENDING_MESSAGES = 500; function routeMessage(io, message) { if (!message || typeof message !== 'object') return; if (message.socketId) { io.to(message.socketId).emit('newMessage', message); return; } if (message.recipientSocketId) { io.to(message.recipientSocketId).emit('newMessage', message); return; } if (message.roomId) { io.to(String(message.roomId)).emit('newMessage', message); return; } if (message.room) { io.to(String(message.room)).emit('newMessage', message); return; } io.emit('newMessage', message); } export function setupWebSocket(server) { const io = new Server(server); let channel = null; let pendingMessages = []; const flushPendingMessages = () => { if (!channel || pendingMessages.length === 0) return; const queued = pendingMessages; pendingMessages = []; for (const message of queued) { try { channel.sendToQueue(QUEUE, Buffer.from(JSON.stringify(message))); } catch (err) { console.warn('[webSocketService] Flush fehlgeschlagen, Nachricht bleibt im Fallback:', err.message); pendingMessages.unshift(message); break; } } }; amqp.connect(RABBITMQ_URL, (err, connection) => { if (err) { console.warn(`[webSocketService] RabbitMQ nicht erreichbar (${RABBITMQ_URL}) - WebSocket läuft ohne Queue-Bridge.`); return; } connection.on('error', (connectionError) => { console.warn('[webSocketService] RabbitMQ-Verbindung fehlerhaft:', connectionError.message); channel = null; }); connection.on('close', () => { console.warn('[webSocketService] RabbitMQ-Verbindung geschlossen.'); channel = null; }); connection.createChannel((channelError, createdChannel) => { if (channelError) { console.warn('[webSocketService] RabbitMQ-Channel konnte nicht erstellt werden:', channelError.message); return; } channel = createdChannel; channel.assertQueue(QUEUE, { durable: false }); channel.consume(QUEUE, (msg) => { if (!msg) return; const message = JSON.parse(msg.content.toString()); routeMessage(io, message); }, { noAck: true }); flushPendingMessages(); }); }); io.on('connection', (socket) => { console.log('Client connected via WebSocket'); socket.on('newMessage', (message) => { if (channel) { try { channel.sendToQueue(QUEUE, Buffer.from(JSON.stringify(message))); } catch (err) { console.warn('[webSocketService] sendToQueue fehlgeschlagen, nutze In-Memory-Fallback:', err.message); channel = null; } } if (!channel) { pendingMessages.push(message); if (pendingMessages.length > MAX_PENDING_MESSAGES) { pendingMessages = pendingMessages.slice(-MAX_PENDING_MESSAGES); } return; } }); socket.on('disconnect', () => { console.log('Client disconnected'); }); }); }