2013-12-03 1 views
2

Представьте длинный список данных для обработки. Обработка связана с процессором и может выполняться параллельно.Использование потоков-локальных ресурсов с параллельными для циклов

Для обработки элемента данных требуется большой объект (~ 50 МБ) для хранения промежуточных результатов обработки. Этот объект может быть повторно использован во время обработки последующей задачи.

Я хочу сделать что-то вроде этого:

Processor[] processors = GetProcessors(Environment.ProcessorCount); 

Parallel.For(
    0, 
    itemCount, 
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, 
    item => 
    { 
     int threadIndex = /* TODO */; 
     processors[threadIndex].Process(item); 
    } 
); 

Цель состоит в том, чтобы только когда-либо Environment.ProcessorCount экземпляры моего большого объекта, и использовать их как можно более эффективно.

Как это сделать?

ответ

2

Вам нужно всего лишь use the overload of Parallel.For, который выполняет две функции для настройки и разрыва локального объекта потока.

Parallel.For(
    0, 
    itemCount, 
    () => new Processor(), 
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, 
    (item, loopState, processor) => 
    { 
     processor.Process(item); 

     // return the processor to be used for another invocation 
     return processor; 
    } 
    processor => 
    { 
     //Do any tear down work you need to do, like dispose the object if it is disposeable 
     processor.Dispose(); 
    } 
); 

Поскольку функция Parallel класс не сразу перейти к используя ParallelOptions.MaxDegreeOfParallelism темы (они начинаются в одном потоке, то разгоняться до максимума вы определили) будет создать только один экземпляр Processor, если только одна нить будет создаваться и до ParallelOptions.MaxDegreeOfParallelism объектов, созданных одновременно.

Я не знаю детали реализации планировщика по умолчанию, но он может или не останавливать потоки, а затем создавать новые, вызывая создание нового объекта Processor. Однако, если это произойдет (это может быть не так, я не знаю), вы все равно будете иметь только одно из ParallelOptions.MaxDegreeOfParallelism объектов, существующих одновременно.

+0

Выглядит интересно. К сожалению, в моем конкретном случае у меня есть внешний цикл с этим параллельным циклом внутри. Мне нужно повторно использовать объекты во всех итерациях внешнего цикла. Я попытался использовать перегрузку, которую вы показываете, чтобы назначать целые индексы для потоков, но это не сработало. Но ваш ответ, вероятно, является идеальным сценарием для тех, кому не нужно повторно использовать ресурсы так, как я. Я мог бы объединить две петли в одну, но это сделает код нечитаемым. Внешний цикл работает порядка 10 раз, а внутренний - 1000 раз. –

1

Вот подход, который работает. Я придумал этот ответ, когда писал вопрос, но думал, что вопрос достаточно интересен, чтобы публиковать его в любом случае. Если у кого-то есть лучшее решение, я бы хотел его изучить.

Используйте параллельную коллекцию (например, ConcurrentQueue<Processor>) для размещения экземпляров Processor между потоками.

Processor[] processors = GetProcessors(Environment.ProcessorCount); 
var queue = new ConcurrentQueue<Processor>(processors); 

Parallel.For(
    0, 
    itemCount, 
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, 
    item => 
    { 
     // Obtain the processor 
     Processor processor; 
     queue.TryDequeue(out processor); 

     processor.Process(item); 

     // Store the processor again for another invocation 
     queue.Enqueue(processor); 
    } 
); 

Фактическая реализация должна утверждать, что TryDequeue возвращает истинное, а также епдиеие процессоры снова в случае исключения.

До тех пор, пока время обработки намного больше, чем время, проведенное в конфликте очереди, накладные расходы должны быть минимальными.

+0

Yeh interesting. Это, по сути, пул ресурсов, реализованный как циклический буфер/очередь, где мы инициализируем пул с N ресурсами, где N является MaxDegreeOfParallelism, и поэтому TryDequeue в принципе всегда будет успешным. Я просто задаюсь вопросом, есть ли сценарии, в которых он потерпит неудачу, и поэтому потребуется повторить попытку, я не могу придумать, почему это произойдет. Существует также стоимость синхронизации связи с пулом, но я думаю, что я мог бы жить с этим, а не воссоздавать ресурсы для каждого вызова Parallel.For. – redcalx

Смежные вопросы