2010-09-02 2 views
2

У меня есть 1 поток потоковых данных и 2-й (поток) обработки данных. Обработка данных занимает около 100 мс, поэтому я использую второй поток, чтобы не задерживать 1-й поток.Как написать код производителя/потребителя на C#?

Пока второй поток обрабатывает данные, 1-й поток добавляет данные в кэш-словарь, а затем, когда второй поток завершен, он обрабатывает кешированные значения.

Мои вопросы: как должен быть код производителя/потребителя на C#?

public delegate void OnValue(ulong value); 

public class Runner 
{ 
    public event OnValue OnValueEvent; 
    private readonly IDictionary<string, ulong> _cache = new Dictionary<string, ulong>(StringComparer.InvariantCultureIgnoreCase); 
    private readonly AutoResetEvent _cachePublisherWaitHandle = new AutoResetEvent(true); 

    public void Start() 
    { 
     for (ulong i = 0; i < 500; i++) 
     { 
      DataStreamHandler(i.ToString(), i); 
     } 
    } 

    private void DataStreamHandler(string id, ulong value) 
    { 
     _cache[id] = value; 

     if (_cachePublisherWaitHandle.WaitOne(1)) 
     { 
      IList<ulong> tempValues = new List<ulong>(_cache.Values); 
      _cache.Clear(); 

      _cachePublisherWaitHandle.Reset(); 

      ThreadPool.UnsafeQueueUserWorkItem(delegate 
      { 
       try 
       { 
        foreach (ulong value1 in tempValues) 
         if (OnValueEvent != null) 
          OnValueEvent(value1); 
       } 
       finally 
       { 
        _cachePublisherWaitHandle.Set(); 
       } 
      }, null); 
     } 
     else 
     { 
      Console.WriteLine(string.Format("Buffered value: {0}.", value)); 
     } 
    } 
} 

class Program 
{ 
    static void Main(string[] args) 
    { 
     Stopwatch sw = Stopwatch.StartNew(); 
     Runner r = new Runner(); 
     r.OnValueEvent += delegate(ulong x) 
           { 
            Console.WriteLine(string.Format("Processed value: {0}.", x)); 
            Thread.Sleep(100); 

            if(x == 499) 
            { 
             sw.Stop(); 
             Console.WriteLine(string.Format("Time: {0}.", sw.ElapsedMilliseconds)); 
            } 
           }; 
     r.Start(); 
     Console.WriteLine("Done"); 
     Console.ReadLine(); 
    } 
} 

ответ

2

Существует хороший article на MSDN о синхронизации производителя и потребителя. Также есть хороший пример на сайте Albahari.

+1

Сайт Albahari является правильным. К сожалению, эта статья MSDN имеет неправильную реализацию, которая может привести к сценарию live-lock в ситуации, когда вы используете более одного потребителя. –

4

Лучшей практикой настройки шаблона производителя-потребителя является использование класса BlockingCollection, который доступен в .NET 4.0 или в виде отдельной загрузки фрейма Reactive Extensions. Идея заключается в том, что производители будут входить в очередь с использованием метода Add, и потребители будут деинсталлироваться с использованием метода Take, который блокирует, если очередь пуста. Например, SwDevMan81 указал, что на сайте Albahari есть действительно хорошая запись о том, как заставить его работать правильно, если вы хотите идти по ручному маршруту.

Смежные вопросы