3

Я новичок в TPL (Task Parallel Library), и мне сложно с трудом настроить мой процесс для запуска задач параллельно.Лучший способ отправить рассылку писем по электронной почте параллельно

Я работаю над приложением для отправки массовых писем (например, тысячи в минуту, это идея), но когда я вижу производительность процессоров, это не хорошо: Я уверен, что есть много накладных расходов Я не правильно использую библиотеку задач.

Вот мой код:

public async void MainProcess() 
{ 
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE); 

    foreach (var batch in batches.AsParallel() 
     .WithDegreeOfParallelism(Environment.ProcessorCount)) 
    { 
     await Task.WhenAll(from emailToProcess in batch 
        select ProcessSingleEmail(emailToProcess)); 
     _emailsToProcessRepository.MarkBatchAsProcessed(batch); 
    } 
} 

private async Task ProcessSingleEmail(EmailToProcess emailToProcess) 
{ 
    try 
    { 
     MailMessage mail = GetMail(emailToProcess); //static light method 
     await _smtpClient.SendAsync(sendGridMail); 
     emailToProcess.Processed = true; 
    } 
    catch (Exception e) 
    { 
     _logger.Error(ErrorHelper.GetExceptionMessage(e, 
        string.Format("Error sending Email ID #{0} : ", 
        emailToProcess.Id)), e); 
    } 
} 

(я знаю, что это может выглядеть ужасно: пожалуйста, не стесняйтесь жарить меня ☺)

мне нужно вести себя так: Мне нужно, чтобы обработать количество записей в пакете (btw, я использую библиотеку, которая позволяет мне использовать метод «Batch»), поэтому мне нужно отметить пакет записей, обработанных в базе данных, когда процесс завершает их отправку.

Процесс на самом деле делает то, что я хочу: кроме slow as hell. И как вы можете видеть в PerfMon, процессоры не работают с очень высокой производительностью:

enter image description here

Какой самый лучший способ сделать это? Любой совет?

EDIT: Я понимаю, что у меня есть накладная проблема. Есть ли какой-нибудь инструмент или простой способ их обнаружить и исправить?

+0

Я ожидаю, что ограничивающий фактор является вашей пропускной способностью сети, а не ваш процессора ... распараллелить свои процессоры все, что вам нужно, не даст вам более быстрого сетевого подключения. – abelenky

+0

@abelenky Спасибо за ваш ответ, но я не думаю, что это проблема. Я даже попытался заменить SendAsync на Task.Delay (1500), что примерно соответствует отправке электронной почты, и результат был точно таким же. – Silvestre

+0

Конечно, ваша сеть является ограничивающим фактором здесь, но я думаю, вы можете сделать больше, чем '.WithDegreeOfParallelism (Environment.ProcessorCount)' здесь, потому что вы пытаетесь быть асинхронным (немного экспериментируйте) - кроме этого вам, возможно, придется попробовать out альтернативы структурам Smtpclient ... btw: какой объемной электронной почты это будет? У нас достаточно спама;) – Carsten

ответ

6

То, что вы делаете, не связано с ЦП, а связано с I/O, поэтому ограничение количества параллельных задач на число, если процессоры, вероятно, повлияют на вашу производительность. Попробуйте запустить дополнительные задачи параллельно.

Например, приведенный ниже код будет обрабатывать все электронные письма асинхронно, но ограничиваться 100 электронными письмами параллельно. Он использует метод расширения ForEachAsync для выполнения обработки, этот метод позволяет ограничить степень параллелизма параметром, поэтому я бы попытался поэкспериментировать с увеличением этого параметра.

Возможно, вы также захотите сделать асинхронным метод MarkBatchAsProcessed, поскольку это также может ограничить производительность.

public static class Extensions 
{ 
    public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body) 
    { 
     using (partition) 
      while (partition.MoveNext()) 
       await body(partition.Current); 
    } 

    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
    { 
     return Task.WhenAll(
      from partition in Partitioner.Create(source).GetPartitions(dop) 
      select ExecuteInPartition(partition, body)); 
    } 
} 

public Task MainProcess() 
{ 
    // Process 100 emails at a time 
    return emailsToProcess.ForEachAsync(100, async (m) => 
    { 
     await ProcessSingleEmail(m);     
    }); 

    _emailsToProcessRepository.MarkBatchAsProcessed(emailsToProcess); 
} 

Вы также должны избегать использование void возвращающихся методов асинхронных, они не распространяются исключениями и не можем быть составлены или долгожданными и их использование в основном для обработчиков событий, поэтому я изменил MainProcess вернуть Task.

Update

число 100 в коде выше означает, что в любой момент времени будет не более 100 одновременных задач, так что это больше похоже на скользящем окне, а не партии. Если вы хотите обрабатывать сообщения электронной почты в пакетном режиме, вы можете сделать что-то вроде этого (предполагая, что партии имеют свойство Count:

public async Task MainProcess() 
{ 
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE); 

    foreach (var batch in batches) 
    { 
     return batch.ForEachAsync(batch.Count, async (m) => 
     { 
      await ProcessSingleEmail(m);     
     }); 

     _emailsToProcessRepository.MarkBatchAsProcessed(batch);    
    } 
} 
+1

Спасибо @ не-стоянов за то, что нашли время, чтобы показать мне этот подход !! Всего пару комментариев: 1) Я думаю, что 100 означает количество разделов, а не размер раздела. 2) Где я должен обновить пакет? должен ли я передать его в качестве параметра действия одному и тому же методу расширения и сделать это в конце использования (раздела)? – Silvestre