0

Я выполняю приведенный ниже код и его рабочий режим, но не размножаюсь на другой процесс, а иногда все работает в одном процессе, а иногда и в двух процессах. И я использую 4-процессорную машину. Что не так с этим кодом?Python Multiprocessing - число процессов

def f(values): 
    print(multiprocessing.current_process()) 
    return values 

def main(): 
    p = Pool(4) #number of processes = number of CPUs 
    keys, values= zip(*data.items()) #ordered keys and values 
    processed_values= p.map(f, values) 
    result= dict(zip(keys, processed_values)) 
    p.close() # no more tasks 
    p.join() # wrap up current tasks 

И результат

<SpawnProcess(SpawnPoolWorker-1, started daemon)> 
<SpawnProcess(SpawnPoolWorker-1, started daemon)> 
<SpawnProcess(SpawnPoolWorker-1, started daemon)> 
<SpawnProcess(SpawnPoolWorker-1, started daemon)> 

А иногда, как это,

<SpawnProcess(SpawnPoolWorker-3, started daemon)> 
<SpawnProcess(SpawnPoolWorker-2, started daemon)> 
<SpawnProcess(SpawnPoolWorker-1, started daemon)> 
<SpawnProcess(SpawnPoolWorker-3, started daemon)> 

Иногда

<SpawnProcess(SpawnPoolWorker-1, started daemon)> 
<SpawnProcess(SpawnPoolWorker-4, started daemon)> 
<SpawnProcess(SpawnPoolWorker-2, started daemon)> 
<SpawnProcess(SpawnPoolWorker-1, started daemon)> 

И мой вопрос, на каком основании она присваивает функции для рабочих? Я пишу код таким образом, чтобы он определял количество процессов на основе количества ключей в моем словаре (учитывая, что мои данные всегда будут иметь меньше ключей, чем мои процессоры). Мой код начнется следующим образом: - Основной код считывает файл и выводит из него словарь с использованием одного процесса и должен разветвлять его на число параллельных процессов и ждать их обработки данных (для этого я использую pool.map), затем как только он получает результат дочерних процессов, он начинает их обрабатывать. Как я могу достичь этого родительского ожидания для этапа дочернего процесса?

ответ

2

В коде нет ничего плохого. Ваш рабочий элемент выполняется очень быстро - так быстро, что один и тот же рабочий процесс может запустить функцию, вернуть результат, а затем выиграть гонку, чтобы использовать следующую задачу из внутренней очереди, которую использует multiprocessing.Pool для распространения работы. Когда вы звоните map, рабочие элементы разбиваются на партии и помещаются в Queue. Вот часть реализации pool.map, что Куски до предметов в итерации и помещает их в очереди:

task_batches = Pool._get_tasks(func, iterable, chunksize) 
    result = MapResult(self._cache, chunksize, len(iterable), callback) 
    self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 
          for i, x in enumerate(task_batches)), None)) 

Каждый рабочий процесс запускает функцию, которая имеет бесконечное время цикл, который потребляет элементы из этой очереди *:

while maxtasks is None or (maxtasks and completed < maxtasks): 
    try: 
     task = get() # Pulls an item off the taskqueue 
    except (EOFError, IOError): 
     debug('worker got EOFError or IOError -- exiting') 
     break 

    if task is None: 
     debug('worker got sentinel -- exiting') 
     break 

    job, i, func, args, kwds = task 
    try: 
     result = (True, func(*args, **kwds)) # Runs the function you passed to map 
    except Exception, e: 
     result = (False, e) 
    try: 
     put((job, i, result)) # Sends the result back to the parent 
    except Exception as e: 
     wrapped = MaybeEncodingError(e, result[1]) 
     debug("Possible encoding error while sending result: %s" % (
      wrapped)) 

Скорее всего, тот же самый работник, по возможности, мог потреблять предмет, запустил func и затем использовал следующий предмет. Это - несколько странно - я не могу воспроизвести его на своей машине с тем же кодом, что и ваш пример, но при этом один и тот же рабочий захватит два из четырех элементов из очереди, это довольно нормально.

Вы всегда должны видеть равномерное распределение, если вы сделаете вашу функцию работника занять больше времени, вставив вызов time.sleep:

def f(values): 
    print(multiprocessing.current_process()) 
    time.sleep(1) 
    return values 

* Это на самом деле не совсем верно - есть нить, которая проходит в основном процесс, который потребляет от taskqueue, а затем прилипает, что она вытаскивает в другую Queue и это то, что дочерние процессы потребляют от)

+0

Круто, это только для тестирования, мой расчет я n функция будет большой для большого набора данных, который я буду использовать позже. И для последней части моего вопроса, как это реализовать? - Мой код начнется следующим образом: - Основной код считывает файл и выводит из него словарь с использованием одного процесса и должен разветвлять его на число параллельных процессов и ждать, пока они обработают данные (для этого я использую pool.map) , то как только он получит результат дочерних процессов, он начнет их обрабатывать.Как я могу достичь этого родительского ожидания для этапа дочернего процесса? –

+1

@Jeeva 'pool.map' не будет возвращаться до тех пор, пока все элементы в итерируемом вами переходе (' data', в вашем примере) не будут обработаны дочерними процессами. Список 'обработанных_значений' в вашем примере будет содержать результат каждого выполнения' f'. Поэтому вам не нужно ничего делать, чем вы уже делаете. – dano

+0

Проблема с этим - task_batches = Pool._get_tasks (func, iterable, chunksize), я хочу, чтобы моя функция была применена к каждому ключу в моем словаре. Это не определенная фраза. Так что в основном словарь n_process = n_keys. Такая же функция для n_key раз параллельно –

Смежные вопросы