2016-11-21 3 views
3

Мне нужно выполнить обширные вставки данных в мою базу данных. Я могу реализовать код многопоточным способом с дросселированным планировщиком, который ограничивает количество параллельных операций. На каждом M строках блок формируется и вводится в базу данных как атомная операция. Несколько параллельных операций должны происходить из-за того, что база данных работает медленнее, чем чтение и анализ файла данных. Я часто реализую эту модель с помощью многопоточности.Ограничение количества одновременных System.Threading.Tasks.Task

Если вместо этого я решил реализовать свой код, используя Await/асинхронный (Entity Framework поддерживает асинхронное программирование), как я могу убедиться, что не более чем N параллельных задач выполнения (т.е. перейти в базу данных), в то же время?

В моем первоначальном дизайне я создал экземпляр List<Task>, добавив новые задачи, как только я прочитал блок данных, который нужно вставить атомарно, а затем позвольте моему методу вернуться после await всей задачи. Проблема времени разработки заключается в том, что количество параллельных Task s (и, следовательно, объем памяти) будет взорваться, потому что задачи будут загружаться быстрее, чем они завершаются для больших файлов данных.

Я думал об использовании SemaphoreSlim, но у меня мало опыта с асинхронным программированием (в отличие от многопоточного). Поэтому я задаю этот вопрос, чтобы получить отзывы о лучших практиках, если они есть.

+0

Я определенно думаю, что 'SemaphoreSlim' это путь в этом случае , если вы хотите запустить ограниченное количество нескольких потоков. – Gertsen

+0

Поскольку ваша работа связана с IO, у вас нет необходимости в нескольких потоках. Вы можете выполнять несколько параллельных запросов БД без каких-либо дополнительных потоков. – Servy

+1

[Здесь есть ограниченный планировщик задач параллелизма] (https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler%28v=vs.110%29.aspx?f=255&MSPPError=- 2147217396) (прокрутите немного вниз для реализации). –

ответ

1

вопрос время разработки является то, что число параллельных задач (и, таким образом память след) собирается взорвать, потому что задачи подаются быстрее, чем они завершаются для больших файлов данных. Я думал об использовании SemaphoreSlim

Да, SemaphoreSlim является подходящим выбором для дросселирования параллельных асинхронных операций:

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10); 

async Task ThrottledWorkAsync() 
{ 
    await _semaphore.WaitAsync(); 
    try 
    { 
    await WorkAsync(); 
    } 
    finally 
    { 
    _semaphore.Release(); 
    } 
} 

Однако ...

Если вместо этого я решил осуществить свою код с использованием await/async (Entity Framework поддерживает асинхронное программирование), как я могу убедиться, что одновременно выполняются не более N одновременных задач (т.е. перейдите в базу данных)?

Одна вещь, о которой следует знать, заключается в том, что Entity Framework - при поддержке асинхронных API - по-прежнему требует одного соединения для каждого запроса. Таким образом, вы не можете иметь несколько одновременных асинхронных запросов с тем же DbContext; вам необходимо создать отдельное соединение для каждого параллельного запроса (или, по крайней мере, N соединений, которые «заимствованы» параллельными запросами).

+0

Возможно, возникла проблема с вашим кодом. Я решил подождать до того, как вы разложили задачу, чтобы не потреблять «StreamReader», если работники все еще идут (см. Ответ Денниса) –

+0

@ usr-local-ΕΨΗΕΩΩΝ: Я понятия не имею, о чем «StreamReader» вы говорите. Мой код не запускает 'WorkAsync' до тех пор, пока не будет сделан слот семафора. –

+0

Я имел в виду источник данных, который передает приложение, которое в моем случае является «StreamReader», о котором я не упоминал в вопросе. Ну, да, ваш код «WorkAsync» действительно не начинается до тех пор, пока слот не будет бесплатным, но IMO, если вы обернете это «async Task ThrottledWorkAsync» в цикле for, который вставляет ваш код async 'wait-perform-release', тогда есть риск того, что TaskExecutor забивается задачами, ожидающими слот. В этом случае одновременно будет выполняться только 10 задач, но будет выделено множество задач (с их памятью footiprint). Мне нужно изменить свой вопрос (завтра) с небольшим кодом –

0

Я использую этот кусок кода, чтобы выполнить мои темы:

public static async Task WhenAll(this List<Func<Task>> actions, int threadCount) 
{ 
    var _countdownEvent = new CountdownEvent(actions.Count); 
    var _throttler = new SemaphoreSlim(threadCount); 

    foreach (Func<Task> action in actions) 
    { 
     await _throttler.WaitAsync(); 

     Task.Run(async() => 
     { 
      try 
      { 
       await action(); 
      } 
      finally 
      { 
       _throttler.Release(); 
       _countdownEvent.Signal(); 
      } 
     }); 
    } 

    _countdownEvent.Wait(); 
} 

Этот код основан на коде, представленной в этой теме: How to limit the amount of concurrent async I/O operations?

Calling это должно выглядеть примерно так. Она будет выполнять все задачи, но только 40 (в данном случае) одновременно:

var tasks = new List<Func<Task>>(); 
tasks.Add(() => saveAsync()); 
//add more 
await tasks.WhenAll(40); 
+3

Здесь нет необходимости использовать 'Task.Run'. У вас уже есть асинхронная операция, вы не должны запускать ее в другом потоке. Кроме того, вы синхронно дожидаетесь их завершения, чего вы абсолютно не должны делать, учитывая, что это асинхронная операция. Вы должны асинхронно ждать, пока все они закончатся. – Servy

1

Если у вас есть по крайней мере n значения для вставки первоначально (n быть максимальное количество одновременных задач), вы можете принять следующий подход:

  1. Invoke InsertAsync()n раз с разными значениями.
  2. Когда каждая из задач завершена, продолжайте новый вызов до InsertAsync() (повторите 2).

Таким образом, вам не нужно будет контролировать уровень параллелизма с помощью семафора и будет неблокировать.

I've just published a package, которые могут быть полезны для этого сценария, он предоставляет 2 метода Times() и Map(): https://github.com/jorgebay/concurrent-utils

Например:

// Execute MyMethodAsync() 1,000,000 times limiting the maximum amount 
// of parallel async operations to 512 
await ConcurrentUtils.Times(1000000, 512, (index) => MyMethodAsync(index)); 
+0

Работает ли «Карта» с неограниченным размером «IEnumerable» неизвестного размера? Мне не нужно хранить все записи в памяти перед вставкой, они могут быть более 10M –

+0

Мне по-прежнему нравится ваш подход, но из чтения вашего ответа я беспокоюсь о неблокирующей части.Смотрите, мне нужно заблокировать чтение ввода 'Stream', потому что, если я приду, чтобы прочитать 10M или более записей в памяти, тогда« OutOfMemoryException »неизбежно. Умным решением может быть использование «MemoryFailPoint», который я оцениваю –

+0

Если вы хотите работать с неопределенным количеством данных (время от времени извлекаемым в фоновом режиме), вы можете использовать очередь заданий 'ConcurrentUtils.CreateQueue () '. – jorgebg

Смежные вопросы