Я изменил пример на очереди Объединения по этой ссылке https://pymotw.com/2/multiprocessing/communication.html, чтобы запустить функцию, которую я написал вместо объекта Task. Измененный код указан ниже. Проблема, которую я получаю, заключается в том, что потребители становятся отравленными, не помещая None в очередь задач. Они выходят перед выполнением задач. Поэтому я удалил проверку на None (как показано ниже) от функции запуска и я поймал это исключение:queue.get возвращает объект NoneType?
объекта «NoneType» не отозван
Я уверен, что никто не еще с тех пор сообщений не прошел «Отравляющие потребители» еще не напечатаны
import multiprocessing as mp
import MyLib
# Subclass of Process
class Consumer(mp.Process):
def __init__(self, task_queue, result_queue):
mp.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.daemon = True
# A method that defines the behavior of the process
def run(self):
proc_name = self.name
while True:
try:
next_task = self.task_queue.get()
# if next_task is None:
# # Poison pill means shutdown
# print('%s: Exiting' % proc_name)
# self.task_queue.task_done()
# break
mxR, disC = next_task()
self.task_queue.task_done()
self.result_queue.put((mxR, disC))
except Exception as e:
print(e)
return
if __name__ == '__main__':
# Establish communication queues
tasks = mp.JoinableQueue()
results = mp.Queue()
# Start consumers
num_consumers = mp.cpu_count() * 2
print('Creating %d consumers' % num_consumers)
consumers = [ Consumer(tasks, results)
for i in range(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs
trials = 10
Tx_Range = 50
prnts = 4
for tx in list(range(30, 200, 20)):
file_name = 'output_{}_{}.txt'.format(tx,prnts)
output_file = open(file_name,'a')
output_file.write('Nodes\tTx_Range\tAvg_Rings\tAvg_Disc\n')
for n in list(range(50, 101, 50)):
ring_sum, disc_sum = 0, 0
for i in range (0, trials):
tasks.put(MyLib.GBMR_mp(1000, 1000, n, prnts, tx, False, results))
print('Done putting jobs')
for i in range (0, trials):
mxR, discN = results.get()
ring_sum += mxR
disc_sum += discN
avg_ring = ring_sum/trials
avg_disc = disc_sum/trials
print('Done Collecting Results, avg_disc = ', avg_disc,' and avg_rings = ', avg_ring)
s = '{}\t\t{}\t\t{}\t\t{}\n'.format(n,tx,avg_ring,avg_disc)
print('Nodes', n, 'is Done for Tx_range', tx)
output_file.write(s)
output_file.close()
# Add a poison pill for each consumer
print('Poisoning Consumers')
for i in range(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
Что может быть причиной этой проблемы? Может ли быть queue.get() возвращать None?
Заранее спасибо
Вы удалили чек на «Нет», но не изменили остальную часть кода, чтобы работать с «Нет», теперь это значение, с которым ему придется иметь дело. – kindall
Пока ничего не ставится в очередь. Кроме того, не следует ли блокировать функцию до тех пор, пока она не прочитает задание? – osmak
Ваша строка 'tasks.put' не помещает задачу/функцию в очередь. Он помещает результат вызова функции, который, как я предполагаю, возвращает «Нет». – bj0