10

Я запускаю довольно типичную модель производителя/потребителя для разных задач.Это работа для потока данных TPL?

Task1: Считывает партии байтов [] из двоичных файлов и запускает новую задачу для каждой коллекции байт-массивов. (операция выполняется для управления памятью).

Задача 2-n: Это рабочие задачи, и каждый из них работает с переданной коллекцией (из Tasks1) байт-массивов и де-сериализует байт-массивы, сортирует их по определенным критериям и затем сохраняет коллекцию результирующих объектов (каждый массив байтов десериализуется в такой объект) в параллельном словаре.

Задача (n + 1) Я выбрал параллельный словарь, потому что задача этой задачи заключается в объединении тех коллекций, которые хранятся в параллельном словаре в том же порядке, что и исходные данные из Task1. Я достигаю этого, передавая идентификатор collectionID (он имеет тип int и увеличивается для каждой новой коллекции в Task1) вплоть до задачи Task1 до этой задачи. Эта задача в основном проверяет, сохраняется ли следующий ожидаемый идентификатор коллекции в параллельном словаре, и если да, вынимает его, добавляет в конечную очередь и проверяет следующую коллекцию в параллельном словаре.

Теперь, из того, что я прочитал, и видео, которое я смотрел, мне кажется, что TPL Dataflow может быть идеальным кандидатом для такой модели производителя/потребителя. Я просто не могу разработать дизайн и, таким образом, начать работу, потому что никогда не работал с TPL Dataflow. С точки зрения пропускной способности и латентности эта библиотека даже до задачи? В настоящее время я обрабатываю 2,5 миллиона байт-массивов и, следовательно, объекты в секунду в результирующих коллекциях. Может ли облегчить поток данных TPL? Меня особенно интересует ответ на следующий вопрос: может ли поток данных TPL сохранять порядок сборочных партий из Task1 при нерестах рабочих задач и повторного слияния их после выполнения рабочими задачами? Это оптимизирует ситуацию? Профилировав всю структуру, я чувствую, что довольно много времени тратится впустую из-за вращения и слишком много параллельных коллекций.

Любые идеи, мысли?

ответ

12

EDIT: Оказывается, я был очень неправ. TransformBlockделает возвращает элементы в том же порядке, в каком они вошли, даже если он настроен на параллелизм. Из-за этого код в моем первоначальном ответе совершенно бесполезен, и вместо этого можно использовать обычный TransformBlock.


Оригинальный ответ:

Насколько я знаю только один параллелизм конструкция в .NET поддерживает возвращение обработанных элементов в том порядке, они пришли: PLINQ с AsOrdered(). Но мне кажется, что PLINQ не подходит, как вам хочется.

TPL Dataflow, с другой стороны, хорошо подходит, я думаю, но у него нет блока, который бы поддерживал параллельность и возвращал элементы в порядке в одно и то же время (TransformBlock поддерживает оба из них, но не на в то же время). К счастью, блоки потока данных были разработаны с учетом возможности компоновки, поэтому мы можем создать собственный блок, который это делает.

Но сначала мы должны выяснить, как заказать результаты. Использование параллельного словаря, как вы и предполагали, наряду с некоторым механизмом синхронизации, безусловно, будет работать. Но я думаю, что есть более простое решение: используйте очередь Task s. В выходной задаче вы удалите Task, дождитесь ее завершения (асинхронно), и когда это произойдет, вы отправите свой результат.Нам по-прежнему нужна некоторая синхронизация для случая, когда очередь пуста, но мы можем получить это бесплатно, если мы выберем, какую очередь использовать умно.

Итак, общая идея такова: то, что мы пишем, будет IPropagatorBlock с некоторым вводом и выходом. Самый простой способ создать пользовательский IPropagatorBlock - создать один блок, который обрабатывает входные данные, другой блок, который производит результаты, и обрабатывает их как один, используя DataflowBlock.Encapsulate().

Входной блок должен обрабатывать входящие элементы в правильном порядке, поэтому там нет распараллеливания. Он создаст новый Task (на самом деле, TaskCompletionSource, чтобы мы могли установить результат Task), добавьте его в очередь, а затем отправьте элемент для обработки, а также каким-то образом установите результат правильного Task , Поскольку нам не нужно связывать этот блок с чем-либо, мы можем использовать ActionBlock.

Выходной блок должен принимать Task s из очереди, асинхронно ждать их, а затем отправлять их. Но так как все блоки имеют встроенную в них очередь, а блоки, в которых делегаты имеют асинхронное ожидание, встроены, это будет очень просто: new TransformBlock<Task<TOutput>, TOutput>(t => t). Этот блок будет работать как в очереди, так и в качестве выходного блока. Из-за этого нам не нужно иметь дело с какой-либо синхронизацией.

Последний фрагмент головоломки фактически обрабатывает элементы параллельно. Для этого мы можем использовать еще один ActionBlock, на этот раз с набором MaxDegreeOfParallelism. Он будет принимать входные данные, обрабатывать их и задавать результат в правильном Task в очереди.

Соединенный, это может выглядеть следующим образом:

public static IPropagatorBlock<TInput, TOutput> 
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform) 
{ 
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t); 

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
     tuple => tuple.Item2(transform(tuple.Item1)), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    var enqueuer = new ActionBlock<TInput>(
     async item => 
     { 
      var tcs = new TaskCompletionSource<TOutput>(); 
      await processor.SendAsync(
       new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult)); 
      await queue.SendAsync(tcs.Task); 
     }); 

    enqueuer.Completion.ContinueWith(
     _ => 
     { 
      queue.Complete(); 
      processor.Complete(); 
     }); 

    return DataflowBlock.Encapsulate(enqueuer, queue); 
} 

После стольких разговоров, что довольно небольшое количество кода, я думаю.

Кажется, вы очень заботитесь о производительности, поэтому вам может потребоваться тонкая настройка этого кода. Например, имеет смысл установить MaxDegreeOfParallelism блока processor на что-то вроде Environment.ProcessorCount, чтобы избежать переподписки. Кроме того, если задержка более важна, чем пропускная способность для вас, может возникнуть смысл установить MaxMessagesPerTask того же блока на 1 (или другое небольшое число), чтобы при завершении обработки элемента его сразу отправляли на выход.

Кроме того, если вы хотите дросселировать входящие элементы, вы можете установить из enqueuer.

+0

Ничего себе довольно кучу положительных героев, которые я сначала хотел бы переварить и попробовать. Огромное спасибо тем, кто по крайней мере заслужил выдержку ;-) Позвольте мне поиграть с этими идеями, и я вернусь. Queuing Tasks имеет большой смысл, и я удивляюсь, почему я этого не понял раньше. –

+0

ok Я провожу некоторое время, проходя через ваше сообщение, и читаю информацию о потоке данных TPL, здесь пара вопросов, чтобы полностью понять ваше предлагаемое решение: (1) почему вы предлагаете собственный IPropagatorBlock и IDataflowBlock.Encapsulate(), если Transformblock уже существует? (2) Я не вижу, как вы на самом деле планируете связывать блоки. Вы сначала говорите о ActionBlocks TransformBlocks. Из того, что я прочитал, ActionBlock не будет «конечной точкой» всей архитектуры? –

+1

1. Это объясняется во втором абзаце: «TransformBlock» не может обрабатывать элементы параллельно и возвращать их в порядке в одно и то же время. Он может сделать любой из них, но не тот и другой. – svick

Смежные вопросы