2017-01-19 4 views
0

Я объясню, что я хочу достичь, и тогда то, что я сделал (без результатов)Как повторно использовать очереди RabbitMQ с помощью обменов?

У меня есть две службы узлов, соединенных между собой с RabbitMQ Ussing обменам (тема):

enter image description here

Что Я хочу отключить C1, все еще отправляя сообщения something.orange.something. Затем я снова хочу перезагрузить мой C1 и получить все сообщения, которые я потерял.

Теперь со мной происходит то, что каждый раз, когда я перезапускаю свой потребитель, создается новая очередь и создается новое связывание в моем обмене с тем же ключом маршрутизации. Поэтому у меня есть две очереди, получающие ту же информацию.

Если я сконфигурировал свою очередь с параметром {exclusive: true}, я решаю часть проблемы, у меня больше нет очередей без приемников, но по-прежнему с той же проблемой ... все сообщения, отправленные без активного приемника, теряются.

все возможно?

Вот мой код:

отправителем:

'use strict'; 

const amqp = require('amqplib/callback_api'); 
const logatim = require('logatim'); 
logatim.setLevel('info') 

amqp.connect('amqp://localhost', (err, conn) => { 
    conn.createChannel((err, ch) => { 
    let ex = 'direct_colors'; 
    let args = process.argv.slice(2); 
    let colors = ['colors.en.green', 'colors.en.yellow', 'colors.es.red'] 

    ch.assertExchange(ex, 'topic', {durable: true}); 

    setInterval(() => { 
     let color = colors[Math.floor(Math.random() * 3)]; 
     let msg = `This is a ${color} message`; 
     ch.publish(ex, color, new Buffer(msg)); 

     logatim[color.split('.').pop()].info(msg); 
    }, 1000); 
    }); 
}); 

Reveiver:

'use strict'; 

const amqp = require('amqplib/callback_api'); 
const logatim = require('logatim'); 
logatim.setLevel('info'); 
const args = process.argv.slice(2); 

amqp.connect('amqp://localhost', (err, conn) => { 
    conn.createChannel((err, ch) => { 
    var ex = 'direct_colors'; 

    ch.assertExchange(ex, 'topic', {durable: true}); 

    ch.assertQueue('', {exclusive: true, durable: true}, (err, q) => { 
     logatim.green.info(' [*] Waiting for logs. To exit press CTRL+C'); 

     args.forEach((arg) => { 
     ch.bindQueue(q.queue, ex, arg); 
     }); 

     ch.consume(q.queue, (msg) => { 
     logatim[msg.fields.routingKey.split('.').pop()].info(` [x] ${msg.content.toString()}`); 
     }); 
    }); 
    }); 
}); 

ответ

1

Вам нужно именованные очереди. При объявлении очереди в своем классе приемника, дать ему хорошо известное имя (постоянный), что-то вроде

ch.assertQueue('my_service_1_queue', {durable: true}, ... 

основных примеров из них находятся в RabbitMQ Tutorial

Когда потребитель будет идти вниз и перезапуск, он будет потребляться из одной и той же очереди. ПРИМЕЧАНИЕ: вам не нужна эксклюзивная очередь, так как она будет удалена, когда потребитель упадет.

Смежные вопросы