Ищите лучший подход к чтению из источника данных, такого как хранилище таблиц Azure, которое требует много времени и преобразует данные в json или csv и записывает их в локальный файл с именем файла в зависимости от ключа раздела ,
Рассматривается один из подходов к выполнению задачи записи в файл по таймеру с истекшим событием триггера с фиксированным интервалом времени.Чтение и запись в виде параллельных задач
ответ
Для вещей, которые не хорошо парализуют (например, I/O), лучше всего использовать «Модель производителя-потребителя».
Как это работает, у вас есть один поток, который обрабатывает непараллелизуемую задачу, и вся эта задача выполняется в буфер. Затем у вас есть набор параллельных задач, которые все считывают из буфера и обрабатывают данные, затем они помещают данные в другой буфер, когда они обрабатывают данные. Если вам нужно снова записать результат в непараллелизированном виде, тогда у вас будет еще одна отдельная задача, выписывающая результат.
public Stream ProcessData(string filePath)
{
using(var sourceCollection = new BlockingCollection<string>())
using(var destinationCollection = new BlockingCollection<SomeClass>())
{
//Create a new background task to start reading in the file
Task.Factory.StartNew(() => ReadInFile(filePath, sourceCollection), TaskCreationOptions.LongRunning);
//Create a new background task to process the read in lines as they come in
Task.Factory.StartNew(() => TransformToClass(sourceCollection, destinationCollection), TaskCreationOptions.LongRunning);
//Process the newly created objects as they are created on the same thread that we originally called the function with
return TrasformToStream(destinationCollection);
}
}
private static void ReadInFile(string filePath, BlockingCollection<string> collection)
{
foreach(var line in File.ReadLines(filePath))
{
collection.Add(line);
}
//This lets the consumer know that we will not be adding any more items to the collection.
collection.CompleteAdding();
}
private static void TransformToClass(BlockingCollection<string> source, BlockingCollection<SomeClass> dest)
{
//GetConsumingEnumerable() will take items out of the collection and block the thread if there are no items available and CompleteAdding() has not been called yet.
Parallel.ForEeach(source.GetConsumingEnumerable(),
(line) => dest.Add(SomeClass.ExpensiveTransform(line));
dest.CompleteAdding();
}
private static Stream TrasformToStream(BlockingCollection<SomeClass> source)
{
var stream = new MemoryStream();
foreach(var record in source.GetConsumingEnumerable())
{
record.Seralize(stream);
}
return stream;
}
Я настоятельно рекомендую вам прочитать бесплатно книгу Patterns for Parallel Programming, он идет в какой-то подробно об этом. Существует целая секция, подробно объясняющая модель Продюсер-Потребитель.
ОБНОВЛЕНИЕ: Для использования загрузки небольшой производительности GetConsumingPartitioner()
вместо GetConsumingEnumerable()
из Parallel Extension Extras в петле Parallel.ForEach
. ForEach
делает некоторые предположения о том, что передается IEnumerable
, в результате чего он получает дополнительные блокировки, которые ему не нужны, передавая секвенсор вместо перечислимого, ему не нужно брать эти дополнительные блокировки.
- 1. нерест несколько параллельных задач
- 2. Несколько параллельных задач
- 3. Параллельных задач организатор
- 4. Выполнение параллельных задач async?
- 5. Выполнение параллельных задач
- 6. запуская последовательность параллельных задач
- 7. чтение из файла в виде параллельных массивов в Java
- 8. iOS-Управление и сопровождение нескольких параллельных задач
- 9. Выполнение параллельных задач в Jenkins
- 10. Выполнение параллельных задач в C#
- 11. Выполнение параллельных задач в R
- 12. 3D-запись и чтение в виде текстуры в CUDA
- 13. Bluebird для параллельных независимых задач
- 14. Алгоритм планирования независимых параллельных задач
- 15. oozie: работает сотни параллельных задач
- 16. Чтение и запись файла изображения в виде «обычных файлов»
- 17. Запуск параллельных задач на C#
- 18. использовать asyncio для параллельных задач
- 19. оркестровка задач побежала в параллельных блоках
- 20. Как увеличить количество параллельных задач в C#
- 21. Безопасность потоков для параллельных задач в .NET.
- 22. Соберите результаты из параллельных задач выполнения Celery
- 23. асинхронная запись и чтение файла
- 24. Чтение и запись файлов python
- 25. WebDAV и частичное чтение/запись?
- 26. с использованием последовательных и параллельных задач в gulp 4.0
- 27. Чтение и запись файла
- 28. Запись и чтение данных
- 29. Чтение и запись NSDecimalNumber
- 30. Запись и чтение Android
'GetConsumingEnumerable()' не очень хорошо работает в 'Parallel.ForEach()'. Подумайте, вместо этого используйте ['GetConsumingPartitioner()'] (http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx). – svick
Никогда не знал о 'GetConsumingPartitioner()', я знал о проблемах, которые он адресул, я просто не знал, как это исправить. –
Я получаю эту ошибку «Коллекция была удалена. \ R \ nОбъект:« BlockingCollection »» – Seenu