2016-02-25 5 views
1

Есть демо-приложение, которое я подготовил.Реагирующие события буферов до тех пор, пока не будет запрошен

using System.Collections.Concurrent; 
using System.Reactive.Linq; 

class Program 
{ 
    static void Main(string[] args) 
    { 
     var stored = new ConcurrentQueue<long>(); 

     Observable.Interval(TimeSpan.FromMilliseconds(20)) 
      .Subscribe(it => stored.Enqueue(it)); 

     var random = new Random(); 

     Task.Run(async() => 
     { 
      while (true) 
      { 
       await Task.Delay((int)(random.NextDouble() * 1000)); 
       var currBatch = stored.ToArray(); 
       for (int i = 0; i < currBatch.Length; i++) 
       { 
        long res; 
        stored.TryDequeue(out res); 
       } 
       Console.WriteLine("[" + string.Join(",", currBatch) + "]"); 
      } 
     }); 

     Console.ReadLine(); 
    } 
} 

Он моделирует независимого потребителя, который срабатывает случайным интервалом времени. В реальном случае источник событий будет поступать из файловой системы, хотя может быть и всплеск.

Что это значит - это хранить неопределенное количество событий в параллельной очереди, пока потребитель не решит использовать собранные события.

У меня есть сильное чувство, что этот код небезопасен. Можно ли воспроизвести такое поведение чисто ручным образом?

Если нет, можете ли вы предложить лучший/более безопасный подход?

+0

Ok, после того, как Мессинг с решением Эндрю, оказалось, что я на самом деле хотел, чтобы тянуть наблюдаемым. Это легко сделать с помощью метода расширения Collect. var ys = xs.Collect (() => новый список (), (list, x) => {list.Add (x); return list;}, lst => new List ()); – v00d00

ответ

2

Здесь вы идете:

var producer = Observable.Interval(TimeSpan.FromMilliseconds(20)); 
var random = new Random(); 

Task.Run(async() => 
{ 
    var notify = new Subject<int>(); 
    producer.Window(() => notify) 
     .SelectMany(ev => ev.ToList()) 
     .Subscribe(currBatch => Console.WriteLine("[" + string.Join(",", currBatch) + "]")); 

    while (true) 
    { 
     await Task.Delay((int)(random.NextDouble() * 1000)); 
     notify.OnNext(1); 
    } 
}); 

Console.ReadLine(); 
+0

<3 Спасибо за такой быстрый ответ. – v00d00

Смежные вопросы