2013-10-26 4 views
-1

У нас есть приложение, которое регулярно получает мультимедийные сообщения и должно отвечать на них.Обработка данных несколькими потоками одновременно

В настоящее время мы делаем это с помощью одного потока, сначала получая сообщения, а затем обрабатывая их один за другим. Это делает работу, но она медленная.

Итак, мы сейчас думаем о том же процессе, но с несколькими потоками одновременно.

Любой простой способ разрешить параллельную обработку входящих записей, но избежать ошибочной обработки одной записи двумя потоками?

+1

простой и элегантный -> поднимет дискуссию ... – igrimpe

+0

Как вы получаете записи? Если с TCP/IP у вас может быть один поток прослушивания, который будет создавать один поток для каждой входящей записи. –

+0

Трудно дать конкретный ответ. Вещи, которые вы можете посмотреть, варьируются от метода Parallel.For в .NET до Azure с шиной с несколькими масштабируемыми рабочими ролями. Если вы можете указать дополнительную информацию и даже пример кода, мы можем предложить более конкретные рекомендации. –

ответ

1

Любой простой способ разрешить параллельную обработку входящих записей, но избежать ошибочной обработки одной записи двумя потоками?

Да это на самом деле не так уж трудно, то, что вы хотели сделать, это называется «производитель-потребитель модель»

Если ваше сообщение приемник может обрабатывать только один поток в то время, но ваше сообщение «процессор "может работать на несколько сообщений сразу вам просто нужно использовать BlockingCollection для хранения работ, которые должны быть обработаны

public sealed class MessageProcessor : IDisposable 
{ 
    public MessageProcessor() 
     : this(-1) 
    { 
    } 

    public MessageProcessor(int maxThreadsForProcessing) 
    { 
     _maxThreadsForProcessing = maxThreadsForProcessing; 
     _messages = new BlockingCollection<Message>(); 
     _cts = new CancellationTokenSource(); 

     _messageProcessorThread = new Thread(ProcessMessages); 
     _messageProcessorThread.IsBackground = true; 
     _messageProcessorThread.Name = "Message Processor Thread"; 
     _messageProcessorThread.Start(); 
    } 

    public int MaxThreadsForProcessing 
    { 
     get { return _maxThreadsForProcessing; } 
    } 

    private readonly BlockingCollection<Message> _messages; 
    private readonly CancellationTokenSource _cts; 
    private readonly Thread _messageProcessorThread; 
    private bool _disposed = false; 
    private readonly int _maxThreadsForProcessing; 


    /// <summary> 
    /// Add a new message to be queued up and processed in the background. 
    /// </summary> 
    public void ReceiveMessage(Message message) 
    { 
     _messages.Add(message); 
    } 

    /// <summary> 
    /// Signals the system to stop processing messages. 
    /// </summary> 
    /// <param name="finishQueue">Should the queue of messages waiting to be processed be allowed to finish</param> 
    public void Stop(bool finishQueue) 
    { 
     _messages.CompleteAdding(); 
     if(!finishQueue) 
      _cts.Cancel(); 

     //Wait for the message processor thread to finish it's work. 
     _messageProcessorThread.Join(); 
    } 

    /// <summary> 
    /// The background thread that processes messages in the system 
    /// </summary> 
    private void ProcessMessages() 
    { 
     try 
     { 
      Parallel.ForEach(_messages.GetConsumingEnumerable(), 
         new ParallelOptions() 
         { 
          CancellationToken = _cts.Token, 
          MaxDegreeOfParallelism = MaxThreadsForProcessing 
         }, 
         ProcessMessage); 
     } 
     catch (OperationCanceledException) 
     { 
      //Don't care that it happened, just don't want it to bubble up as a unhandeled exception. 
     } 
    } 

    private void ProcessMessage(Message message, ParallelLoopState loopState) 
    { 
     //Here be dragons! (or your code to process a message, your choice :-)) 

     //Use if(_cts.Token.IsCancellationRequested || loopState.ShouldExitCurrentIteration) to test if 
     // we should quit out of the function early for a graceful shutdown. 
    } 

    public void Dispose() 
    { 
     if(!_disposed) 
     { 
      if(_cts != null && _messages != null && _messageProcessorThread != null) 
       Stop(true); //This line will block till all queued messages have been processed, if you want it to be quicker you need to call `Stop(false)` before you dispose the object. 

      if(_cts != null) 
       _cts.Dispose(); 

      if(_messages != null) 
       _messages.Dispose(); 

      GC.SuppressFinalize(this); 
      _disposed = true; 
     } 
    } 

    ~MessageProcessor() 
    { 
     //Nothing to do, just making FXCop happy. 
    } 

} 

Я настоятельно рекомендую вам прочитать бесплатно книгу Patterns for Parallel Programming, он идет в какой-то подробно об этом. Существует целая секция, подробно объясняющая модель Продюсер-Потребитель.


UPDATE: Есть некоторые проблемы с производительностью с GetConsumingEnumerable() и Parallel.ForEach(, вместо того, чтобы использовать библиотеку ParallelExtensionsExtras и это новый метод расширения GetConsumingPartitioner()

public static Partitioner<T> GetConsumingPartitioner<T>(
    this BlockingCollection<T> collection) 
{ 
    return new BlockingCollectionPartitioner<T>(collection); 
} 

private class BlockingCollectionPartitioner<T> : Partitioner<T> 
{ 
    private BlockingCollection<T> _collection; 

    internal BlockingCollectionPartitioner(
     BlockingCollection<T> collection) 
    { 
     if (collection == null) 
      throw new ArgumentNullException("collection"); 
     _collection = collection; 
    } 

    public override bool SupportsDynamicPartitions { 
     get { return true; } 
    } 

    public override IList<IEnumerator<T>> GetPartitions(
     int partitionCount) 
    { 
     if (partitionCount < 1) 
      throw new ArgumentOutOfRangeException("partitionCount"); 
     var dynamicPartitioner = GetDynamicPartitions(); 
     return Enumerable.Range(0, partitionCount).Select(_ => 
      dynamicPartitioner.GetEnumerator()).ToArray(); 
    } 

    public override IEnumerable<T> GetDynamicPartitions() 
    { 
     return _collection.GetConsumingEnumerable(); 
    } 
} 
Смежные вопросы