2

У меня есть 100 записей для Parallelization, от 1 до 100. Теперь я могу удобно использовать Parallel.For для их выполнения в Parallel следующим образом, который будет работать на основе на вычислительных ресурсахParallel.For loop - Назначение уникального объекта данных для каждого потока

Parallel.For(0, limit, i => 
    { 
     DoWork(i); 
    }); 

но существует определенные ограничения, каждый поток должен работать с идентичным объектом данных и существует ограниченное число субъектов данных говорят 10, которые создаются в передовом клонировании друг друга и сохранять их в такой как словарь или список. Теперь я могу ограничить количество распараллеливания, используя следующий код:

Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i => 
    { 
     DoWork(i); 
    }); 

Но вопрос заключается в том, чтобы присвоить уникальный объект данных для каждого входящего потока, таким образом, что объект данных не используются каким-либо другой текущим потоком исполнения , так как число потоков и объект данных одинаковы, поэтому голодание не является проблемой. Я могу представить себе путь, в котором я создаю логическое значение для каждого объекта данных, указывая, используется ли оно или нет, и поэтому мы перебираем через словарь или список, чтобы найти следующий доступный объект данных и заблокировать общий процесс присваивания, одному потоку назначается объект данных в данный момент времени, но, на мой взгляд, эта проблема будет иметь гораздо более элегантное решение, моя версия - всего лишь обходной путь, а не исправление. Моя логика:

Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i => 
     { 
      lock(All_Threads_Common_Object) 
      { 
       Check for available data entity using boolean 
       Assign the Data entity 
      } 
      DoWork(i); 

      Reset the Boolean value for another thread to use it 
     }); 

Пожалуйста, дайте мне знать, если вопрос нуждается в дальнейшем уточнении

+2

Образец, в котором вы нуждаетесь, изменяется производитель-потребитель. Продюсер добавляет 10 сущностей в начале, клиенты пытаются получить entites. Когда 10 объектов получены, остальные потоки ожидают, что производитель поместит новые элементы. Когда потребительское задание заканчивается, он возвращает объект обратно в стек производителя. –

+0

@pwas - это объединение объектов, а не производитель. В любом случае Parallel.For позволяет вам указать функцию инициализации потока для передачи уникального объекта в каждый поток без блокировки или объединения. –

ответ

4

Вы можете использовать concurrent collection для хранения ваших 10 объектов. Каждый Рабочий вытащит один объект данных, использует его и возвращает его. Использование параллельного сбора важно, потому что в вашем сценарии нормальный не является потокобезопасным.

Как так:

var queue = new ConcurrentQueue<DataEntity>(); 
// fill the queue with 10 items 

Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i => 
    { 
     DataEntity x; 
     if(!queue.TryDequeue(out x)) 
      throw new InvalidOperationException(); 
     DoWork(i, x); 
     queue.Enqueue(x); 
    }); 

Или, если блокирование необходимо предусмотреть, завернуть вещь в BlockingCollection.

Редактировать: Do не оберните его в петлю, чтобы продолжать ждать. Скорее, используйте BlockingCollection следующим образом:

var entities = new BlockingCollection(new ConcurrentQueue<DataEntity>()); 

// fill the collection with 10 items 

Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i => 
    { 
     DataEntity x = entities.Take(); 
     DoWork(i, x); 
     entities.Add(x); 
    }); 
+0

Простой одновременной коллекции недостаточно IMHO. Да, это поточно-безопасный, но он не заставляет поток ждать новых элементов внутри коллекции. –

+0

по определению проблемы будет создано 10 работников и 10 объектов данных. Работнику не удастся найти пустую очередь. –

+0

Правильно, как было предложено @pwas thread safety, это не моя главная проблема, но создание потока для следующего доступного объекта данных является основной проблемой. –

5

Используйте перегрузку Parallel.For которая принимает локальную функцию инициализации потока.

Parallel.For<DataEntity>(0, limit, 
    //will run once for each thread 
    () => GetThreadLocalDataEntity(), 

    //main loop body, will run once per iteration 
    (i, loop, threadDataEntity) => 
    { 
     DoWork(i, threadDataEntity); 
     return threadDataEntity; //we must return it here to adhere to the Func signature. 
    }, 

    //will run once for each thread after the loop 
    (threadDataEntity) => threadDataEntity.Dispose() //if necessary 
); 

Основное преимущество этого метода по сравнению с одним вы публикуемыми в этом вопросе, является то, что назначение DataEntity происходит один раз в поток, а не один раз в итерации цикла.

+0

Для моего сценария, когда объект данных должен быть переназначен, мне все равно нужно заблокировать код в GetThreadLocalDataEntity() и проверить, какой объект данных доступен для использования на основе логического, пожалуйста, исправьте меня, если я неправильно понял. Также, если я установил MaxDegreeOfParallelism = 10, то не вызвал бы метод GetThreadLocalDataEntity() для 11-го потока до тех пор, пока точка одного из потоков не завершит цикл –

+1

@MrinalKamboj 'GetThreadLocalDataEntity()' все еще нужно блокировать.Вам не нужно использовать логический флаг, вы можете, например, поместить все ваши доступные объекты DataEntity в очередь и удалить один объект для каждого потока. Что касается вашего второго вопроса, если вы установите 'MaxDegreeOfParallelism = 10', то не будет 11-го потока. – Rotem

+0

На самом деле мое требование - ничего не возвращать, так что есть опция без Func, поскольку объект данных использует цикл и его выполнение, он не имеет ничего общего с возвратом –

Смежные вопросы