2015-07-02 3 views
2

Я пытаюсь запустить это как процесс из многопроцессорной обработки, но при запуске нитки pickler падает, и я не могу решить, что его останавливает от травления. Я пробовал комментировать код сокета и код obj сообщения, но все еще не работает - что я делаю неправильно?Зачем это рассол?

class TransmitThread(Process): 

    def __init__(self, send_queue, reply_queue, control_pipe, recv_timeout=2, buffer_size=4096): 
     """ 
      This init function is called when the thread is created. Function simply calls the Process class init 
      function, and stores the class vars. 
     """ 
     # Call process class init 
     Process.__init__(self) 

     # Store class vars 
     self.send_queue  = send_queue 
     self.reply_queue = reply_queue 
     self.control_pipe = control_pipe 
     self._recv_timeout = recv_timeout 
     self._buffer_size = buffer_size 

    def run(self): 
     """ 
      This is the main function that is called when the thread is started. 
      The function loops forever, waiting for a send message in the queue, and processes the message to send 
      and fetches the response. The thread loops forever until it's terminated or the KILL THREAD command is 
      passed through the control pipe. 
     """ 
     # Start our forever running loop 
     while True: 

      # Check if there is anything in the pipe 
      if self.control_pipe.poll(): 
       # Check if we received the kill thread command 
       if self.control_pipe.recv() == KILL_THREAD_COMMAND: 
        # Kill the while loop and end the thread 
        break 

      # Check if there is anything in message queue 
      if not self.send_queue.empty(): 
       # Fetch message from the queue to send, and unpickle 
       message_obj, message_pickle = self.send_queue.get() 

       # Open socket and set timeout 
       sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
       sock.settimeout(self._recv_timeout) 

       # Connect socket to the recipient 
       sock.connect(message_obj.recipient_address) 

       # Push the pickled message down the socket 
       sock.sendall(message_pickle) 

       # Check if the message we send is a request (should get a response) 
       if str(message_obj.message_type) == str(Message.REQUEST): 

        print "fetching reply" 

        # Lets fetch the response, and push the pickled message onto the queue 
        self.reply_queue.put(sock.recv(self._buffer_size)) 

        print "got a reply" 

       # All done, close the socket 
       sock.close() 

      # Add small delay to stop this thread consuming too much CPU time 
      sleep(0.1) 

Сообщение об ошибке:

File "C:/Users/oliver/OneDrive/GIT/pyke/pyke.py", line 127, in __init__ 
    self.thread.start() 
    File "C:\Python27\lib\multiprocessing\process.py", line 130, in start 
    self._popen = Popen(self) 
    File "C:\Python27\lib\multiprocessing\forking.py", line 277, in __init__ 
    dump(process_obj, to_child, HIGHEST_PROTOCOL) 
    File "C:\Python27\lib\multiprocessing\forking.py", line 199, in dump 
    ForkingPickler(file, protocol).dump(obj) 
    File "C:\Python27\lib\pickle.py", line 224, in dump 
    self.save(obj) 
    File "C:\Python27\lib\pickle.py", line 331, in save 
    self.save_reduce(obj=obj, *rv) 
    File "C:\Python27\lib\pickle.py", line 419, in save_reduce 
    save(state) 
    File "C:\Python27\lib\pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "C:\Python27\lib\pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 
    File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems 
    save(v) 
    File "C:\Python27\lib\pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "C:\Python27\lib\pickle.py", line 725, in save_inst 
    save(stuff) 
    File "C:\Python27\lib\pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "C:\Python27\lib\pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 
    File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems 
    save(v) 
    File "C:\Python27\lib\pickle.py", line 331, in save 
    self.save_reduce(obj=obj, *rv) 
    File "C:\Python27\lib\pickle.py", line 419, in save_reduce 
    save(state) 
    File "C:\Python27\lib\pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "C:\Python27\lib\pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 
    File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems 
    save(v) 
    File "C:\Python27\lib\pickle.py", line 331, in save 
    self.save_reduce(obj=obj, *rv) 
    File "C:\Python27\lib\pickle.py", line 396, in save_reduce 
    save(cls) 
    File "C:\Python27\lib\pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "C:\Python27\lib\pickle.py", line 748, in save_global 
    (obj, module, name)) 
pickle.PicklingError: Can't pickle <type 'thread.lock'>: it's not found as thread.lock 
Traceback (most recent call last): 
    File "<string>", line 1, in <module> 
    File "C:\Python27\lib\multiprocessing\forking.py", line 381, in main 
    self = load(from_parent) 
    File "C:\Python27\lib\pickle.py", line 1378, in load 
    return Unpickler(file).load() 
    File "C:\Python27\lib\pickle.py", line 858, in load 
    dispatch[key](self) 
    File "C:\Python27\lib\pickle.py", line 880, in load_eof 
    raise EOFError 
EOFError 
+0

Как выглядят сообщения? Что такое сообщение об ошибке от сортировщика? Где происходит сбой? Вы уверены, что у вас уже есть полное сообщение ('recv()' может вернуться коротким)? – dhke

+0

Я добавил сообщения об ошибках из сортировщика. Я пробовал комментировать вызов recv() и все функции сокета, и он по-прежнему не работает. У меня это работалось около часа назад, и я не могу понять, что я изменил, чтобы его обрушить. – Oliver

+1

У моего модуля 'thread' также нет' thread.lock'. Итак, возможно, вы настроили несколько путей по-разному, чтобы изменить то, что знает python как модуль «thread»? Кстати, вы должны указать свою версию python. –

ответ

1

Ошибка Pickler приходит из multiprocessing.Process пытается внутренне сам рассол к подпроцессу. Я уверен, что одна из ваших переменных экземпляра не правильно раскроет дочерний процесс. Какой из них не ясно из вашего вопроса

# Store class vars 
    self.send_queue  = send_queue 
    self.reply_queue = reply_queue 
    self.control_pipe = control_pipe 
    self._recv_timeout = recv_timeout 
    self._buffer_size = buffer_size 

[Edit после комментариев OP]:

Проблема заключалась в том, что send_queue и reply_queue были Queue.Queue s, а не multiprocessing.Queue. При раздвоении дочернего работника Process пытается сериализовать себя и любые переменные экземпляра для ребенка. Однако Queue.Queue s являются локальными объектами, которые не являются сериализуемыми, следовательно, являются ошибкой.

Также связано с вопросом, что multiprocessing.Queuere-uses the exceptions от Queue без реэкспорта. Это is actually documented, хотя и несколько скрыты под беспорядком:

Примечание:multiprocessing использует обычные Queue.Empty и Queue.Full исключения сигнала тайм-аут. Они недоступны в пространстве имен multiprocessing, поэтому вам необходимо импортировать их из Queue.

+0

Вы правы, но это опечатка в публикации для SO больше всего на свете. Я переключаюсь между Thread и Process во время моего тестирования, потому что Thread работает все время, а Process - нет. Я импортирую процесс из многопроцессорной обработки и Thread из потоковой обработки, как и следовало ожидать. Исправление этого не решает проблему :( – Oliver

+0

@Oliver См. Edit – dhke

+0

hurr. Я немного запутался. Очереди ответа и отправки - это объекты Queue(). Я использую их для перемещения данных взад и вперед между процесса и родителя, который породил его. Что я сделал не так? Итак, у родителя есть очередь obj, и процесс имеет это, и вы выталкиваете данные между ними.Это моя связь между процессами - не так ли, как предполагается использовать очередь? – Oliver