2013-12-03 2 views
11

наблюдатель следующие функции:Ограничение количества одновременных задач в .NET 4.5

public Task RunInOrderAsync<TTaskSeed>(IEnumerable<TTaskSeed> taskSeedGenerator, 
    CreateTaskDelegate<TTaskSeed> createTask, 
    OnTaskErrorDelegate<TTaskSeed> onError = null, 
    OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class 
{ 
    Action<Exception, TTaskSeed> onFailed = (exc, taskSeed) => 
    { 
     if (onError != null) 
     { 
      onError(exc, taskSeed); 
     } 
    }; 

    Action<Task> onDone = t => 
    { 
     var taskSeed = (TTaskSeed)t.AsyncState; 
     if (t.Exception != null) 
     { 
      onFailed(t.Exception, taskSeed); 
     } 
     else if (onSuccess != null) 
     { 
      onSuccess(t, taskSeed); 
     } 
    }; 

    var enumerator = taskSeedGenerator.GetEnumerator(); 
    Task task = null; 
    while (enumerator.MoveNext()) 
    { 
     if (task == null) 
     { 
      try 
      { 
       task = createTask(enumerator.Current); 
       Debug.Assert(ReferenceEquals(task.AsyncState, enumerator.Current)); 
      } 
      catch (Exception exc) 
      { 
       onFailed(exc, enumerator.Current); 
      } 
     } 
     else 
     { 
      task = task.ContinueWith((t, taskSeed) => 
      { 
       onDone(t); 
       var res = createTask((TTaskSeed)taskSeed); 
       Debug.Assert(ReferenceEquals(res.AsyncState, taskSeed)); 
       return res; 
      }, enumerator.Current).TaskUnwrap(); 
     } 
    } 

    if (task != null) 
    { 
     task = task.ContinueWith(onDone); 
    } 

    return task; 
} 

Где TaskUnwrap это состояние сохранение версия стандартного Task.Unwrap:

public static class Extensions 
{ 
    public static Task TaskUnwrap(this Task<Task> task, object state = null) 
    { 
     return task.Unwrap().ContinueWith((t, _) => 
     { 
      if (t.Exception != null) 
      { 
       throw t.Exception; 
      } 
     }, state ?? task.AsyncState); 
    } 
} 

Метод RunInOrderAsync позволяет запускать N задач асинхронно, но последовательно - один за другим. Фактически, он выполняет задачи, созданные из данных семян, с пределом параллелизма 1.

Предположим, что задачи, созданные из семян делегатом createTask, не соответствуют нескольким параллельным задачам.

Теперь я хотел бы бросить в параметре maxConcurrencyLevel, поэтому функция подписи будет выглядеть следующим образом:

Task RunInOrderAsync<TTaskSeed>(int maxConcurrencyLevel, 
    IEnumerable<TTaskSeed> taskSeedGenerator, 
    CreateTaskDelegate<TTaskSeed> createTask, 
    OnTaskErrorDelegate<TTaskSeed> onError = null, 
    OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class 

И здесь я немного застрял.

так есть такие вопросы:

Которые в основном предлагают два пути решения данной проблемы:

  1. Использование Parallel.ForEach с ParallelOptions с указанием значения свойства MaxDegreeOfParallelism, равного требуемому максимальному уровню параллелизма.
  2. Использование пользовательского TaskScheduler с желаемым значением MaximumConcurrencyLevel.

Второй подход не сокращает его, поскольку все задействованные задачи должны использовать один и тот же экземпляр планировщика задач. Для этого все методы, используемые для возврата Task, должны иметь перегруз, принимающий пользовательский экземпляр TaskScheduler. К сожалению, Microsoft не очень согласна с этим. Например, SqlConnection.OpenAsync не принимает такой аргумент (но TaskFactory.FromAsync делает).

Первый подход предполагает, что у меня будет конвертировать задания на действия, что-то вроде этого:

() => t.Wait() 

Я не уверен, что это хорошая идея, но я буду рад получить больший вклад в том, что ,

Другой подход заключается в использовании TaskFactory.ContinueWhenAny, но это грязно.

Любые идеи?

EDIT 1

Я хотел бы уточнить причины желать предела. Наши задачи в конечном счете выполняют SQL-запросы на одном сервере SQL.Мы хотим, чтобы ограничить количество параллельных исходящих SQL-запросов. Вполне возможно, что другие операторы SQL будут выполняться одновременно с другими фрагментами кода, но это пакетный процессор и потенциально может наводнить сервер.

Теперь имейте в виду, что хотя мы говорим об одном и том же SQL-сервере, на этом же сервере имеется множество баз данных. Таким образом, речь идет не о ограничении количества открытых SQL-подключений к одной и той же базе данных, потому что база данных может быть совсем не такой.

Вот почему решения на основе doom's day, такие как ThreadPool.SetMaxThreads(), не имеют отношения к делу.

Сейчас, о SqlConnection.OpenAsync. Он был сделан асинхронным по какой-либо причине - он мог бы сделать обратный переход к серверу и, следовательно, может быть подвержен сетевой латентности и другим прекрасным побочным эффектам распределенной среды. Таким образом, это не отличается от других асинхронных методов, которые принимают параметр TaskScheduler. Я склонен думать, что не принимать его - это просто ошибка.

EDIT 2

Я хотел бы сохранить асинхронный дух оригинальной функции. Поэтому я хочу избежать каких-либо явных блокирующих решений.

EDIT 3

Благодаря @fsimonazzi's answer теперь у меня есть рабочая реализация требуемой функциональности. Вот код:

 var sem = new SemaphoreSlim(maxConcurrencyLevel); 
     var tasks = new List<Task>(); 

     var enumerator = taskSeedGenerator.GetEnumerator(); 
     while (enumerator.MoveNext()) 
     { 
      tasks.Add(sem.WaitAsync().ContinueWith((_, taskSeed) => 
      { 
       Task task = null; 
       try 
       { 
        task = createTask((TTaskSeed)taskSeed); 
        if (task != null) 
        { 
         Debug.Assert(ReferenceEquals(task.AsyncState, taskSeed)); 
         task = task.ContinueWith(t => 
         { 
          sem.Release(); 
          onDone(t); 
         }); 
        } 
       } 
       catch (Exception exc) 
       { 
        sem.Release(); 
        onFailed(exc, (TTaskSeed)taskSeed); 
       } 
       return task; 
      }, enumerator.Current).TaskUnwrap()); 
     } 

     return Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => sem.Dispose()); 
+0

BlockingCollection позволяет вам установить BoundedCapacity – Paparazzi

+0

Ну, конечно SQLConnection.openAsync не имеет этого варианта. Он не собирается записывать нити N. На самом деле, хорошие шансы он не потребляет * any *. Если ваши задачи настолько непоколебимы, что вы должны предоставить такую ​​гарантию, тогда ThreadPool.SetMaxThreads() является ядерным вариантом. –

+0

См. ** ИЗМЕНИТЬ 1 **. – mark

ответ

13

С помощью семафора вы можете использовать дроссельную заслонку. Используя метод WaitAsync(), вы получаете ожидаемую асинхронность. Нечто подобное (обработка ошибок удалена для краткости):

private static async Task DoStuff<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask) 
{ 
    using (var sem = new SemaphoreSlim(maxConcurrency)) 
    { 
     var tasks = new List<Task>(); 

     foreach (var item in items) 
     { 
      await sem.WaitAsync(); 
      var task = createTask(item).ContinueWith(t => sem.Release()); 
      tasks.Add(task); 
     } 

     await Task.WhenAll(tasks); 
    } 
} 

Edited удалить ошибку, где семафор может быть расположен перед всеми операциями релиза имели шанс быть выполнено.

+0

Может ли последний 'WhenAll' быть заменен на' ContinueWhenAll'? – mark

+0

Если у вас есть продолжение, они да используют ContinueWhenAll. – fsimonazzi

+0

Спасибо. Ваш подход оказался довольно простым в реализации - отправил код в ** EDIT 3 **. – mark

4

Две лучшие решения, доступные сегодня Semaphoreslim (согласно @fsimonazzi's answer) и блок TPL Потоковые (т.е. ActionBlock<T> или TransformBlock<T>). Оба этих блока имеют simple way to set the level of concurrency.

Parallel не является идеальным подходом, поскольку вам необходимо блокировать ваши асинхронные операции, используя поток потока нитей для каждого из них.

Также TaskScheduler здесь не работает. FYI, TaskScheduler- «унаследовано» по методу async, как я опишу на my async intro blog post. Причина, по которой это не будет работать для вашей проблемы, заключается в том, что планировщики задач управляют только , выполняя задачи, а не события задач - поэтому операции SQL, такие как OpenAsync, не «подсчитываются» по отношению к пределу параллелизма.

+1

Не могли бы вы набросать подход, используя блок потока данных TPL? – mark

+0

См. Мой ответ, в котором я использовал «ActionBlock ». –

4

Вот вариация ответа @ fsimonazzi без SemaphoreSlim, такая же крутая, как и есть.

private static async Task DoStuff<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask) 
{ 
    var tasks = new List<Task>(); 
    foreach (var item in items) 
    { 
     if (tasks.Count >= maxConcurrency) 
     { 
      await Task.WhenAll(tasks); 
      tasks.Clear(); 
     } 
     var task = createTask(item); 
     tasks.Add(task); 
    } 
    await Task.WhenAll(tasks); 
} 
3

Вот вариант ответа @ scott-turner, такой же классный, как и есть. Его ответ отправляет работу в куски maxConcurrency и ждет, пока каждый кусок не завершится полностью, прежде чем отправить следующий фрагмент.Это изменение представляет собой новые задачи по мере необходимости, чтобы гарантировать, что задачи maxConcurrency всегда находятся в полете. Он также демонстрирует работу с Task < T> вместо задачи.

Обратите внимание, что преимущество над версией SemaphoreSlim с помощью SemaphoreSlim вам нужно подождать двух разных типов задач - семафоров и работы. Это проблематично, если работа имеет тип Task < T> вместо задачи.

private static async Task<R[]> concurrentAsync<T, R>(int maxConcurrency, IEnumerable<T> items, Func<T, Task<R>> createTask) 
    { 
     var allTasks = new List<Task<R>>(); 
     var activeTasks = new List<Task<R>>(); 
     foreach (var item in items) 
     { 
      if (activeTasks.Count >= maxConcurrency) 
      { 
       var completedTask = await Task.WhenAny(activeTasks); 
       activeTasks.Remove(completedTask); 
      } 
      var task = createTask(item); 
      allTasks.Add(task); 
      activeTasks.Add(task); 
     } 
     return await Task.WhenAll(allTasks); 
    } 
+0

Я больше не использую подход на основе задач для параллелизма и асинхронности. См. Мой другой вопрос и принятый ответ - http://stackoverflow.com/questions/29263695/how-to-constraint-concurrency-the-right-way-in-rx-net – mark

1

Здесь уже много ответов. Я хочу ответить на комментарий, который вы сделали в ответ Стивенса, о примере использования потока данных TPL для ограничения параллелизма. Даже жестко вы оставили комментарий в другом ответе на этот вопрос, что вы больше не используете подход, основанный на задаче, это может помочь другим людям.

Пример использования ActionBlock<T> для этого:

private static async Task DoStuff<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask) 
{ 
    var ab = new ActionBlock<T>(createTask, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxConcurrency }); 

    foreach (var item in items) 
    { 
     ab.Post(item); 
    } 

    ab.Complete(); 
    await ab.Completion; 
} 

Более подробную информацию о TPL DataFlow можно найти здесь: https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow(v=vs.110).aspx

Смежные вопросы