1

Я пытаюсь реализовать многопроцессорное приложение, которое может обращаться к общему ресурсу данных. Я использую механизм блокировки, чтобы обеспечить безопасный доступ к общему ресурсу. Однако я ошибаюсь. Удивительно, что если процесс 1 получает блокировку сначала, он обслуживает запрос, и он терпит неудачу при следующем процессе, который пытается получить блокировку. Но если какой-либо другой процесс, отличный от 1, пытается получить блокировку, сначала он не работает в самом первом запуске. Я новичок в Python и использования документации для реализации этого Так я не знаю, если мне не хватает каких-либо основные механизмы безопасности here.Any точки данных, как, почему я являюсь свидетелем, это будет большим подспорьемМеханизм блокировки многопроцессорности Python сбой при обнаружении блокировки

ПРОГРАММА:

#!/usr/bin/python 
from multiprocessing import Process, Manager, Lock 
import os 
import Queue 
import time 
lock = Lock() 
def launch_worker(d,l,index): 
    global lock 
    lock.acquire() 
    d[index] = "new" 
    print "in process"+str(index) 
    print d 
    lock.release() 
    return None 

def dispatcher(): 
    i=1 
    d={} 
    mp = Manager() 
    d = mp.dict() 
    d[1] = "a" 
    d[2] = "b" 
    d[3] = "c" 
    d[4] = "d" 
    d[5] = "e" 
    l = mp.list(range(10)) 
    for i in range(4): 
     p = Process(target=launch_worker, args=(d,l,i)) 
     i = i+1 
     p.start() 
    return None 

if __name__ == '__main__': 
    dispatcher() 

ОШИБКА, когда процесс 1 обслуживается первого

in process0 
{0: 'new', 1: 'a', 2: 'b', 3: 'c', 4: 'd', 5: 'e'} 
Process Process-3: 
Traceback (most recent call last): 
    File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap 
    self.run() 
    File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run 
    self._target(*self._args, **self._kwargs) 
    File "dispatcher.py", line 10, in launch_worker 
    d[index] = "new" 
    File "<string>", line 2, in __setitem__ 
    File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod 
    self._connect() 
    File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect 
    conn = self._Client(self._token.address, authkey=self._authkey) 
    File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client 
    c = SocketClient(address) 
    File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient 
    s.connect(address) 
    File "<string>", line 1, in connect 
error: [Errno 2] No such file or directory 

ERROR, когда процесс 2 обслуживается первым

Process Process-2: 
Traceback (most recent call last): 
    File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap 
    self.run() 
    File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run 
    self._target(*self._args, **self._kwargs) 
    File "dispatcher.py", line 10, in launch_worker 
    d[index] = "new" 
    File "<string>", line 2, in __setitem__ 
    File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod 
    self._connect() 
    File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect 
    conn = self._Client(self._token.address, authkey=self._authkey) 
    File "/usr/lib/python2.6/multiprocessing/connection.py", line 150, in Client 
    deliver_challenge(c, authkey) 
    File "/usr/lib/python2.6/multiprocessing/connection.py", line 373, in deliver_challenge 
    response = connection.recv_bytes(256)  # reject large message 
IOError: [Errno 104] Connection reset by peer 

ответ

1

Запрет, который ваши рабочие изменения изменяют, является общим объектом, управляемым процессом диспетчеризации; изменения этого объекта рабочими требуют, чтобы они общались с процессом диспетчеризации. Ошибки, которые вы видите, связаны с тем, что ваш диспетчер не ждет рабочих процессов после их запуска; он выходит слишком рано, поэтому для них может не существовать связи, когда им это нужно.

Первый рабочий или два, которые пытаются обновить общий файл, могут преуспеть, потому что, когда они изменяют общий файл, процесс, содержащий экземпляр Manager, может все еще существовать (например, он все еще может быть в процессе создания новых рабочих) , Таким образом, в ваших примерах вы видите успешный выход. Но процесс управления вскоре завершается, и следующий рабочий, который пытается выполнить модификацию, потерпит неудачу. (Сообщения об ошибках, которые вы видите, типичны для неудачных попыток связи между процессами, вы также увидите ошибки EOF, если вы запустите свою программу еще несколько раз.)

Что вам нужно сделать, это вызвать метод join на объектах Process в качестве способа ожидания выхода каждого из них. Следующая модификация вашего dispatcher показывает основную идею:

def dispatcher(): 
    mp = Manager() 
    d = mp.dict() 
    d[1] = "a" 
    d[2] = "b" 
    d[3] = "c" 
    d[4] = "d" 
    d[5] = "e" 
    procs = [] 
    for i in range(4): 
     p = Process(target=launch_worker, args=(d,i)) 
     procs.append(p) 
     p.start() 
    for p in procs: 
     p.join() 
+0

Да, это в значительной степени имеет смысл для меня. Родительский процесс завершается. Но я не хочу использовать join, так как это может сделать мою сериализацию программы, поскольку я хочу самостоятельно решать каждый процесс. Возможно, я захочу спать в родительской функции, если не уведомлен ни одним из подпроцессов. – pavan

Смежные вопросы