2016-09-16 3 views
1

Я пытаюсь реализовать решение с использованием RabbitMQ для достижения чего-то вроде распределенного RPC, используя только один запрос и одну очередь ответов для большого числа процессоров, я уже реализовал такое решение с Apache Apollo и я бы хотел, чтобы смог перенести его в RabbitMQ. Вот ключевые моменты:RabbitMQ: использование маршрутизации для достижения выбора сообщений

  • Каждый серверов подключается к очереди запросов только
  • Каждый сервер обрабатывает запросы, которые должны быть для него (поле заголовка)

В моей реализации для Аполлона ключевым моментом было использование селекторов (например, where clauses on values ​​of fields fields), я думал, что это достигнуто в RabbitMQ с помощью маршрутизации и маршрутизации, но я должен быть неправ, потому что я вижу, что рабочие получают сообщения, которые не должны быть для них ,

Я изменил образец маршрутизации (http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html), чтобы воспроизвести проблему. У меня есть два пользователя, с которыми я могу начать с разных параметров, определяющих routingKey, и производителя, который генерировал сообщения для одного из потребителей. Поведение, которое я вижу, заключается в том, что потребление сообщений кажется случайным (сообщение для «Джона» обрабатывается потребителем для «Джона» в первый раз, а потребителем - для «Мэри» второй раз)

У кого-нибудь есть индикация или фрагменты кода при использовании селекторов в RabbitMQ?

Ниже мой код для потребителя:

public static void Main(String[] args) 
{ 
    var factory = new ConnectionFactory { HostName = "localhost" }; 
    using (var connection = factory.CreateConnection()) 
     using (var channel = connection.CreateModel()) 
     { 
      const String request = "request"; 
      channel.ExchangeDeclare(request, "direct"); 

      channel.QueueDeclare(request, true, false, false, null); 

      if (args.Length < 1) 
      { 
       Console.WriteLine(" Press [enter] to exit."); 
       Console.ReadLine(); 
       Environment.ExitCode = 1; 
       return; 
      } 

      var myRoutingKey = args[0]; 
      channel.QueueBind(request, request, myRoutingKey); 

      Console.WriteLine($" [*] Waiting for messages for {myRoutingKey}."); 

      var consumer = new EventingBasicConsumer(channel); 
      consumer.Received += (model, ea) => 
      { 
       var body = ea.Body; 
       var message = Encoding.UTF8.GetString(body); 
       var routingKey = ea.RoutingKey; 
       Console.WriteLine($" [x] Received '{routingKey}':'{message}'"); 
      }; 
      channel.BasicConsume(request, true, consumer); 

      Console.WriteLine(" Press [enter] to exit."); 
      Console.ReadLine(); 
     } 
} 

и для производителя:

public static void Main(String[] args) 
{ 
    var factory = new ConnectionFactory { HostName = "localhost" }; 
    using (var connection = factory.CreateConnection()) 
     using (var channel = connection.CreateModel()) 
     { 
      const String request = "request"; 
      channel.ExchangeDeclare(request, "direct"); 

      channel.QueueDeclare(request, true, false, false, null); 

      var routingKey = args.Length > 0 ? args[0] : "John"; 

      const String message = "Hi"; 
      var body = Encoding.UTF8.GetBytes(message); 
      channel.BasicPublish(request, routingKey, null, body); 
      Console.WriteLine($" [x] Sent '{routingKey}':'{message}'"); 
     } 

    Console.WriteLine(" Press [enter] to exit."); 
    Console.ReadLine(); 
} 

Спасибо заранее.

ответ

0

Я могу догадаться, почему это не сработало для вас. Ключом являются эти две линии у вашего потребителя.

channel.QueueDeclare(request, true, false, false, null); 
channel.QueueBind(request, request, myRoutingKey); 

И тот факт, что «запрос» - это имя вашей очереди для «всех» ваших потребителей. Если вы запустите эту программу, установив несколько привязок с помощью нескольких ключей маршрутизации, конечным результатом является то, что очередь с именем «запрос» привязана к вашему обмену с несколькими ключами маршрутизации (например, «Джон», «Мэри»). Помните, что когда вы выполняете эту привязку, привязка не прерывается на сервере RabbitMQ, и они остаются.

Теперь вернемся к тому, как решить вашу проблему. Есть несколько вариантов, но вот один из них. Сначала я рекомендую прочитать RabbitMQ Model.

Вы можете использовать один и тот же код учебника, который имеет эти строки, а не ваши:

var queueName = channel.QueueDeclare().QueueName; 
channel.QueueBind(queueName, request, myRoutingKey); 

Но выше означает, что каждый раз, когда вы запускаете вашу потребительскую программу новой очередь создаются и связанным с обменом с нужными ключ маршрутизации. Альтернативой является использование того же самого кода, который вы делали ранее, но просто выберите свое имя очереди, а не имя фиксированной очереди. Например, вы можете иметь одну очередь за маршрутизацию ключа

var queueName = myRoutingKey ; 
channel.QueueDeclare(queueName, true, false, false, null); 
channel.QueueBind(queueName, request, myRoutingKey); 

Или же вы можете сгруппировать несколько ключей маршрутизации в одной очереди, похожие на образец учебника.

Дело в том, что вам нужно сделать, не может быть сделано с одной очередью. (За исключением случаев, когда вы отфильтровываете сообщения, когда вы их потребляете). Но это не звучало как реальное требование для вас. Вы спросили, что каждый потребительский сервер имеет дело только с соответствующими сообщениями, которые вы можете сделать в этой модели. Производители публикуют только этот обмен, хотя (это то, что вы хотели).

+0

Привет, Амин, спасибо за ваш ответ. Я также обнаружил, что при использовании динамически генерируемых очередей проблема не возникнет. В любом случае я искал решение только с запросом и очередью ответов как из-за проблем с производительностью, так и из-за того, что другое решение, которое я реализовал ранее с другим брокером, было таким. Узнав другую модель, последнее не является проблемой, просто требует некоторой редизайна, но я определенно должен проверить результаты. – Leon

+0

Кроме того, используя динамически генерируемые очереди, я теряю все сообщения от производителей, которые генерируются, пока пользователи маршрутизации не работают. Я также пытался объявить обмены прочными, но это не помогает. – Leon

+0

Динамически сгенерированные очереди будут по существу временными очередями для вас, так как вы потеряете имя очереди, которую я предполагаю? Я не считаю, что динамические имена очередей будут решением для вас, просто используйте согласованные имена (подумайте о них как о своих селекторах). Для этого не существует производительности. Для этого предназначен RabbitMQ. –

Смежные вопросы