// services/webSocketService.js
import { Server } from 'socket.io';
import amqp from 'amqplib/callback_api.js';

const RABBITMQ_URL = process.env.AMQP_URL || 'amqp://localhost';
const QUEUE = 'chat_messages';

/**
 * Attaches a Socket.IO server to the given HTTP server and, when RabbitMQ is
 * reachable, bridges chat messages through the `chat_messages` queue.
 *
 * Message flow:
 * - A client emits `newMessage`. If a RabbitMQ channel is open, the payload is
 *   published to the queue; otherwise it is broadcast directly to all clients.
 * - Every message consumed from the queue is broadcast to all clients as
 *   `newMessage`.
 *
 * The RabbitMQ connection is best-effort: any connection or channel failure is
 * logged as a warning and the service keeps running without the queue bridge.
 *
 * @param {object} server - HTTP server instance passed to the Socket.IO `Server` constructor.
 * @returns {void}
 */
export function setupWebSocket(server) {
  const io = new Server(server);

  // Currently usable RabbitMQ channel, or null while the bridge is unavailable.
  let channel = null;

  amqp.connect(RABBITMQ_URL, (err, connection) => {
    if (err) {
      console.warn(`[webSocketService] RabbitMQ nicht erreichbar (${RABBITMQ_URL}) - WebSocket läuft ohne Queue-Bridge.`);
      return;
    }

    connection.on('error', (connectionError) => {
      console.warn('[webSocketService] RabbitMQ-Verbindung fehlerhaft:', connectionError.message);
      channel = null;
    });

    connection.on('close', () => {
      console.warn('[webSocketService] RabbitMQ-Verbindung geschlossen.');
      channel = null;
    });

    connection.createChannel((channelError, createdChannel) => {
      if (channelError) {
        console.warn('[webSocketService] RabbitMQ-Channel konnte nicht erstellt werden:', channelError.message);
        return;
      }

      // Without an 'error' listener, a channel-level error (e.g. a queue
      // declared elsewhere with different options) would be thrown as an
      // unhandled EventEmitter error and take the whole process down.
      createdChannel.on('error', (error) => {
        console.warn('[webSocketService] RabbitMQ-Channel fehlerhaft:', error.message);
        if (channel === createdChannel) {
          channel = null;
        }
      });

      // Drop the reference once the channel is gone so that senders fall back
      // to direct broadcasting instead of publishing on a dead channel.
      createdChannel.on('close', () => {
        console.warn('[webSocketService] RabbitMQ-Channel geschlossen.');
        if (channel === createdChannel) {
          channel = null;
        }
      });

      channel = createdChannel;
      createdChannel.assertQueue(QUEUE, { durable: false });
      createdChannel.consume(
        QUEUE,
        (msg) => {
          // amqplib delivers null when the consumer is cancelled by the broker.
          if (!msg) return;

          let message;
          try {
            message = JSON.parse(msg.content.toString());
          } catch (parseError) {
            // A single malformed queue entry must not crash the service.
            console.warn('[webSocketService] Ungültige Nachricht aus der Queue verworfen:', parseError.message);
            return;
          }

          io.emit('newMessage', message);
        },
        { noAck: true },
      );
    });
  });

  io.on('connection', (socket) => {
    console.log('Client connected via WebSocket');

    socket.on('newMessage', (message) => {
      if (channel) {
        try {
          // JSON.stringify(undefined) yields undefined, which Buffer.from
          // rejects, and sendToQueue throws on a closed channel; both cases
          // fall through to the direct broadcast below.
          channel.sendToQueue(QUEUE, Buffer.from(JSON.stringify(message)));
          return;
        } catch (sendError) {
          console.warn('[webSocketService] Senden an RabbitMQ fehlgeschlagen - sende direkt per WebSocket:', sendError.message);
        }
      }

      io.emit('newMessage', message);
    });

    socket.on('disconnect', () => {
      console.log('Client disconnected');
    });
  });
}