2012-11-22 1 views
20

Как я могу переписать код, который код завершает, когда BOTH transformblocks завершен? Я думал, что завершение означает, что он отмечен как полный И «очередь в очереди» пуста?TPL Dataflow, завершение гарантии только при завершении ВСЕХ блоков данных источника

public Test() 
    { 
     broadCastBlock = new BroadcastBlock<int>(i => 
      { 
       return i; 
      }); 

     transformBlock1 = new TransformBlock<int, string>(i => 
      { 
       Console.WriteLine("1 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(50); 
       return ("1_" + i); 
      }); 

     transformBlock2 = new TransformBlock<int, string>(i => 
      { 
       Console.WriteLine("2 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(20); 
       return ("2_" + i); 
      }); 

     processorBlock = new ActionBlock<string>(i => 
      { 
       Console.WriteLine(i); 
      }); 

     //Linking 
     broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); 
     broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    } 

    public void Start() 
    { 
     const int numElements = 100; 

     for (int i = 1; i <= numElements; i++) 
     { 
      broadCastBlock.SendAsync(i); 
     } 

     //mark completion 
     broadCastBlock.Complete(); 

     processorBlock.Completion.Wait(); 

     Console.WriteLine("Finished"); 
     Console.ReadLine(); 
    } 
} 

Я отредактировал код, добавив количество входных буферов для каждого блока преобразования. Очевидно, что все 100 элементов передаются в каждый из блоков преобразования. Но как только один из трансформационных блоков заканчивается, процессорный блок не принимает больше элементов, и вместо этого входной буфер неполного трансформационного блока просто очищает входной буфер.

+0

Опасайтесь, чтобы блоки преобразования могли не получать все сообщения из блока широковещательной передачи. Они получают сообщение _latest_. Если широковещательному блоку предлагаются сообщения быстрее, чем блок преобразования может их принять, блок преобразования будет пропускать сообщения. Кроме того, вы должны «ждать» на «SendAsync (i)», если вы хотите обеспечить порядок сообщений и т. Д. – urbanhusky

ответ

24

Вопрос в том, что именно casperOne сказал в своем ответ. Как только первый блок преобразования завершается, процессорный блок переходит в «режим завершения»: он будет обрабатывать оставшиеся элементы в своей очереди ввода, но он не примет никаких новых элементов.

Существует более простой фикс чем разделив ваш процессор блок пополам, хотя: не установлен PropagateCompletion, но вместо того, чтобы установить завершение блока процессора вручную, когда оба преобразования блоков в комплекте:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion) 
    .ContinueWith(_ => processorBlock.Complete()); 
+0

именно то, что я искал. Не знал, что Task.WhenAll возвращает ожидаемую задачу, мою небрежность. –

+0

Мне нужно то же самое, может быть, слишком поздно, но можете ли вы опубликовать обновление о том, где мне нужно добавить конструкцию Task.WhenAll? –

+0

@AttilaHajdrik Возможно, в конце вашего кода настройки потока данных, рядом с вашим 'LinkTo'. – svick

22

Проблема в том, что вы устанавливаете PropagateCompletion property каждый раз, когда вы вызываете LinkTo method, чтобы связать блоки и разные времена ожидания в ваших блоках преобразования.

Из документации по Complete method на IDataflowBlock interface (курсив мой):

Сигналы к IDataflowBlock, что не должны принимать и не производить больше никаких сообщений ни потребляющие больше откладывались сообщений.

Потому что вы шататься ваше время ожидания в каждом из TransformBlock<TInput, TOutput> случаев transformBlock2 (ожидание в течение 20 мсов) завершен до того transformBlock1 (ожидание в течение 50 мсов). transformBlock2 завершает сначала, а затем посылает сигнал processorBlock, который затем говорит: «Я ничего не принимаю» (и transformBlock1 еще не выпустил все свои сообщения).

Отметим, что обработка transformBlock1 до transformBlock1 не является абсолютно гарантия; возможно, что пул потоков (при условии, что вы используете планировщик по умолчанию) будет обрабатывать задачи в другом порядке (но, скорее всего, не будет, поскольку он будет красть работу из очередей после выполнения 20 мс элементов).

Ваш трубопровод выглядит следующим образом:

  broadcastBlock 
     /   \ 
transformBlock1 transformBlock2 
      \   /
      processorBlock 

Для того, чтобы обойти эту проблему, вы хотите иметь трубопровод, который выглядит следующим образом:

  broadcastBlock 
     /   \ 
transformBlock1 transformBlock2 
      |    | 
processorBlock1 processorBlock2 

Который осуществляется только создание двух отдельных ActionBlock<TInput> экземпляры, как так:

// The action, can be a method, makes it easier to share. 
Action<string> a = i => Console.WriteLine(i); 

// Create the processor blocks. 
processorBlock1 = new ActionBlock<string>(a); 
processorBlock2 = new ActionBlock<string>(a); 


// Linking 
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true }); 
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true }); 

затем нужно ждать на обоих Процессор блоков вместо одного:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait(); 

очень важное замечание; при создании ActionBlock<TInput> по умолчанию используется значение MaxDegreeOfParallelism property экземпляра ExecutionDataflowBlockOptions, переданного ему в один.

Это означает, что вызовы Action<T> delegate, которые вы передаете в ActionBlock<TInput>, являются потокобезопасными, только один будет выполняться одновременно.

Потому что теперь у вас есть два ActionBlock<TInput> экземпляры, указывающие на то же Action<T> делегата, вы не гарантируется безопасность потоков.

Если ваш метод является потокобезопасным, то вам не нужно ничего делать (что позволит вам установить свойство MaxDegreeOfParallelism на DataflowBlockOptions.Unbounded, так как нет причин блокировать).

Если это не поточно-безопасный, и вам необходимо его гарантировать, вам необходимо прибегнуть к традиционным примитивам синхронизации, например, lock statement.

В этом случае, вы могли бы сделать это как так (хотя это явно не требуется, так как WriteLine method на Console class потокобезопасно):

// The lock. 
var l = new object(); 

// The action, can be a method, makes it easier to share. 
Action<string> a = i => { 
    // Ensure one call at a time. 
    lock (l) Console.WriteLine(i); 
}; 

// And so on... 
+0

Спасибо за длинный ответ, но я выбрал ответ svick, потому что он напрямую относится к потоку данных TPL и предлагает очень сжатое и простое решение , –

+2

Вы можете легко избежать блокировки, если вы используете тот же ['ExclusiveScheduler'] (http://msdn.microsoft.com/en-us/library/system.threading.tasks.concurrentexclusivesulerpair.exclusivescheduler) для обоих блоков действий. – svick

7

дополнение к svick-х ответ: чтобы быть совместимым с поведением, которое вы получаете с опцией PropagateCompletion, вам также необходимо пересылать исключения в случае, если предыдущий блок был поврежден. Метод расширения как следующий заботится о том, что, как хорошо:

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) { 
    if (target == null) return; 
    if (sources.Length == 0) { target.Complete(); return; } 
    Task.Factory.ContinueWhenAll(
     sources.Select(b => b.Completion).ToArray(), 
     tasks => { 
      var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList(); 
      if (exceptions.Count != 0) { 
       target.Fault(new AggregateException(exceptions)); 
      } else { 
       target.Complete(); 
      } 
     } 
    ); 
} 
0

Другие ответы совершенно ясно, почему PropagateCompletion = истинные натворить, когда блок имеет более двух источников.

Чтобы обеспечить простое решение проблемы, вы можете захотеть взглянуть на библиотеку с открытым исходным кодом DataflowEx, которая решает эту проблему с помощью более интеллектуальных правил завершения. (Он использует TPL DataFlow связывающее внутренне, но поддерживает комплексное распространение завершения. Реализация выглядит похожа на WhenAll, но и обрабатывает динамическую ссылку, добавив. Пожалуйста, проверьте Dataflow.RegisterDependency() и TaskEx.AwaitableWhenAll() для осущ подробно.)

Я немного изменил свой код, чтобы сделать все работу используя DataflowEx:

public CompletionDemo1() 
{ 
    broadCaster = new BroadcastBlock<int>(
     i => 
      { 
       return i; 
      }).ToDataflow(); 

    transformBlock1 = new TransformBlock<int, string>(
     i => 
      { 
       Console.WriteLine("1 input count: " + transformBlock1.InputCount); 
       Thread.Sleep(50); 
       return ("1_" + i); 
      }); 

    transformBlock2 = new TransformBlock<int, string>(
     i => 
      { 
       Console.WriteLine("2 input count: " + transformBlock2.InputCount); 
       Thread.Sleep(20); 
       return ("2_" + i); 
      }); 

    processor = new ActionBlock<string>(
     i => 
      { 
       Console.WriteLine(i); 
      }).ToDataflow(); 

    /** rather than TPL linking 
     broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); 
     broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
     transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    **/ 

    //Use DataflowEx linking 
    var transform1 = transformBlock1.ToDataflow(); 
    var transform2 = transformBlock2.ToDataflow(); 

    broadCaster.LinkTo(transform1); 
    broadCaster.LinkTo(transform2); 
    transform1.LinkTo(processor); 
    transform2.LinkTo(processor); 
} 

Полный код here.

Отказ от ответственности: Я являюсь автором DataflowEx, который публикуется под лицензией MIT.

+0

не могли бы вы раскрыть, если вы работаете в Gridsum? В моем вопросе явно упоминалось, что мне нужен ответ для TPL Dataflow, я не хотел использовать стороннее решение для этой проблемы. Благодарю. –

+1

Да, я работаю в Gridsum. Но библиотека полностью бесплатна и с открытым исходным кодом, поэтому я подумал, что это может вам помочь. Никакого коммерческого мышления. Если вам нужен внутренний механизм потока данных TPL, пожалуйста, игнорируйте мой ответ. Но если кто-то нуждается в * решении *, ответ имеет свою ценность. Спасибо :) – Dodd

+0

Обновлен ответ немного подробнее. Отказ от ответственности также добавлен. – Dodd

Смежные вопросы