2015-05-24 2 views
1

я сначала обеспечить псевдокод и описать его ниже:Хитрый способ сделать Еогеасп в C#

public void RunUntilEmpty(List<Job> jobs) 
{ 
    while (jobs.Any()) // the list "jobs" will be modified during the execution 
    { 
     List<Job> childJobs = new List<Job>(); 

     Parallel.ForEach(jobs, job => // this will be done in parallel 
     { 
      List<Job> newJobs = job.Do(); // after a job is done, it may return new jobs to do 
      lock (childJobs) 
       childJobs.AddRange(newJobs); // I would like to add those jobs to the "pool" 
     }); 

     jobs = childJobs; 
    } 
} 

Как вы можете видеть, я совершаю уникальный тип foreach. Источник, набор (jobs), может быть просто расширен во время выполнения, и это поведение не может быть определено ранее. Когда метод Do() вызывается на объект (здесь, job), он может возвращать новые задания для выполнения и, таким образом, улучшать источник (jobs).

Я мог бы назвать этот метод (RunUntilEmpty) рекурсивно, но, к сожалению, стек может быть действительно огромным и может привести к переполнению.

Не могли бы вы рассказать мне, как достичь этого? Есть ли способ делать подобные действия на C#?

+6

Похоже, вы действительно должны смотреть на [поток данных TPL] (https://msdn.microsoft.com/en-us/library/hh228603%28v=vs.110%29.aspx). Помимо всего прочего, 'List ' не является потокобезопасным ... –

+0

Ваша 'lock (jobs)' покрывает AddRange, но не Parallel.ForEach. Это означает, что на работе все еще есть состояние гонки. –

+1

Я только что отредактировал его. Что вы думаете о таком решении? –

ответ

2

Если я правильно понимаю, вы в основном начинаете с некоторой коллекции объектов Job, каждая из которых представляет собой некоторую задачу, которая сама может создать один или несколько новых объектов Job в результате выполнения своей задачи.

Ваш обновленный пример кода выглядит так, как будто это в основном выполнит это. Но учтите, что, как комментирует CommuSoft, он не будет наиболее эффективно использовать ваши ядра процессора. Поскольку вы только обновляете список заданий после завершения каждой группы заданий, нет возможности для запуска вновь созданных заданий до все из ранее сгенерированных заданий завершены.

Лучшая реализация будет использовать одну очередь заданий, постоянно получая новые объекты Job для выполнения, поскольку старые завершены.

Я согласен с тем, что поток данных TPL может быть полезным способом реализации этого. Однако, в зависимости от ваших потребностей, вы можете найти его достаточно простым, чтобы просто поставить очередь задач непосредственно в пул потоков и использовать CountdownEvent для отслеживания хода работы, чтобы ваш метод RunUntilEmpty() знал, когда нужно возвращаться.

Без a good, minimal, complete code example невозможно предоставить ответ, содержащий аналогичный полный пример кода. Но, надеюсь, ниже фрагмент кода иллюстрирует основную идею достаточно хорошо:

public void RunUntilEmpty(List<Job> jobs) 
{ 
    CountdownEvent countdown = new CountdownEvent(1); 

    QueueJobs(jobs, countdown); 

    countdown.Signal(); 
    countdown.Wait(); 
} 

private static void QueueJobs(List<Job> jobs, CountdownEvent countdown) 
{ 
    foreach (Job job in jobs) 
    { 
     countdown.AddCount(1); 

     Task.Run(() => 
     { 
      // after a job is done, it may return new jobs to do 
      QueueJobs(job.Do(), countdown); 

      countdown.Signal(); 
     }); 
    } 
} 

Основная идея заключается в том, чтобы в очереди новую задачу для каждого Job объекта, увеличивая счетчик CountdownEvent для каждой задачи, которая поставлена ​​в очередь. Задачи сами по себе сделать три вещи:

  1. Выполнить метод Do(),
  2. Очереди новых задач, используя метод QueueJobs() таким образом, чтобы счетчик на CountdownEvent объекта увеличивается соответственно, и
  3. сигнализировать о CountdownEvent, декременте его счетчик для текущей задачи

Основная информация RunUntilEmpty() сигнализирует CountdownEvent, чтобы указать счет, который был внесен на счетчик объекта, когда он создал его, а затем ждет, пока счетчик достигнет нуля.

Обратите внимание, что звонки на номер QueueJobs(): нет рекурсивный. Метод QueueJobs() не вызывается сам по себе, а анонимный метод, объявленный внутри него, который сам также не вызывается QueueJobs(). Таким образом, здесь нет проблемы с переполнением стека.

Ключевая особенность вышеизложенного заключается в том, что задачи постоянно ставятся в очередь, когда они становятся известными, то есть когда они возвращаются ранее выполненными вызовами Do(). Таким образом, доступные ядра ЦП остаются занятыми пулом потоков, по крайней мере, в той мере, в какой любой завершенный метод Do() фактически возвратил любой новый объект Job для запуска. Это касается основной проблемы с версией кода, который вы включили в свой вопрос.