2014-02-18 4 views
6

У нас есть служба Windows, которая слушает одиночную очередь RabbitMQ и обрабатывает сообщение.многопотоковый RabbitMQ потребитель

Мы хотели бы расширить такие же службы Windows, чтобы он мог прослушивать несколько очередей RabbitMQ и обрабатывать сообщение.

Не уверен, что это возможно, используя многопоточность, поскольку каждый поток должен будет отображать (блокировать) очередь.

Поскольку я очень новичок в многопоточности, необходимо руководство высокого уровня по следующему пункту, что поможет мне начать создание прототипа.

  1. Возможно ли прослушивание нескольких очередей в одном приложении с помощью потоков?
  2. Как обращаться с ситуацией, когда какая-либо нить получила крик посеяны (из-за исключения и т. Д.), Как вернуть без перезапуска услуги .
  3. Любой шаблон дизайна или реализация с открытым исходным кодом, которая может помочь мне справиться с такой ситуацией.
+0

Возможно, дубликат: [Как реализовать модель с одним-бытовым множеством очереди для rabbitMQ] (http://stackoverflow.com/q/11357512/1768303) – Noseratio

+0

@Noseratio - нет. Я не спрашиваю о том, очередь. Будет несколько очередей с несколькими очередями, но реализация должна выполняться при использовании служб с одним окном. Поэтому вместо того, чтобы писать несколько служб Windows для каждого пользователя очереди, я хотел бы написать отдельные службы Windows, которые будут прослушивать несколько очередей и обрабатывать сообщение. – Mahesh

+0

Я согласен, это не похоже на дубликат. – theMayer

ответ

6

Мне нравится, как вы написали свой вопрос - он начался очень широко и сфокусирован на специфике. Я успешно реализовал что-то очень похожее, и в настоящее время я работаю над проектом с открытым исходным кодом, чтобы извлечь уроки и вернуть их сообществу. К сожалению, хотя ... мне еще предстоит аккуратно упаковать код, что вам не поможет! Во всяком случае, чтобы ответить на ваши вопросы:

1. Is it possible to use threading for multiple queues.

A: Да, но это может быть полна подводных камней. А именно, библиотека RabbitMQ .NET не является лучшим написанным фрагментом кода, и я обнаружил, что это довольно громоздкая реализация протокола AMQP. Одним из самых пагубных предостережений является то, как он справляется с «принимающим» или «потребляющим» поведением, что может привести к возникновению взаимоблокировок, если вы не будете осторожны. К счастью, это хорошо иллюстрируется в документации API. Совет - если можно, используйте одножильный объект соединения. Затем в каждом потоке используйте соединение для создания нового IModel и соответствующих потребителей.

2. How to gracefully handle exceptions in threads - Я считаю, что это еще одна тема, и я не буду рассматривать ее здесь, поскольку есть несколько методов, которые вы можете использовать.

3. Any open-source projects? - Мне понравилось думать за EasyNetQ, хотя я все равно катался по своим собственным. Я надеюсь, что не забудьте последовать за мной, когда мой проект с открытым исходным кодом завершен, так как я считаю, что это еще лучше, чем EasyNetQ.

+0

Вы когда-нибудь могли аккуратно упаковать свой код? – Ommit

+0

Хороший вопрос. Ответ прост. У меня не было времени публиковать что-либо, потому что я все еще тестирую. – theMayer

5

Вы можете найти this answer очень полезно. У меня есть очень общее представление о том, как работает RabbitMQ, но я бы, вероятно, продолжал использовать один абонент на канал в потоке, как там было предложено.

Существует, конечно, более одного варианта организации модели потоков для этого. Фактическая реализация будет зависеть от того, как вам нужно обрабатывать сообщения из нескольких очередей: либо параллельно, либо путем их объединения, и сериализации обработки. Следующий код - консольное приложение, которое реализует симуляцию последнего случая. Он использует классы Task Parallel Library и BlockingCollection (что очень удобно для такого рода задач).

using System; 
using System.Collections.Concurrent; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Console_21842880 
{ 
    class Program 
    { 
     BlockingCollection<object> _commonQueue; 

     // process an individual queue 
     void ProcessQueue(int id, BlockingCollection<object> queue, CancellationToken token) 
     { 
      while (true) 
      { 
       // observe cancellation 
       token.ThrowIfCancellationRequested(); 
       // get a message, this blocks and waits 
       var message = queue.Take(token); 

       // process this message 
       // just place it to the common queue 
       var wrapperMessage = "queue " + id + ", message: " + message; 
       _commonQueue.Add(wrapperMessage); 
      } 
     } 

     // process the common aggregated queue 
     void ProcessCommonQeueue(CancellationToken token) 
     { 
      while (true) 
      { 
       // observe cancellation 
       token.ThrowIfCancellationRequested(); 
       // this blocks and waits 

       // get a message, this blocks and waits 
       var message = _commonQueue.Take(token); 

       // process this message 
       Console.WriteLine(message.ToString()); 
      } 
     } 

     // run the whole process 
     async Task RunAsync(CancellationToken token) 
     { 
      var queues = new List<BlockingCollection<object>>(); 
      _commonQueue = new BlockingCollection<object>(); 

      // start individual queue processors 
      var tasks = Enumerable.Range(0, 4).Select((i) => 
      { 
       var queue = new BlockingCollection<object>(); 
       queues.Add(queue); 

       return Task.Factory.StartNew(
        () => ProcessQeueue(i, queue, token), 
        TaskCreationOptions.LongRunning); 
      }).ToList(); 

      // start the common queue processor 
      tasks.Add(Task.Factory.StartNew(
       () => ProcessCommonQeueue(token), 
       TaskCreationOptions.LongRunning)); 

      // start the simulators 
      tasks.AddRange(Enumerable.Range(0, 4).Select((i) => 
       SimulateMessagesAsync(queues, token))); 

      // wait for all started tasks to complete 
      await Task.WhenAll(tasks); 
     } 

     // simulate a message source 
     async Task SimulateMessagesAsync(List<BlockingCollection<object>> queues, CancellationToken token) 
     { 
      var random = new Random(Environment.TickCount); 
      while (true) 
      { 
       token.ThrowIfCancellationRequested(); 
       await Task.Delay(random.Next(100, 1000)); 
       var queue = queues[random.Next(0, queues.Count)]; 
       var message = Guid.NewGuid().ToString() + " " + DateTime.Now.ToString(); 
       queue.Add(message); 
      } 
     } 

     // entry point 
     static void Main(string[] args) 
     { 
      Console.WriteLine("Ctrl+C to stop..."); 

      var cts = new CancellationTokenSource(); 
      Console.CancelKeyPress += (s, e) => 
      { 
       // cancel upon Ctrl+C 
       e.Cancel = true; 
       cts.Cancel(); 
      }; 

      try 
      { 
       new Program().RunAsync(cts.Token).Wait(); 
      } 
      catch (Exception ex) 
      { 
       if (ex is AggregateException) 
        ex = ex.InnerException; 
       Console.WriteLine(ex.Message); 
      } 

      Console.WriteLine("Press Enter to exit"); 
      Console.ReadLine(); 
     } 
    } 
} 

Другой идеей может быть использование Reactive Extensions (Rx). Если вы можете думать о прибывающих сообщениях как о событиях, а Rx может помочь объединить их в один поток.

+0

Основываясь на вашей реализации, я создал реализацию. Он не использует коллекцию BlockingCollection, поскольку прием сообщений, обработка сообщений и подтверждение сообщения для брокера должны выполняться на одном канале. https://gist.github.com/mahesh-singh/9214295 Не уверен, что эта реализация является правильной реализацией. – Mahesh

+0

@Mahesh, вы можете опубликовать это на http://codereview.stackexchange.com. – Noseratio

+0

Настройка нового обзора кода http://codereview.stackexchange.com/questions/42836/listen-to-multiple-rabbitmq-queue-by-task-and-process-the-message – Mahesh

Смежные вопросы