2016-05-01 3 views
0

Я изменил пример на очереди Объединения по этой ссылке https://pymotw.com/2/multiprocessing/communication.html, чтобы запустить функцию, которую я написал вместо объекта Task. Измененный код указан ниже. Проблема, которую я получаю, заключается в том, что потребители становятся отравленными, не помещая None в очередь задач. Они выходят перед выполнением задач. Поэтому я удалил проверку на None (как показано ниже) от функции запуска и я поймал это исключение:queue.get возвращает объект NoneType?

объекта «NoneType» не отозван

Я уверен, что никто не еще с тех пор сообщений не прошел «Отравляющие потребители» еще не напечатаны

import multiprocessing as mp 
import MyLib 

# Subclass of Process 
class Consumer(mp.Process): 

    def __init__(self, task_queue, result_queue): 
     mp.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 
     self.daemon = True 

    # A method that defines the behavior of the process 
    def run(self): 
     proc_name = self.name 
     while True: 
      try: 
       next_task = self.task_queue.get() 
       # if next_task is None: 
       #  # Poison pill means shutdown 
       #  print('%s: Exiting' % proc_name) 
       #  self.task_queue.task_done() 
       #  break 

       mxR, disC = next_task() 
       self.task_queue.task_done() 
       self.result_queue.put((mxR, disC)) 
      except Exception as e: 
       print(e) 

     return 


if __name__ == '__main__': 
    # Establish communication queues 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 

    # Start consumers 
    num_consumers = mp.cpu_count() * 2 
    print('Creating %d consumers' % num_consumers) 
    consumers = [ Consumer(tasks, results) 
        for i in range(num_consumers) ] 
    for w in consumers: 
     w.start() 

    # Enqueue jobs 

    trials = 10 
    Tx_Range = 50 
    prnts = 4 
    for tx in list(range(30, 200, 20)): 
     file_name = 'output_{}_{}.txt'.format(tx,prnts) 
     output_file = open(file_name,'a') 
     output_file.write('Nodes\tTx_Range\tAvg_Rings\tAvg_Disc\n') 
     for n in list(range(50, 101, 50)): 
      ring_sum, disc_sum = 0, 0 
      for i in range (0, trials): 
       tasks.put(MyLib.GBMR_mp(1000, 1000, n, prnts, tx, False, results)) 

      print('Done putting jobs') 
      for i in range (0, trials): 
       mxR, discN = results.get() 
       ring_sum += mxR 
       disc_sum += discN 
      avg_ring = ring_sum/trials 
      avg_disc = disc_sum/trials 
      print('Done Collecting Results, avg_disc = ', avg_disc,' and avg_rings = ', avg_ring) 
      s = '{}\t\t{}\t\t{}\t\t{}\n'.format(n,tx,avg_ring,avg_disc) 
      print('Nodes', n, 'is Done for Tx_range', tx) 
      output_file.write(s) 
     output_file.close() 



    # Add a poison pill for each consumer 
    print('Poisoning Consumers') 
    for i in range(num_consumers): 
     tasks.put(None) 

    # Wait for all of the tasks to finish 
    tasks.join() 

Что может быть причиной этой проблемы? Может ли быть queue.get() возвращать None?

Заранее спасибо

+0

Вы удалили чек на «Нет», но не изменили остальную часть кода, чтобы работать с «Нет», теперь это значение, с которым ему придется иметь дело. – kindall

+0

Пока ничего не ставится в очередь. Кроме того, не следует ли блокировать функцию до тех пор, пока она не прочитает задание? – osmak

+1

Ваша строка 'tasks.put' не помещает задачу/функцию в очередь. Он помещает результат вызова функции, который, как я предполагаю, возвращает «Нет». – bj0

ответ

0

Поскольку я не получил каких-либо предложений, я пытался решить эту проблему, и я пришел с решением (плохой я считаю), где я просто просто забыли о потребителях и начали каждый функция запускается в процессе. Я ограничил количество одновременных процессов, проверив количество запущенных, как показано ниже. Но я уверен, что я делаю что-то не так, потому что производительность этого решения намного хуже, чем не использование многопроцессов. При многопроцессорной обработке внутренний цикл for на «n» занимает около 2 минут, но без многопроцессорности требуется несколько секунд. Я все еще ноул, может ли кто-нибудь указать мне в правильном направлении? вот код:

import multiprocessing as mp 
import MyLib 


if __name__ == '__main__': 

    results = mp.Queue() 


    num_consumers = mp.cpu_count() 


    trials = 500 
    prnts = 4 
    num_of_proc = 0 
    consumers = [] 
    joined = 0 
    for tx in list(range(30, 200, 20)): 
     file_name = 'Centered_BS_output_{}_{}.txt'.format(tx,prnts) 
     output_file = open(file_name,'a') 
     output_file.write('Nodes\tTx_Range\tAvg_Rings\tAvg_Disc\n') 
     for n in list(range(30, 1030, 30)): 
      consumers.clear() 
      ring_sum, disc_sum, joined, i, num_of_proc = 0, 0, 0, 0, 0 
      #for i in range (0, trials): 
      while i < trials: 
       if num_of_proc < num_consumers: 
        consumers.append(mp.Process(target=MyLib.GBMR_mp, args=(1000, 1000, n, prnts, tx, False, results))) 
        consumers[i].daemon = True 
        consumers[i].start() 
        num_of_proc += 1 
        i += 1 

       else: 

        consumers[joined].join() 
        num_of_proc -= 1 
        joined += 1 

      print('Done putting jobs') 
      for i in range (0, trials): 
       mxR, discN = results.get() 
       ring_sum += mxR 
       disc_sum += discN 
      avg_ring = ring_sum/trials 
      avg_disc = disc_sum/trials 
      print('Done Collecting Results, avg_disc = ', avg_disc,' and avg_rings = ', avg_ring) 
      s = '{}\t\t{}\t\t{}\t\t{}\n'.format(n,tx,avg_ring,avg_disc) 
      print('Nodes', n, 'is Done for Tx_range', tx) 
      output_file.write(s) 
     output_file.close() 
Смежные вопросы