2016-11-22 4 views
4
эксперты

.Net TPL,Уведомить задачу, когда другие задачи полные

Примечание: Невозможно использовать библиотеку DataFlow; Дополнения не допускаются.

У меня есть четыре задачи, как показано на рисунке ниже:

enter image description here

  • Task_1 (data_producer) -> читает записи из большого файла (> 500000 записей) и добавляет записи в BlockingCollection

  • task_2, task_3 (data_consumers) -> Каждая из этих задач берет записи из BlockingCollection. Каждая задача выполняет некоторую работу над записью, взятой из BlockingCollection (связанная с сетью), и по завершении каждая задача может добавить запись в очередь результатов. Порядок обработки НЕ ВАЖНО.

  • task_4 (процессор результатов) -> Принимает записи из результатов_выхода и записывает в выходной файл.

Я тогда ждать задачи закончить, т.е .:

Task.WhenAll(t1, t2, t3, t4) 

Так что у меня одна задача продюсера, несколько задач потребителей, и одна задача для сохранения результатов.

Мой вопрос:

Как оповестить задачу 4, когда задачи 2 и 3 будут завершены, так что задача 4 также знает, когда закончится?

Я нашел много примеров, которые «перемещают» данные из одной задачи в другую линейным «конвейером», но не нашли примеров, иллюстрирующих вышеизложенное; то есть, как уведомить задачу 4, когда задачи 2 и 3 завершены, так что она будет знать, когда нужно закончить.

Моя первоначальная мысль состоит в том, чтобы «зарегистрировать» задачу 2 и 3 с задачей 4 и просто контролировать состояние каждой зарегистрированной задачи - когда задачи 2 и 3 больше не выполняются, тогда задача 4 может остановиться (если очередь результатов также пуст).

Заранее спасибо.

+0

Вы не можете добавить [NuGet пакет для 'TPL Dataflow'] (https://www.nuget.org/packages/Microsoft.Tpl.Dataflow) для вашего проекта? – VMAtm

+0

Исправить - для этого конкретного проекта поток данных TPL не разрешен. – bdcoder

ответ

0

Это немного расширения на что Thomas уже сказал.

Используя BlockingCollection, вы можете позвонить ему GetConsumingEnumerable() и просто обработать его как обычный цикл foreach. Это позволит вашим задачам «наследовать». Единственное, что вам нужно сделать, это добавить одну дополнительную задачу, которая следит за задачами 2 и 3, чтобы увидеть, когда они заканчиваются, и называть их полным добавлением.

private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>(); 
private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>(); 

Task RunProcess() 
{ 
    Task1Start(); 
    var t2 = Stage2Start(); 
    var t3 = Stage2Start(); 
    Stage2MonitorStart(t2,t3); 
    retrun Task4Start(); 
} 

public void Task1Start() 
{ 
    Task.Run(()=> 
    { 
     foreach(var item in GetFileSource()) 
     { 
      var processedItem = Process(item); 
      _stageOneBlockingCollection.Add(processedItem); 
     } 
     _stageOneBlockingCollection.CompleteAdding(); 
    } 
} 

public Task Stage2Start() 
{ 
    return Task.Run(()=> 
    { 
     foreach(var item in _stageOneBlockingCollection.GetConsumingEnumerable()) 
     { 
      var processedItem = ProcessStage2(item); 
      _stageTwoBlockingCollection.Add(processedItem); 
     } 
    } 
} 

void Stage2MonitorStart(params Task[] tasks) 
{ 
    //Once all tasks complete mark the collection complete adding. 
    Task.WhenAll(tasks).ContinueWith(t=>_stageTwoBlockingCollection.CompleteAdding()); 
} 

public Task Stage4Start() 
{ 
    return Task.Run(()=> 
    { 
     foreach(var item in _stageTwoBlockingCollection.GetConsumingEnumerable()) 
     { 
      var processedItem = ProcessStage4(item); 
      WriteToOutputFile(processedItem); 
     } 
    } 
} 
+0

Это может сделать это ... Попробует ... – bdcoder

+0

Это делает точное поведение. Все задачи запускаются одновременно, обратите внимание на 'Task.Run' во всех функциях. Задача, возвращаемая из «RunProcess», представляет собой задачу, которая представляет собой завершение всего процесса. –

+0

Yup - выглядит как ваш победитель - большое спасибо! – bdcoder

0

Если вы используете BlockingCollection также для results_queue, вы можете реализовать эти уведомления с использованием свойств BlockingCollection.IsCompleted и BlockingCollection.IsAddingCompleted. процесса:

  • task1 метод вызовы BlockingCollection.CompleteAdding() на сбор ввода, когда больше нет записей в входном файле.
  • task2 и task3 check regulary property IsCompleted на входной коллекции. Это свойство истинно, когда коллекция ввода пуста и производитель называется методом CompleteAdding(). После того как это свойство истинно, задачи 2 и 3 завершены, и они могут вызвать метод CompleteAdding() в очереди результатов и закончить работу.
  • task4 может обрабатывать записи в result_queue по мере их поступления или ожидать, что очередь результатов IsAddingCompleted станет действительной, а затем начнет обработку. Работа task4 завершается, когда свойство IsCompleted истинно в очереди результатов.

Edit: Я не уверен, если вы знакомы с этими IsCompleted и IsAddingCompleted свойствами. Они разные и идеально подходят для вашего дела. Я не думаю, что вам нужны другие элементы синхронизации, кроме свойств BlockingCollection. Пожалуйста, спросите, требуется ли дополнительное объяснение!

BlockingCollection<int> inputQueue; 
    BlockingCollection<int> resultQueue; 

    public void StartTasks() 
    { 
     inputQueue = new BlockingCollection<int>(); 
     resultQueue = new BlockingCollection<int>(); 

     Task task1 = Task.Run(() => Task1()); 
     Task task2 = Task.Run(() => Task2_3()); 
     Task task3 = Task.Run(() => Task2_3()); 
     Task[] tasksInTheMiddle = new Task[] { task2, task3 }; 
     Task waiting = Task.Run(() => Task.WhenAll(tasksInTheMiddle).ContinueWith(x => resultQueue.CompleteAdding())); 
     Task task4 = Task.Run(() => Task4()); 

     //Waiting for tasks to finish 
    } 
    private void Task1() 
    { 
     while(true) 
     { 
      int? input = ReadFromInputFile(); 
      if (input != null) 
      { 
       inputQueue.Add((int)input); 
      } 
      else 
      { 
       inputQueue.CompleteAdding(); 
       break; 
      } 
     } 
    } 

    private void Task2_3() 
    { 
     while(inputQueue.IsCompleted) 
     { 
      int input = inputQueue.Take(); 
      resultQueue.Add(input); 
     } 
    } 

    private void Task4() 
    { 
     while(resultQueue.IsCompleted) 
     { 
      int result = resultQueue.Take(); 
      WriteToOutputFile(result); 
     } 
    } 
+0

Непонятно, как это работает. Задача 2 и задача 3 все равно могут добавлять записи в очередь результатов даже после того, как будет достигнут конец входного файла. То, что мне действительно нужно знать, - это когда задача 2 и задача 3 закончены (доработаны до завершения), поэтому я думал о мониторинге состояния этих задач, чтобы убедиться, что все результаты выполнены. – bdcoder

+0

Я отредактирую свой ответ на более ясный. – Thomas

+0

Теперь я прочитал комментарий Скотта Чемберлена. Лучше всего, наверное, совместить мое решение с ним. Нет смысла ждать с окончательной обработкой task4 для task2 и 3, чтобы закончить, если вы можете сделать это параллельно. Но из его решения лучше использовать другую задачу (Continue.WhenAll), чтобы установить CompleteAdding в очередь результатов - таким образом вы действительно уверены, когда эти задачи будут завершены. В промежутке task4 может вызывать метод Take() в очереди результатов и одновременно добавлять в выходной файл (в то время как Task2 и 3 все еще записываются). – Thomas

0

Задача вы описываете может органично вписываться в TPL Dataflow library, небольшие дополнения для самого TPL (он может быть включен в проект через nuget package, .NET 4.5 поддерживается), вы просто легко ввести что-то вроде этого потока (код обновлен на основе комментариев с BroadcastBlock):

var buffer = new BroadcastBlock<string>(); 
var consumer1 = new TransformBlock<string, string>(s => { /* your action here for a string */}); 
var consumer2 = new TransformBlock<string, string>(s => { /* your action here for a string */}); 
var resultsProcessor = new ActionBlock<string>(s => { /* your logging logic here */ }); 

Do не уверен в своей логике решения, поэтому я думал, что вы просто управлять строками здесь. Вы должны asynchronously send все входящие данные для первого блока (если вы Post ваши данные, если буфер перегружен, сообщение будет отброшено), а также ссылка блоки между собой, как это:

buffer.LinkTo(consumer1, new DataflowLinkOptions { PropagateCompletion = true }); 
buffer.LinkTo(consumer2, new DataflowLinkOptions { PropagateCompletion = true }); 
consumer1.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true }); 
consumer2.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true }); 

foreach (var s in IncomingData) 
{ 
    await buffer.SendAsync(s); 
} 
buffer.Complete(); 

Если ваши потребители должны как процесс все элементов, то вы должны использовать BroadcastBlock (может возникнуть некоторая issues about the guaranteed delivery), другой вариант - отфильтровать ваши сообщения потребителями (может быть, остатком от идентификатора сообщения по количеству потребителей), но в этом случае вам следует установить ссылку другому потребителю, который «поймает» все сообщения, которые по какой-то причине не были использованы.

Как вы можете видеть, связи между блоками создаются с полным распространением, так что после этого вы можете просто прикрепить к свойству .Completion задачи для resultsProcessor:

resultsProcessor.Completion.ContinueWith(t => { /* Processing is complete */ }); 
+0

Обратите внимание, что BufferBlock будет предлагать товар только первому потребителю, который не предназначен OP. Чтобы преодолеть это, вы должны связать BufferBlock с TransmitBlock и связать TransmitBlock с каждым из потребителей. –

+0

Также обратите внимание, что SendAsync следует ожидать. –

+0

@EyalPerry благодарит вас за обмен! – VMAtm