Как я могу переписать код, который код завершает, когда BOTH transformblocks завершен? Я думал, что завершение означает, что он отмечен как полный И «очередь в очереди» пуста?TPL Dataflow, завершение гарантии только при завершении ВСЕХ блоков данных источника
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 input count: " + transformBlock1.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processorBlock = new ActionBlock<string>(i =>
{
Console.WriteLine(i);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numElements = 100;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i);
}
//mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
Я отредактировал код, добавив количество входных буферов для каждого блока преобразования. Очевидно, что все 100 элементов передаются в каждый из блоков преобразования. Но как только один из трансформационных блоков заканчивается, процессорный блок не принимает больше элементов, и вместо этого входной буфер неполного трансформационного блока просто очищает входной буфер.
Опасайтесь, чтобы блоки преобразования могли не получать все сообщения из блока широковещательной передачи. Они получают сообщение _latest_. Если широковещательному блоку предлагаются сообщения быстрее, чем блок преобразования может их принять, блок преобразования будет пропускать сообщения. Кроме того, вы должны «ждать» на «SendAsync (i)», если вы хотите обеспечить порядок сообщений и т. Д. – urbanhusky