2017-02-05 6 views
0

Я периодически загружаю JSON, скажем каждые 10 секунд ... Когда данные поступают, происходит событие. Запущенное событие просто добавляет JSON в BlockingCollection<string> (для обработки).Почему я никогда не могу удалить BlockingCollection?

Я пытаюсь обработать JSON как можно быстрее (как только он приходит ...):

public class Engine 
{ 
    private BlockingCollection<string> Queue = new BlockingCollection<string>(); 
    private DataDownloader DataDownloader; 

    public void Init(string url, int interaval) 
    { 
     dataDownloader = new DataDownloader(url, interaval); 
     dataDownloader .StartCollecting(); 
     dataDownloader .DataReceivedEvent += DataArrived; 

     //Kick off a new task to process the incomming JSON 
     Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning); 
    } 

    /// <summary> 
    /// Processes the JSON in parallel 
    /// </summary> 
    private void Process() 
    { 
     Parallel.ForEach(Queue.GetConsumingEnumerable(), ProcessJson); 
    } 

    /// <summary> 
    /// Deserializes JSON and adds result to database 
    /// </summary> 
    /// <param name="json"></param> 
    private void ProcessJson(string json) 
    { 
     using (var db = new MyDataContext()) 
     { 
      var items= Extensions.DeserializeData(json); 
      foreach (var item in items) 
      { 
       db.Items.Add(item); 
       db.SaveChanges(); 
      } 
     } 
    } 

    private void DataArrived(object sender, string json) 
    { 
     Queue.Add(json); 
     Console.WriteLine("Queue length: " + Queue.Count); 
    } 
} 

Когда я запускаю программу, она работает и получает данные добавлены в базу данных, но если я смотрю сообщение консоли от Console.WriteLine("Queue length: " + Queue.Count);, я получаю что-то вроде этого:

1 
1 
1 
1 
1 
1 
1 
1 
2 
3 
4 
5 
6 
7 
... 

Я попытался модифицировать свою Process выглядеть следующим образом:

/// <summary> 
/// Processes the JSON in parallel 
/// </summary> 
private void Process() 
{ 

    foreach (var json in Queue.GetConsumingEnumerable()) 
    { 
     ProcessJson(json); 
    } 
} 

Затем я добавить несколько Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);, но я получаю такую ​​же проблему ...

Кто-нибудь есть какие-либо идеи о том, что происходит здесь не так?

+0

Не зависит ли длина обработки от ваших входных данных json? Если первые несколько простых, то они будут выполняться быстро, а если ваш восьмой огромный, он будет складываться. Также вам нужно сохранить изменения в db после добавления каждого элемента? (У меня нет опыта работы с DataContext, но я думаю, что это должно быть дорого) – CrudaLilium

+0

Это уже как можно быстрее, операционная система. Если ваш код работает быстрее, он потребляет, и это вполне вероятно, потому что обработка json никогда не бывает дешевой, тогда нет другого выбора, кроме как выращивать коллекцию. Именно поэтому важна блокировка в BlockingCollection, что гарантирует, что ваша программа не будет сбой с OOM. –

+0

@CrudaLilium Nah, JSON, как правило, того же размера, так что это не так ... но хорошее мышление. – pookie

ответ

0

Сначала очередь будет заполнена до начала обработки. Вероятно, потому что материал Entity Framework должен быть загружен, и соединение с базой данных должно быть установлено, это занимает некоторое время.

Затем GetConsumingEnumerable() начинает догонять процесс загрузки, истощая очередь во время загрузки. Коллекция пуста, MoveNext() возвращает false, Parallel.ForEach() выходы и Process() отделки.

Затем вы увидите, что очередь начинает заполняться, потому что она больше не потребляется.

Вам необходимо продолжить чтение с BlockingCollection до завершения процесса загрузки.

+0

Даже если я переношу свой 'foreach' в' while (true) 'и добавляю несколько задач, это все равно произойдет. – pookie

+0

Время для создания [mcve]. – CodeCaster