наблюдатель следующие функции:Ограничение количества одновременных задач в .NET 4.5
public Task RunInOrderAsync<TTaskSeed>(IEnumerable<TTaskSeed> taskSeedGenerator,
CreateTaskDelegate<TTaskSeed> createTask,
OnTaskErrorDelegate<TTaskSeed> onError = null,
OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class
{
Action<Exception, TTaskSeed> onFailed = (exc, taskSeed) =>
{
if (onError != null)
{
onError(exc, taskSeed);
}
};
Action<Task> onDone = t =>
{
var taskSeed = (TTaskSeed)t.AsyncState;
if (t.Exception != null)
{
onFailed(t.Exception, taskSeed);
}
else if (onSuccess != null)
{
onSuccess(t, taskSeed);
}
};
var enumerator = taskSeedGenerator.GetEnumerator();
Task task = null;
while (enumerator.MoveNext())
{
if (task == null)
{
try
{
task = createTask(enumerator.Current);
Debug.Assert(ReferenceEquals(task.AsyncState, enumerator.Current));
}
catch (Exception exc)
{
onFailed(exc, enumerator.Current);
}
}
else
{
task = task.ContinueWith((t, taskSeed) =>
{
onDone(t);
var res = createTask((TTaskSeed)taskSeed);
Debug.Assert(ReferenceEquals(res.AsyncState, taskSeed));
return res;
}, enumerator.Current).TaskUnwrap();
}
}
if (task != null)
{
task = task.ContinueWith(onDone);
}
return task;
}
Где TaskUnwrap
это состояние сохранение версия стандартного Task.Unwrap
:
public static class Extensions
{
public static Task TaskUnwrap(this Task<Task> task, object state = null)
{
return task.Unwrap().ContinueWith((t, _) =>
{
if (t.Exception != null)
{
throw t.Exception;
}
}, state ?? task.AsyncState);
}
}
Метод RunInOrderAsync
позволяет запускать N задач асинхронно, но последовательно - один за другим. Фактически, он выполняет задачи, созданные из данных семян, с пределом параллелизма 1.
Предположим, что задачи, созданные из семян делегатом createTask
, не соответствуют нескольким параллельным задачам.
Теперь я хотел бы бросить в параметре maxConcurrencyLevel, поэтому функция подписи будет выглядеть следующим образом:
Task RunInOrderAsync<TTaskSeed>(int maxConcurrencyLevel,
IEnumerable<TTaskSeed> taskSeedGenerator,
CreateTaskDelegate<TTaskSeed> createTask,
OnTaskErrorDelegate<TTaskSeed> onError = null,
OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class
И здесь я немного застрял.
так есть такие вопросы:
- System.Threading.Tasks - Limit the number of concurrent Tasks
- Task based processing with a limit for concurrent task number with .NET 4.5 and c#
- .Net TPL: Limited Concurrency Level Task scheduler with task priority?
Которые в основном предлагают два пути решения данной проблемы:
- Использование
Parallel.ForEach
сParallelOptions
с указанием значения свойстваMaxDegreeOfParallelism
, равного требуемому максимальному уровню параллелизма. - Использование пользовательского
TaskScheduler
с желаемым значениемMaximumConcurrencyLevel
.
Второй подход не сокращает его, поскольку все задействованные задачи должны использовать один и тот же экземпляр планировщика задач. Для этого все методы, используемые для возврата Task
, должны иметь перегруз, принимающий пользовательский экземпляр TaskScheduler
. К сожалению, Microsoft не очень согласна с этим. Например, SqlConnection.OpenAsync
не принимает такой аргумент (но TaskFactory.FromAsync
делает).
Первый подход предполагает, что у меня будет конвертировать задания на действия, что-то вроде этого:
() => t.Wait()
Я не уверен, что это хорошая идея, но я буду рад получить больший вклад в том, что ,
Другой подход заключается в использовании TaskFactory.ContinueWhenAny
, но это грязно.
Любые идеи?
EDIT 1
Я хотел бы уточнить причины желать предела. Наши задачи в конечном счете выполняют SQL-запросы на одном сервере SQL.Мы хотим, чтобы ограничить количество параллельных исходящих SQL-запросов. Вполне возможно, что другие операторы SQL будут выполняться одновременно с другими фрагментами кода, но это пакетный процессор и потенциально может наводнить сервер.
Теперь имейте в виду, что хотя мы говорим об одном и том же SQL-сервере, на этом же сервере имеется множество баз данных. Таким образом, речь идет не о ограничении количества открытых SQL-подключений к одной и той же базе данных, потому что база данных может быть совсем не такой.
Вот почему решения на основе doom's day, такие как ThreadPool.SetMaxThreads()
, не имеют отношения к делу.
Сейчас, о SqlConnection.OpenAsync
. Он был сделан асинхронным по какой-либо причине - он мог бы сделать обратный переход к серверу и, следовательно, может быть подвержен сетевой латентности и другим прекрасным побочным эффектам распределенной среды. Таким образом, это не отличается от других асинхронных методов, которые принимают параметр TaskScheduler
. Я склонен думать, что не принимать его - это просто ошибка.
EDIT 2
Я хотел бы сохранить асинхронный дух оригинальной функции. Поэтому я хочу избежать каких-либо явных блокирующих решений.
EDIT 3
Благодаря @fsimonazzi's answer теперь у меня есть рабочая реализация требуемой функциональности. Вот код:
var sem = new SemaphoreSlim(maxConcurrencyLevel);
var tasks = new List<Task>();
var enumerator = taskSeedGenerator.GetEnumerator();
while (enumerator.MoveNext())
{
tasks.Add(sem.WaitAsync().ContinueWith((_, taskSeed) =>
{
Task task = null;
try
{
task = createTask((TTaskSeed)taskSeed);
if (task != null)
{
Debug.Assert(ReferenceEquals(task.AsyncState, taskSeed));
task = task.ContinueWith(t =>
{
sem.Release();
onDone(t);
});
}
}
catch (Exception exc)
{
sem.Release();
onFailed(exc, (TTaskSeed)taskSeed);
}
return task;
}, enumerator.Current).TaskUnwrap());
}
return Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => sem.Dispose());
BlockingCollection позволяет вам установить BoundedCapacity – Paparazzi
Ну, конечно SQLConnection.openAsync не имеет этого варианта. Он не собирается записывать нити N. На самом деле, хорошие шансы он не потребляет * any *. Если ваши задачи настолько непоколебимы, что вы должны предоставить такую гарантию, тогда ThreadPool.SetMaxThreads() является ядерным вариантом. –
См. ** ИЗМЕНИТЬ 1 **. – mark