2017-02-21 5 views
1

Я пытаюсь сделать многопроцессорную обработку ServerApp для работы в Windows. Я предполагаю, что проблема отсутствует. os.fork() функция, поэтому мне придется пройти socket как-то, что не разборчиво (?!).Python3 Windows многопроцессорный проход для обработки

Я видел, что это может быть возможно с помощью reduce_handle и rebuild_handle из multiprocessing.reduction, как показано here, но эти методы не доступны в Python 3 (?!). Хотя у меня есть доступные duplicate и steal_handle доступны. Я не могу найти пример, как их использовать или нужны ли они вообще.

Кроме того, я хотел бы знать, будет ли проблема logging при создании нового процесса?

Вот мой ServerApp пример:

import logging 
import socket 

from select import select 
from threading import Thread 
from multiprocessing import Queue 
from multiprocessing import Process 
from sys import stdout 
from time import sleep 


class ServerApp(object): 

    logger = logging.getLogger(__name__) 
    logger.setLevel(logging.DEBUG) 
    handler = logging.StreamHandler(stdout) 
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') 
    handler.setFormatter(formatter) 
    logger.addHandler(handler) 


    def conn_handler(self, connection, address, buffer): 

     self.logger.info("[%d] - Connection from %s:%d", self.id, address[0], address[1]) 

     try: 
      while True: 

       command = None 
       received_data = b'' 
       readable, writable, exceptional = select([connection], [], [], 0) # Check for client commands 

       if readable: 
        # Get Command ... There is more code here 
        command = 'Something' 


       if command == 'Something': 
        connection.sendall(command_response) 
       else: 
        print(':(') 

     except Exception as e: 
      print(e) 
     finally: 
      connection.close() 
      self.client_buffers.remove(buffer) 
      self.logger.info("[%d] - Connection from %s:%d has been closed.", self.id, address[0], address[1]) 


    def join(self): 

     while self.listener.is_alive(): 
      self.listener.join(0.5) 


    def acceptor(self): 

     while True: 
      self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, self.ip, self.port) 

      # Accept a connection on the bound socket and fork a child process to handle it. 
      conn, address = self.socket.accept() 

      # Create Queue which will represent buffer for specific client and add it o list of all client buffers 
      buffer = Queue() 
      self.client_buffers.append(buffer) 

      process = Process(target=self.conn_handler, args=(conn, address, buffer)) 
      process.daemon = True 
      process.start() 
      self.clients.append(process) 

      # Close the connection fd in the parent, since the child process has its own reference. 
      conn.close() 


    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', buffer_size=2048): 

     self.id = id 
     self.port = port 
     self.ip = ip 

     self.socket = None 
     self.listener = None 
     self.buffer_size = buffer_size 

     # Additional attributes here.... 

     self.clients = [] 
     self.client_buffers = [] 


    def run(self): 

     # Create TCP socket, bind port and listen for incoming connections 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.bind((self.ip, self.port)) 
     self.socket.listen(5) 

     self.listener = Thread(target=self.acceptor) # Run acceptor thread to handle new connection 
     self.listener.daemon = True 
     self.listener.start() 
+0

Вы пишете какой-то код, но не видите никаких определений 'protocol'. Нельзя определить какое-либо принимающее правило, если оно уже принято (что такое фильтр?). – dsgdfg

+0

@dsgdfg Не уверен, что я правильно понял, но каждое соединение должно приниматься и обрабатываться отдельным процессом. – sstevan

ответ

3

Чтобы разрешить подключение травление (включая сокеты) для Python3, вы должны использовать mulitprocessing.allow_connection_pickling. Он регистрирует редукторы для сокетов в ForkingPickler. Например:

import socket 
import multiprocessing as mp 
mp.allow_connection_pickling() 


def _test_connection(conn): 
    msg = conn.recv(2) 
    conn.send(msg) 
    conn.close() 
    print("ok") 

if __name__ == '__main__': 
    server, client = socket.socketpair() 

    p = mp.Process(target=_test_connection, args=(server,)) 
    p.start() 

    client.settimeout(5) 

    msg = b'42' 
    client.send(msg) 
    assert client.recv(2) == msg 

    p.join() 
    assert p.exitcode == 0 

    client.close() 
    server.close() 

Я также заметил, что у вас есть какие-то другие вопросы unrealted к засолкам socket.

  • При использовании self.conn_handler в качестве мишени, мультипроцессорная будет пытаться законсервировать весь объект self. Это проблема, поскольку в вашем объекте содержится Thread, который не может быть маринован. Таким образом, вы должны удалить self от закрытия вашей целевой функции. Это можно сделать, используя декоратор @staticmethod и удалив все упоминания о функции self.

  • Кроме того, модуль logging не предназначен для обработки нескольких процессов. В принципе, все журналы с запущенного Process будут потеряны с вашим текущим кодом. Чтобы исправить это, вы можете начать новый logging после запуска второго Process (в начале conn_handler) или использовать утилиту регистрации multiprocessing.

Это может дает что-то вроде этого:

import logging 
import socket 

from select import select 
from threading import Thread 
from multiprocessing import util, get_context 
from sys import stdout 
from time import sleep 

util.log_to_stderr(20) 
ctx = get_context("spawn") 


class ServerApp(object): 

    logger = logging.getLogger(__name__) 
    logger.setLevel(logging.DEBUG) 
    handler = logging.StreamHandler(stdout) 
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') 
    handler.setFormatter(formatter) 
    logger.addHandler(handler) 

    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', 
       buffer_size=2048): 

     self.id = id 
     self.port = port 
     self.ip = ip 

     self.socket = None 
     self.listener = None 
     self.buffer_size = buffer_size 

     # Additional attributes here.... 

     self.clients = [] 
     self.client_buffers = [] 

    @staticmethod 
    def conn_handler(id, connection, address, buffer): 

     print("test") 
     util.info("[%d] - Connection from %s:%d", id, address[0], address[1]) 

     try: 
      while True: 

       command = None 
       received_data = b'' 
       # Check for client commands 
       readable, writable, exceptional = select([connection], [], [], 
                 0) 

       if readable: 
        # Get Command ... There is more code here 
        command = 'Something' 

       if command == 'Something': 
        connection.sendall(b"Coucouc") 
        break 
       else: 
        print(':(') 
       sleep(.1) 

     except Exception as e: 
      print(e) 
     finally: 
      connection.close() 
      util.info("[%d] - Connection from %s:%d has been closed.", id, 
        address[0], address[1]) 
      print("Close") 

    def join(self): 

     while self.listener.is_alive(): 
      self.listener.join(0.5) 

    def acceptor(self): 

     while True: 
      self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, 
          self.ip, self.port) 

      # Accept a connection on the bound socket and fork a child process 
      # to handle it. 
      conn, address = self.socket.accept() 

      # Create Queue which will represent buffer for specific client and 
      # add it o list of all client buffers 
      buffer = ctx.Queue() 
      self.client_buffers.append(buffer) 

      process = ctx.Process(target=self.conn_handler, 
           args=(self.id, conn, address, buffer)) 
      process.daemon = True 
      process.start() 
      self.clients.append(process) 

      # Close the connection fd in the parent, since the child process 
      # has its own reference. 
      conn.close() 

    def run(self): 

     # Create TCP socket, bind port and listen for incoming connections 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.bind((self.ip, self.port)) 
     self.socket.listen(5) 

     # Run acceptor thread to handle new connection 
     self.listener = Thread(target=self.acceptor) 
     self.listener.daemon = True 
     self.listener.start() 

     self.listener.join() 


def main(): 
    app = ServerApp(0) 
    app.run() 


if __name__ == '__main__': 
    main() 

я только проверил его на Unix и python3.6, но это не должно быть поведение слишком разные, как я использую контекстную икру , which should behave like the Process` в окнах ,

+0

Извините за задержку. Я протестировал 'allow_connection_pickling', но это не имеет никакого эффекта. Тем не менее, я заметил, что я получаю две разные ошибки (дважды повторяя один и тот же код). Вот [первый] (http://pastebin.com/zyS2Sbtm), и вот [второй] (http://pastebin.com/7FQ1nvxN). Когда я не назначаю прослушиватель как свойство ServerApp (просто 'listener' вместо' self.listener'), у меня нет этой ошибки, но процесс обработчика никогда не выполняется. – sstevan

+0

Это не проблема с травлением сокета. Если вы читаете ошибки, это не позволяет рассортировать объекты '_thread.Lock' и некоторые' io'.Я бы сказал, что это связано с травлением всего объекта ServerApp, необходимого, когда вы используете метод экземпляра, чтобы начать новый «процесс». Вы должны либо использовать декоратор '@ staticmethod' для' conn_handler', либо удалить его из класса. Это также хорошая практика, так как небезопасно рассортировать весь объект (например, если он обрабатывает некоторый пароль). Кроме того, попробуйте предоставить базовый скрипт, воспроизводящий вашу ошибку, чтобы разрешить тестирование. –

+0

Кроме того, как хороший практический способ, вы должны использовать 'multiprocessing.utils.log_to_stderr' и' multiprocessing.utils.debug/info', чтобы получить последовательную регистрацию через ваш «процесс». Если вам нужно использовать собственное ведение журнала, вы должны запустить его в начале вашей целевой функции, поскольку «регистрация» предназначена только для работы с одним «процессом». –

Смежные вопросы