2012-07-04 8 views
-2

У меня есть класс, который обрабатывает сообщения:Обработка очереди в параллельных

public abstract class ProcessingBase { 
    public bool IsBusy { get; set; } 
    public Queue<CustomMessage> PendingMessages { get; private set; } 
    public abstract MessageProcessingResult Process(); 
    ... 

В обрабатывающих модулях, которые я создал до сих пор, один использует очередь для обработки сообщений, отправляя их через сокет, а другой отправляет письма. Это отлично работает для отдельных модулей обработки, но предположим, что я хотел связать эти два?

Я думал, что я мог бы сделать что-то вроде:

public class ChainProcessor : ProcessingBase { 
    public List<ProcessingBase> Processors { get; set; } 
    public override MessageProcessingResult Process() { 
       if (IsBusy) 
        return null; 
       IsBusy = true; 

       CustomMessage msg = null; 
       this.ProcessedMessages = new List<CustomMessage>(); 

       // create clone of queue 
       var messagesToSend = new Queue<CustomMessage>(this.PendingMessages); 
       this.PendingMessages.Clear(); 
       while (messagesToSend.Count > 0 && (msg = messagesToSend.Dequeue()) != null) { 
        foreach (var processor in this.Processors) { 
         // something with yield return? 
        } 
       } 

Это фактическое сцепление, которое я шатким на. В идеале я хотел бы обработать их в виде «волны». Например:

Module 1 - Processed message A 
Module 2 - Processed message A 
Module 1 - Processed message B 
Module 2 - Processed message B 
Module 3 - Processed message A 
Module 1 - Processed message C 

Где каждое сообщение передается по цепочке, при этом сообщения постоянно поступают и уходят по мере их перемещения. Какой был бы лучший способ сделать это? Или я ограничена последовательным передачей каждого сообщения по всей цепочке?

Ie (То, что я не хочу): Модуля 1 - Обработанное ообщение Модуля 2 - Обработанное ообщение Модуля 3 - Обработанное ообщение Модуль 1 - Обработанное сообщение B Модуль 2 - Обработанное сообщение B модуль 3 - Обработанное сообщение B модуль 1 - Обработанное сообщение C

EDIT: Я надеялся, что я мог бы сделать что-то, где первый процессор будет уступать возвращение обратно в «цепи процессор», который будет проходить это сообщение на модуль 2, то, возможно, цепочка может начать следующее сообщение на processo r1

+0

На самом деле, я не против, какой поток, важно то, что они не обрабатываются последовательно (вопрос обновлен). – Echilon

ответ

0

Я думаю, что вы можете использовать

BlockingCollection<CustomMessage> 

для каждого модуля. Затем вы можете создавать несколько потоков для чтения с каждого сеанса и выполнять обработку модуля. После этого поставьте его в очередь следующего модуля.

Создав несколько потоков, вы можете работать параллельно. BlockingCollection является потокобезопасным, быстрым и блокирующим. Очень удобно. См. http://msdn.microsoft.com/en-us/library/dd267312.aspx

0

Вы ищете резьбу безопасности ConcurrentQueue и AutoResetEvent, ведьма может проснуться спальной нитью (см. Примеры).

Каждый рабочий работает в своем потоке (если у вас есть работа для основного потока), и после того, как он выполняет свою работу и опустошает свою очередь ввода, вызовите AutoResetEvent.WaitOne(). Если очередь ввода заполнена каким-либо элементом, вызовите AutoResetEvent.Set(), который снова запустит процесс.

Вы можете связать этих двух рабочих, создав вторую очередь с элементами, обработанными первым работником. После того, как первый рабочий закончил с сообщением, добавляет элемент в очередь вывода и вызывает AutoResetEvent.Set().

Прочтите и подумайте дважды, я не знаю вашу ситуацию, поэтому вы можете оптимизировать процесс для своих нужд.

Удачи. ;-)

Смежные вопросы