2013-09-01 2 views
4

Как реализовать эффективный алгоритм ограничения скорости (N выполнения определенного действия за промежуток времени) при использовании .Net 4.0/4.5 Задачи? Например, я хочу, чтобы SendMessageAsync блокировал (не отправляя другое сообщение), если больше N сообщений, отправленных за секунду.Эффективное ограничение скорости асинхронных задач (N выполнений за промежуток времени)

 await Task.WhenAll(
        Enumerable.Range(0, TOTAL_MESSAGES) 
        .Select(async x => 
        { 
         await clientSession.SendMessageAsync(CreateMessage()); 
        })); 

Я попытался с помощью Task.Delay внутри SendMessageAsync, но так как в ожидании Task.Delay мгновенную возвращается, то следующее сообщение будет отправлено без блокировки.

public async Task<ResponseMessage> SendMessageAsync(RequestMessage message) 
{ 

    int sleepTime; 
    if (throttler.ShouldThrottle(out sleepTime)) 
    { 
     await Task.Delay(sleepTime); 
    } 

    return await InternalMessageWithoutWaitingAsync(message); 
} 

Я мог бы изменить await Task.Delay(sleepTime) в Thread.Sleep(sleepTime) таким образом ждет, прежде чем асинхронная, но мне интересно, если это хорошая практика, при использовании задач.

+0

Я бы только начать следующую задачу, когда сигналы «throttler», «идти». Я считаю, что лучший шаблон, чем запуск всех задач и ожидание сразу с большинством из них. – usr

+2

Операции, зависящие от времени, такие как дросселирование * по интервалу времени *, лучше всего использовать с использованием Rx вместо 'async'. –

+0

@usr - или передать задание дросселю и позволить ему начинать его, когда он думает, что это правильно. В любом случае - почему бы вам не написать это как ответ? – Vadim

ответ

2

Использование Thread.Sleep не является хорошей практикой в ​​коде async.

Вы могли бы получить то, что вам нужно, как это:

private SemaphoreSlim _semaphore = new SemaphoreSlim(N); 
public async Task<ResponseMessage> SendMessageAsync(RequestMessage message) 
{ 
    await _semaphore.WaitAsync(); 
    try 
    { 
    return await InternalMessageWithoutWaitingAsync(message); 
    } 
    finally 
    { 
    ReleaseSemaphoreAfterDelayAsync(); 
    } 
} 

private async Task ReleaseSemaphoreAfterDelayAsync() 
{ 
    await Task.Delay(TimeInterval); 
    _semaphore.Release(); 
} 
+0

Я использовал этот фрагмент кода для генерации 400K msgs. Проблема в том, что многие задачи создаются, потребляя 800 МБ ОЗУ, в то время как только N фактически запускают обрабатывающие сообщения, остальные блокируются семафором. Как я могу прекратить создавать новые задачи в ожидании завершения обработки? '' ' ждет Task.WhenAll ( Enumerable.Range (0, 400000) .Select (асинхронный х => { вара responsePdu = жду client.SendPduAsync (NewMessage()); [...] })); '' ' –

+0

Когда я запускаю [этот код] (https://gist.github.com/StephenCleary/7193d2e9b66eabe559f7) в ANTI Memory Profiler, я вижу 400 000 задач, занимающих 17 600 000 байт. Вероятно, некоторые локальные переменные поднимаются, что вызывает давление памяти; см. ресурсы Стивена Тууба [здесь] (http://msdn.microsoft.com/en-us/magazine/hh456402.aspx) и [здесь] (http://channel9.msdn.com/Events/Build/BUILD2011/TOOL -829T) для настройки идей. –

+0

Сверху моей головы: 1) Убедитесь, что 'InternalMessageWithoutWaitingAsync' является отдельным методом и что' SendMessageAsync' действительно является только кодом выше (т. Е. Удаляет все локальные вары из этого метода); 2) Вы можете перемещать семафор более дросселирующимся в вашем коде; 3) Пересмотреть Rx; 'async' просто не был предназначен для этого. –

Смежные вопросы