Есть демо-приложение, которое я подготовил.Реагирующие события буферов до тех пор, пока не будет запрошен
using System.Collections.Concurrent;
using System.Reactive.Linq;
class Program
{
static void Main(string[] args)
{
var stored = new ConcurrentQueue<long>();
Observable.Interval(TimeSpan.FromMilliseconds(20))
.Subscribe(it => stored.Enqueue(it));
var random = new Random();
Task.Run(async() =>
{
while (true)
{
await Task.Delay((int)(random.NextDouble() * 1000));
var currBatch = stored.ToArray();
for (int i = 0; i < currBatch.Length; i++)
{
long res;
stored.TryDequeue(out res);
}
Console.WriteLine("[" + string.Join(",", currBatch) + "]");
}
});
Console.ReadLine();
}
}
Он моделирует независимого потребителя, который срабатывает случайным интервалом времени. В реальном случае источник событий будет поступать из файловой системы, хотя может быть и всплеск.
Что это значит - это хранить неопределенное количество событий в параллельной очереди, пока потребитель не решит использовать собранные события.
У меня есть сильное чувство, что этот код небезопасен. Можно ли воспроизвести такое поведение чисто ручным образом?
Если нет, можете ли вы предложить лучший/более безопасный подход?
Ok, после того, как Мессинг с решением Эндрю, оказалось, что я на самом деле хотел, чтобы тянуть наблюдаемым. Это легко сделать с помощью метода расширения Collect. var ys = xs.Collect (() => новый список(), (list, x) => {list.Add (x); return list;}, lst => new List ()); –
v00d00