У меня есть 1 поток потоковых данных и 2-й (поток) обработки данных. Обработка данных занимает около 100 мс, поэтому я использую второй поток, чтобы не задерживать 1-й поток.Как написать код производителя/потребителя на C#?
Пока второй поток обрабатывает данные, 1-й поток добавляет данные в кэш-словарь, а затем, когда второй поток завершен, он обрабатывает кешированные значения.
Мои вопросы: как должен быть код производителя/потребителя на C#?
public delegate void OnValue(ulong value);
public class Runner
{
public event OnValue OnValueEvent;
private readonly IDictionary<string, ulong> _cache = new Dictionary<string, ulong>(StringComparer.InvariantCultureIgnoreCase);
private readonly AutoResetEvent _cachePublisherWaitHandle = new AutoResetEvent(true);
public void Start()
{
for (ulong i = 0; i < 500; i++)
{
DataStreamHandler(i.ToString(), i);
}
}
private void DataStreamHandler(string id, ulong value)
{
_cache[id] = value;
if (_cachePublisherWaitHandle.WaitOne(1))
{
IList<ulong> tempValues = new List<ulong>(_cache.Values);
_cache.Clear();
_cachePublisherWaitHandle.Reset();
ThreadPool.UnsafeQueueUserWorkItem(delegate
{
try
{
foreach (ulong value1 in tempValues)
if (OnValueEvent != null)
OnValueEvent(value1);
}
finally
{
_cachePublisherWaitHandle.Set();
}
}, null);
}
else
{
Console.WriteLine(string.Format("Buffered value: {0}.", value));
}
}
}
class Program
{
static void Main(string[] args)
{
Stopwatch sw = Stopwatch.StartNew();
Runner r = new Runner();
r.OnValueEvent += delegate(ulong x)
{
Console.WriteLine(string.Format("Processed value: {0}.", x));
Thread.Sleep(100);
if(x == 499)
{
sw.Stop();
Console.WriteLine(string.Format("Time: {0}.", sw.ElapsedMilliseconds));
}
};
r.Start();
Console.WriteLine("Done");
Console.ReadLine();
}
}
Сайт Albahari является правильным. К сожалению, эта статья MSDN имеет неправильную реализацию, которая может привести к сценарию live-lock в ситуации, когда вы используете более одного потребителя. –