EDIT: Оказывается, я был очень неправ. TransformBlock
делает возвращает элементы в том же порядке, в каком они вошли, даже если он настроен на параллелизм. Из-за этого код в моем первоначальном ответе совершенно бесполезен, и вместо этого можно использовать обычный TransformBlock
.
Оригинальный ответ:
Насколько я знаю только один параллелизм конструкция в .NET поддерживает возвращение обработанных элементов в том порядке, они пришли: PLINQ с AsOrdered()
. Но мне кажется, что PLINQ не подходит, как вам хочется.
TPL Dataflow, с другой стороны, хорошо подходит, я думаю, но у него нет блока, который бы поддерживал параллельность и возвращал элементы в порядке в одно и то же время (TransformBlock
поддерживает оба из них, но не на в то же время). К счастью, блоки потока данных были разработаны с учетом возможности компоновки, поэтому мы можем создать собственный блок, который это делает.
Но сначала мы должны выяснить, как заказать результаты. Использование параллельного словаря, как вы и предполагали, наряду с некоторым механизмом синхронизации, безусловно, будет работать. Но я думаю, что есть более простое решение: используйте очередь Task
s. В выходной задаче вы удалите Task
, дождитесь ее завершения (асинхронно), и когда это произойдет, вы отправите свой результат.Нам по-прежнему нужна некоторая синхронизация для случая, когда очередь пуста, но мы можем получить это бесплатно, если мы выберем, какую очередь использовать умно.
Итак, общая идея такова: то, что мы пишем, будет IPropagatorBlock
с некоторым вводом и выходом. Самый простой способ создать пользовательский IPropagatorBlock
- создать один блок, который обрабатывает входные данные, другой блок, который производит результаты, и обрабатывает их как один, используя DataflowBlock.Encapsulate()
.
Входной блок должен обрабатывать входящие элементы в правильном порядке, поэтому там нет распараллеливания. Он создаст новый Task
(на самом деле, TaskCompletionSource
, чтобы мы могли установить результат Task
), добавьте его в очередь, а затем отправьте элемент для обработки, а также каким-то образом установите результат правильного Task
, Поскольку нам не нужно связывать этот блок с чем-либо, мы можем использовать ActionBlock
.
Выходной блок должен принимать Task
s из очереди, асинхронно ждать их, а затем отправлять их. Но так как все блоки имеют встроенную в них очередь, а блоки, в которых делегаты имеют асинхронное ожидание, встроены, это будет очень просто: new TransformBlock<Task<TOutput>, TOutput>(t => t)
. Этот блок будет работать как в очереди, так и в качестве выходного блока. Из-за этого нам не нужно иметь дело с какой-либо синхронизацией.
Последний фрагмент головоломки фактически обрабатывает элементы параллельно. Для этого мы можем использовать еще один ActionBlock
, на этот раз с набором MaxDegreeOfParallelism
. Он будет принимать входные данные, обрабатывать их и задавать результат в правильном Task
в очереди.
Соединенный, это может выглядеть следующим образом:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
После стольких разговоров, что довольно небольшое количество кода, я думаю.
Кажется, вы очень заботитесь о производительности, поэтому вам может потребоваться тонкая настройка этого кода. Например, имеет смысл установить MaxDegreeOfParallelism
блока processor
на что-то вроде Environment.ProcessorCount
, чтобы избежать переподписки. Кроме того, если задержка более важна, чем пропускная способность для вас, может возникнуть смысл установить MaxMessagesPerTask
того же блока на 1 (или другое небольшое число), чтобы при завершении обработки элемента его сразу отправляли на выход.
Кроме того, если вы хотите дросселировать входящие элементы, вы можете установить из enqueuer
.
Ничего себе довольно кучу положительных героев, которые я сначала хотел бы переварить и попробовать. Огромное спасибо тем, кто по крайней мере заслужил выдержку ;-) Позвольте мне поиграть с этими идеями, и я вернусь. Queuing Tasks имеет большой смысл, и я удивляюсь, почему я этого не понял раньше. –
ok Я провожу некоторое время, проходя через ваше сообщение, и читаю информацию о потоке данных TPL, здесь пара вопросов, чтобы полностью понять ваше предлагаемое решение: (1) почему вы предлагаете собственный IPropagatorBlock и IDataflowBlock.Encapsulate(), если Transformblock уже существует? (2) Я не вижу, как вы на самом деле планируете связывать блоки. Вы сначала говорите о ActionBlocks TransformBlocks. Из того, что я прочитал, ActionBlock не будет «конечной точкой» всей архитектуры? –
1. Это объясняется во втором абзаце: «TransformBlock» не может обрабатывать элементы параллельно и возвращать их в порядке в одно и то же время. Он может сделать любой из них, но не тот и другой. – svick