2014-11-16 2 views
0

Я использую модель процессов «keep alive» (которая общается с Pipes с основным процессом) в моем программном обеспечении, и я пытаюсь обмениваться объектами только для чтения между ними и основным процессом. Пример, чтобы показать мою проблему:Доля объектов только для чтения между процессами

from multiprocessing import Process, Pipe#, Manager 
from multiprocessing.connection import wait 
import os 


def start_in_oneshot_processes(obj, nb_process): 
    """ 
    Start nb_process processes to do the job. Then process finish the job they die. 
    """ 
    processes = [] 
    for i in range(nb_process): 
     # Simple process style 
     p = Process(target=oneshot_in_process, args=(obj,)) 
     p.start() 
     processes.append(p) 

    for process in processes: 
     # Wait all process finish 
     process.join() 

def oneshot_in_process(obj): 
    """ 
    Main job (don't matter if in oneshot, keeped alive process. It have job to do) 
    """ 
    print('p', obj, os.getpid()) 


def start_in_keepedalive_processes(obj, nb_process): 
    """ 
    Start nb_process and keep them alive. Send job to them multiple times, then close thems. 
    """ 
    processes = [] 
    readers_pipes = [] 
    writers_pipes = [] 
    for i in range(nb_process): 
     # Start process with Pipes for communicate 
     local_read_pipe, local_write_pipe = Pipe(duplex=False) 
     process_read_pipe, process_write_pipe = Pipe(duplex=False) 
     readers_pipes.append(local_read_pipe) 
     writers_pipes.append(process_write_pipe) 
     p = Process(target=run_keepedalive_process, args=(local_write_pipe, process_read_pipe, obj)) 
     p.start() 
     processes.append(p) 
    # Send to process some job to do 
    for job in range(3): 
     print('send new job to processes:') 
     for process_number in range(nb_process): 
      # Send data to process 
      writers_pipes[process_number].send(obj) 
      reader_useds = [] 
     # Wait response from processes 
     while readers_pipes: 
      for r in wait(readers_pipes): 
       try: 
        r.recv() 
       except EOFError: 
        pass 
       finally: 
        reader_useds.append(r) 
        readers_pipes.remove(r) 
     readers_pipes = reader_useds 

    # Kill processes 
    for writer_pipe in writers_pipes: 
     writer_pipe.send('stop') 

def run_keepedalive_process(main_write_pipe, process_read_pipe, obj): 
    """ 
    Procees who don't finish while job to do 
    """ 
    while obj != 'stop': 
     oneshot_in_process(obj) 
     # Send to main process "I've done my job" 
     main_write_pipe.send('job is done') 
     # Wait for new job to do (this part can be simplified no ?) 
     readers = [process_read_pipe] 
     while readers: 
      for r in wait(readers): 
       try: 
        obj = r.recv() 
       except EOFError: 
        pass 
       finally: 
        readers.remove(r) 


obj = object() 
print('m', obj, os.getpid()) 

print('One shot processes:') 
start_in_oneshot_processes(obj, 5) 

print('Keeped alive processes:') 
start_in_keepedalive_processes(obj, 5) 

print('f', obj, os.getpid()) 

Выход:

➜ sandbox git:(dev/opt) ✗ python3.4 sharedd.py 
m <object object at 0xb7266dc8> 3225 
One shot processes: 
p <object object at 0xb7266dc8> 3227 
p <object object at 0xb7266dc8> 3226 
p <object object at 0xb7266dc8> 3229 
p <object object at 0xb7266dc8> 3228 
p <object object at 0xb7266dc8> 3230 
Keeped alive processes: 
p <object object at 0xb7266dc8> 3231 
p <object object at 0xb7266dc8> 3232 
send new job to processes: 
p <object object at 0xb7266dc8> 3235 
p <object object at 0xb7266dc8> 3233 
p <object object at 0xb7266dc8> 3234 
p <object object at 0xb7266488> 3231 
send new job to processes: 
p <object object at 0xb7266488> 3232 
p <object object at 0xb7266488> 3235 
p <object object at 0xb7266488> 3234 
p <object object at 0xb7266490> 3231 
p <object object at 0xb7266488> 3233 
p <object object at 0xb7266490> 3232 
p <object object at 0xb7266490> 3235 
p <object object at 0xb7266490> 3233 
send new job to processes: 
p <object object at 0xb7266488> 3232 
p <object object at 0xb7266488> 3235 
p <object object at 0xb7266490> 3234 
p <object object at 0xb7266488> 3231 
f <object object at 0xb7266dc8> 3225 
p <object object at 0xb7266488> 3233 
p <object object at 0xb7266488> 3234 

Если я создаю простые процессы (start_in_oneshot_processes), obj имеют один и тот же адрес памяти в подпроцесс и в основном процессе: 0xb7266dc8.

Но когда мой процесс приема объектов с трубой (start_in_keepedalive_processes), объекты адрес памяти не являются таким же, как основным процесс: пример: 0xb7266488 вместо 0xb7266dc8. Эти объекты доступны только для чтения подпроцессами. Как я могу поделиться ими между основным процессом и подпроцессом (и сохранить время копирования памяти)?

ответ

3

Вы не можете достичь того, что хотите, используя Pipe. Когда данные передаются по каналу другому процессу, этот процесс должен хранить копию данных в своем собственном адресном пространстве --- новый объект в Python. Большинство других способов доставки данных между процессами одинаковы.

Это совпадение, что вы видели тот же адрес памяти из функции start_in_oneshot_process, вероятно, из-за вашего выбора операционной системы. В общем, два процесса вообще не будут иметь одинаковую ОЗУ. (Проверьте раздел Process DOCS на Contexts and Start Methods для различий между Windows, Unix и spawnfork.)

Если это действительно важно, что два процесса могут проверять один и тот же блок памяти, вы можете попробовать либо shared memory or an object manager process. Обратите внимание, что те же документы говорят вам, что это редко бывает хорошей идеей.

Смежные вопросы