2013-10-26 2 views
0

Ищите лучший подход к чтению из источника данных, такого как хранилище таблиц Azure, которое требует много времени и преобразует данные в json или csv и записывает их в локальный файл с именем файла в зависимости от ключа раздела ,
Рассматривается один из подходов к выполнению задачи записи в файл по таймеру с истекшим событием триггера с фиксированным интервалом времени.Чтение и запись в виде параллельных задач

ответ

3

Для вещей, которые не хорошо парализуют (например, I/O), лучше всего использовать «Модель производителя-потребителя».

Как это работает, у вас есть один поток, который обрабатывает непараллелизуемую задачу, и вся эта задача выполняется в буфер. Затем у вас есть набор параллельных задач, которые все считывают из буфера и обрабатывают данные, затем они помещают данные в другой буфер, когда они обрабатывают данные. Если вам нужно снова записать результат в непараллелизированном виде, тогда у вас будет еще одна отдельная задача, выписывающая результат.

public Stream ProcessData(string filePath) 
{ 
    using(var sourceCollection = new BlockingCollection<string>()) 
    using(var destinationCollection = new BlockingCollection<SomeClass>()) 
    { 
     //Create a new background task to start reading in the file 
     Task.Factory.StartNew(() => ReadInFile(filePath, sourceCollection), TaskCreationOptions.LongRunning); 

     //Create a new background task to process the read in lines as they come in 
     Task.Factory.StartNew(() => TransformToClass(sourceCollection, destinationCollection), TaskCreationOptions.LongRunning); 

     //Process the newly created objects as they are created on the same thread that we originally called the function with 
     return TrasformToStream(destinationCollection); 
    } 
} 

private static void ReadInFile(string filePath, BlockingCollection<string> collection) 
{ 
    foreach(var line in File.ReadLines(filePath)) 
    { 
     collection.Add(line); 
    } 

    //This lets the consumer know that we will not be adding any more items to the collection. 
    collection.CompleteAdding(); 
} 

private static void TransformToClass(BlockingCollection<string> source, BlockingCollection<SomeClass> dest) 
{ 
    //GetConsumingEnumerable() will take items out of the collection and block the thread if there are no items available and CompleteAdding() has not been called yet. 
    Parallel.ForEeach(source.GetConsumingEnumerable(), 
         (line) => dest.Add(SomeClass.ExpensiveTransform(line)); 

    dest.CompleteAdding(); 
} 

private static Stream TrasformToStream(BlockingCollection<SomeClass> source) 
{ 
    var stream = new MemoryStream(); 
    foreach(var record in source.GetConsumingEnumerable()) 
    { 
     record.Seralize(stream); 
    } 
    return stream; 
} 

Я настоятельно рекомендую вам прочитать бесплатно книгу Patterns for Parallel Programming, он идет в какой-то подробно об этом. Существует целая секция, подробно объясняющая модель Продюсер-Потребитель.

ОБНОВЛЕНИЕ: Для использования загрузки небольшой производительности GetConsumingPartitioner() вместо GetConsumingEnumerable() из Parallel Extension Extras в петле Parallel.ForEach. ForEach делает некоторые предположения о том, что передается IEnumerable, в результате чего он получает дополнительные блокировки, которые ему не нужны, передавая секвенсор вместо перечислимого, ему не нужно брать эти дополнительные блокировки.

+1

'GetConsumingEnumerable()' не очень хорошо работает в 'Parallel.ForEach()'. Подумайте, вместо этого используйте ['GetConsumingPartitioner()'] (http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx). – svick

+0

Никогда не знал о 'GetConsumingPartitioner()', я знал о проблемах, которые он адресул, я просто не знал, как это исправить. –

+0

Я получаю эту ошибку «Коллекция была удалена. \ R \ nОбъект:« BlockingCollection »» – Seenu

Смежные вопросы