Я периодически загружаю JSON, скажем каждые 10 секунд ... Когда данные поступают, происходит событие. Запущенное событие просто добавляет JSON в BlockingCollection<string>
(для обработки).Почему я никогда не могу удалить BlockingCollection?
Я пытаюсь обработать JSON как можно быстрее (как только он приходит ...):
public class Engine
{
private BlockingCollection<string> Queue = new BlockingCollection<string>();
private DataDownloader DataDownloader;
public void Init(string url, int interaval)
{
dataDownloader = new DataDownloader(url, interaval);
dataDownloader .StartCollecting();
dataDownloader .DataReceivedEvent += DataArrived;
//Kick off a new task to process the incomming JSON
Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
}
/// <summary>
/// Processes the JSON in parallel
/// </summary>
private void Process()
{
Parallel.ForEach(Queue.GetConsumingEnumerable(), ProcessJson);
}
/// <summary>
/// Deserializes JSON and adds result to database
/// </summary>
/// <param name="json"></param>
private void ProcessJson(string json)
{
using (var db = new MyDataContext())
{
var items= Extensions.DeserializeData(json);
foreach (var item in items)
{
db.Items.Add(item);
db.SaveChanges();
}
}
}
private void DataArrived(object sender, string json)
{
Queue.Add(json);
Console.WriteLine("Queue length: " + Queue.Count);
}
}
Когда я запускаю программу, она работает и получает данные добавлены в базу данных, но если я смотрю сообщение консоли от Console.WriteLine("Queue length: " + Queue.Count);
, я получаю что-то вроде этого:
1
1
1
1
1
1
1
1
2
3
4
5
6
7
...
Я попытался модифицировать свою Process
выглядеть следующим образом:
/// <summary>
/// Processes the JSON in parallel
/// </summary>
private void Process()
{
foreach (var json in Queue.GetConsumingEnumerable())
{
ProcessJson(json);
}
}
Затем я добавить несколько Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
, но я получаю такую же проблему ...
Кто-нибудь есть какие-либо идеи о том, что происходит здесь не так?
Не зависит ли длина обработки от ваших входных данных json? Если первые несколько простых, то они будут выполняться быстро, а если ваш восьмой огромный, он будет складываться. Также вам нужно сохранить изменения в db после добавления каждого элемента? (У меня нет опыта работы с DataContext, но я думаю, что это должно быть дорого) – CrudaLilium
Это уже как можно быстрее, операционная система. Если ваш код работает быстрее, он потребляет, и это вполне вероятно, потому что обработка json никогда не бывает дешевой, тогда нет другого выбора, кроме как выращивать коллекцию. Именно поэтому важна блокировка в BlockingCollection, что гарантирует, что ваша программа не будет сбой с OOM. –
@CrudaLilium Nah, JSON, как правило, того же размера, так что это не так ... но хорошее мышление. – pookie