2010-06-29 5 views
5

привет, я хочу, чтобы темы стали сотрудником производителя и потребителя. потребитель довольно медленный, а производитель очень быстрый и работает в очередях.C# inter-thread communication

например, потребитель может обрабатывать одно сообщение за 20 секунд, а производитель может производить 10 сообщений за одну секунду, но делает это примерно один раз в течение длительного времени, чтобы потребитель мог догнать.

я хочу что-то вроде:

Stream commonStream; 
AutoResetEvent commonLock; 

void Producer() 
{ 
    while (true) 
    { 
    magic.BlockUntilMagicAvalible(); 
    byte[] buffer = magic.Produce(); 
    commonStream.Write(buffer); 
    commonLock.Set(); 
    } 
} 

void Consumer() 
{ 
    while(true) 
    { 
    commonLock.WaitOne(); 
    MagicalObject o = binarySerializer.Deserialize(commonStream); 
    DoSomething(o); 
    } 
} 
+0

Какую версию .Net вы используете, есть несколько новых вещей для v4 для именно этого материала. –

+0

.Net 3.5; Комментарии должны содержать не менее 15 символов. –

ответ

1

Я хотел бы прочитать следующие статьи, которые они описывают вашу проблему. В принципе, вы не получаете правильную изоляцию для своей единицы работы.

http://blogs.msdn.com/b/ricom/archive/2006/04/24/582643.aspx http://blogs.msdn.com/b/ricom/archive/2006/04/26/584802.aspx

+0

да, своего рода, что я хочу написать ... –

11

Если у вас есть .Net 4.0 или выше вы можете сделать это таким образом, используя BlockingCollection

int maxBufferCap = 500; 
BlockingCollection<MagicalObject> Collection 
          = new BlockingCollection<MagicalObject>(maxBufferCap); 
void Producer() 
{ 
    while (magic.HasMoreMagic) 
    { 
     this.Collection.Add(magic.ProduceMagic()); 
    } 
    this.Collection.CompleteAdding(); 
} 

void Consumer() 
{ 
    foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable()) 
    { 
     DoSomthing(magicalObject); 
    } 
} 

В foreach линия будет спать, если нет данных в буфере, он автоматически автоматически разбудит его, когда что-то добавится в коллекцию.

Причина, по которой я устанавливаю максимальный буфер, заключается в том, что ваш производитель намного быстрее, чем потребитель, который может в конечном итоге потреблять много памяти, поскольку все больше и больше объектов попадают в коллекцию. Установив максимальный размер буфера при создании коллекции блокировки при достижении размера буфера, вызов Add на производителя будет блокироваться, пока элемент не будет удален из коллекции потребителем.

Еще один бонус класса BlockingCollection - он может иметь столько производителей и потребителей, сколько вам нужно, это не должно быть соотношение 1: 1. Если DoSomthing поддерживает его, вы могли бы иметь foreach петли на ядро ​​компьютера (или даже использовать Parallel.ForEach и использовать потребляющие перечислимы в качестве источника данных)

void ConsumersInParalell() 
{ 
    //This assumes the method signature of DoSomthing is one of the following: 
    // Action<MagicalObject> 
    // Action<MagicalObject, ParallelLoopState> 
    // Action<MagicalObject, ParallelLoopState, long> 
    Paralell.ForEach(this.Collection.GetConsumingEnumerable(), DoSomthing); 
} 
+2

Обратите внимание, что TPL был перенесен на .NET 3.5: http://codeblog.theg2.net/2010/02/tpl-and-parallelforeach-in- net-35-using.html –

0

Вы можете получить то, что вы хотите использовать очереди и таймер. Производитель добавляет значения в очередь и запускает потребительский таймер. Истекшее событие потребительского таймера (которое находится в потоке Threadpool) останавливает таймер и проходит по очереди до тех пор, пока оно не станет пустым, а затем исчезнет (нет ненужного опроса). Производитель может добавить в очередь, пока потребитель все еще работает.

System.Timers.Timer consumerTimer; 
Queue<byte[]> queue = new Queue<byte[]>(); 

void Producer() 
{ 
    consumerTimer = new System.Timers.Timer(1000); 
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed); 
    while (true) 
    { 
     magic.BlockUntilMagicAvailable(); 
     lock (queue) 
     { 
      queue.Enqueue(magic.Produce()); 
      if (!consumerTimer.Enabled) 
      { 
       consumerTimer.Start(); 
      } 
     } 
    } 
} 

void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) 
{ 
    while (true) 
    { 
     consumerTimer.Stop(); 
     lock (queue) 
     { 
      if (queue.Count > 0) 
      { 
       DoSomething(queue.Dequeue()); 
      } 
      else 
      { 
       break; 
      } 
     } 
    } 
} 
+0

ваш фрагмент не является потокобезопасным ... и мой подразумевает отсутствие опроса –

+0

Что такое потокобезопасность? И он не опросит - таймер является одноразовым, который активируется только тогда, когда производитель добавляет в очередь. –

-1

Я использую Mutex's. Идея состоит в том, что они работают в разных потоках. Потребительский поток запускается затвором с помощью мьютекса, где он будет работать неограниченно до выпуска Продюсером. Затем он будет обрабатывать данные параллельно, оставив продюсера продолжать. Потребитель будет повторно заблокирован по завершении.

(код запустить поток, и другие биты качества были опущены для краткости.)

// Pre-create mutex owned by Producer thread, then start Consumer thread. 
Mutex mutex = new Mutex(true); 
Queue<T> queue = new Queue<T>(); 

void Producer_AddData(T data) 
{ 
    lock (queue) { 
    queue.Enqueue(GetData()); 
    } 

    // Release mutex to start thread: 
    mutex.ReleaseMutex(); 
    mutex.WaitOne(); 
} 

void Consumer() 
{ 
    while(true) 
    { 
    // Wait indefinitely on mutex 
    mutex.WaitOne(); 
    mutex.ReleaseMutex(); 

    T data; 
    lock (queue) { 
     data = queue.Dequeue(); 
    } 
    DoSomething(data); 
    } 

}

Это замедляет Producer на очень несколько миллисекунд, в то время как является ждет потребитель будить и отпустите мьютекс. Если вы можете жить с этим.

+0

Использование 'BlockingCollection' намного лучше. Во-первых, это гораздо более очевидно, когда это правильно, чем использование мьютексов, и в отличие от вашей модели производитель и потребитель могут работать параллельно; вы гарантируете, что ваш код * либо * производит *, либо * потребляет, но не тот и другой. Он также не хорошо масштабируется для более чем одного производителя или более одного потребителя, в отличие от блокирующей коллекции, где это тривиально. Вы могли бы использовать более сложный подход на основе мьютекса, который имел бы преимущества блокирующей коллекции, но это было бы * много * работы и было бы намного менее удобочитаемым/поддерживаемым. – Servy

+0

BlockingColletion недоступен для меня, поскольку я не могу запустить 4.5. Если бы я мог, то это, вероятно, было бы правильным решением. Однако этот код работает параллельно. Я, возможно, не был ясен, но эти два в разных потоках. Я использую это для выполнения тяжелых SQL-запросов в одном потоке, в то время как сбор данных по другому потоку, и он хорошо работает для меня. – Ben

+0

BlockingCollection был добавлен в 4.0, а не 4.5. – Servy