2015-05-12 2 views
8

Я хочу использовать объект multiprocessing.Manager(), чтобы я мог асинхронно отправлять информацию от рабочего диспетчера для отправки информации на сервер. У меня около 10 экземпляров, записывающих PDF-файлы на диск. Затем я хотел использовать объект-менеджер в пакете многопроцессорности, чтобы отправить эти данные в мой ведро S3, потому что я не хочу задерживать создание локального контента.Правильное проектирование пользовательского объекта Multiprocessing.Manager

Так что мне было интересно, создаю ли я пользовательский объект-менеджер, это правильный способ сделать это? Будет ли каждый процесс передан объекту менеджера в очередь? или если я вызову несколько загрузок, менеджер отменит некоторые из вызовов?

Ниже приведен пример кода, что я имею в виду делать:

from multiprocessing.managers import BaseManager 

class UploadClass(object): 
    def upload(self, filePath, params, destUrl): 
     # do stuff 
     return results 

class MyManager(BaseManager): 
    pass 

MyManager.register('uploads', UploadClass) 

if __name__ == '__main__': 
    manager = MyManager() 
    manager.start() 
    upload = manager.uploads() 
    # do this wait for completion or do they perform this async 
    print upload.upload(r"<path>", {...}, "some url") 
    print upload.upload(r"<path>", {...}, "some url") 
+0

Просто поясните: вы хотите иметь десять различных процессов (это уникальные экземпляры одного и того же сценария python или просто многопроцессорные экземпляры. Процессы, порожденные внутри одного скрипта?), которые все пишут PDF-файлы на диск. После того, как они будут выполнены, каждый экземпляр отправит путь файла к одному «многопроцессорному.Manager», который должен загружать файлы по одному (что означает отсутствие параллельных загрузок). Это правильно? – dano

+0

Также вы хотите получить результат от процесса загрузки? Или вы просто хотите отключить загрузку в фоновом режиме и забыть об этом? – dano

+0

@ dano - было бы полезно вернуть какое-то сообщение из процесса, чтобы процесс работал правильно. –

ответ

2

Чтобы получить ответы на некоторые вопросы:

Будет ли каждый процесс, представленный на объект менеджера ставятся в очередь?

Сервер Manager генерирует новый поток для обработки каждого входящего запроса, поэтому все ваши запросы начнут обрабатываться мгновенно. Вы можете увидеть это внутри multiprocessing/managers.py:

def serve_forever(self): 
    ''' 
    Run the server forever 
    ''' 
    current_process()._manager_server = self 
    try: 
     try: 
      while 1: 
       try: 
        c = self.listener.accept() 
       except (OSError, IOError): 
        continue 
       t = threading.Thread(target=self.handle_request, args=(c,)) 
       t.daemon = True 
       t.start() 
     except (KeyboardInterrupt, SystemExit): 
      pass 
    finally: 
     self.stop = 999 
     self.listener.close() 

, если я позвоню загрузки нескольких, будет менеджер падение некоторые из вызовов?

Нет, ни один из вызовов не будет удален.

# do this wait for completion or do they perform this async 
print upload.upload(r"<path>", {...}, "some url") 
print upload.upload(r"<path>", {...}, "some url") 

Оба звонков upload.upload будут синхронны; они не вернутся, пока не закончится UploadClass.upload. Однако, если бы у вас было несколько сценариев/потоков/процессов, вызывающих одновременно upload.upload, каждый уникальный вызов будет происходить одновременно внутри собственного потока в процессе сервера Manager.

И ваш самый самый важный вопрос:

это правильный способ сделать это?

Я бы сказал, нет, если я правильно понял вопрос. Если есть только один сценарий, а затем икра десять multiprocessing.Process экземпляров внутри этого одного сценария, чтобы выписать PDF-файлы, то вы должны просто использовать другой multiprocessing.Process для обработки загрузки:

def upload(self, q): 
    for payload in iter(q.get, None): # Keep getting from the queue until a None is found 
     filePath, params, destUrl = payload 
     # do stuff 

def write_pdf(pdf_file_info, q): 
    # write a pdf to disk here 
    q.put((filepath, params, destUrl)) # Send work to the uploader 
    # Move on with whatever comes next. 

if __name__ == '__main__': 
    pdf_queue = multiprocessing.Queue() 

    # Start uploader 
    upload_proc = multiprocessing.Process(upload, args=(pdf_queue,)) 
    upload_proc.start() 

    # Start pdf writers 
    procs = [] 
    for pdf in pdfs_to_write: 
     p = multiprocessing.Process(write_pdf, args=(pdf, pdf_queue)) 
     p.start() 
     p.append(procs) 

    # Wait for pdf writers and uploader to finish. 
    for p in procs: 
     p.join() 
    pdf_queue.put(None) # Sending None breaks the for loop inside upload 
    upload_proc.join() 

Если вы на самом деле в порядке с одновременно загружает, тогда нет необходимости иметь отдельный процесс upload - просто загрузите его из процессов записи PDF напрямую.

Трудно сказать по вашему вопросу, если это именно то, что вы делаете. Как только вы уточните, я отрегулирую этот последний фрагмент, чтобы он соответствовал вашему конкретному случаю использования.

+0

Зачем обрабатывать процессы, чтобы помещать данные в очередь вместо прямого ввода из основного процесса? – sirfz

+0

@Sir_FZ OP заявила, что у него было несколько экземпляров, записывающих PDF-файлы: * «У меня примерно 10 экземпляров, записывающих PDF-файлы на диск» *. Таким образом, есть несколько работников, которые в конечном итоге помещают элементы в очередь параллельно. – dano

+0

Хорошая точка. Но при использовании диспетчера OP имеет преимущество одновременного обработки нескольких загрузок (поскольку диспетчер создает поток для каждого запроса), и поскольку он включает IO, применяется параллелизм. Хотя в вашем решении у вас есть один процесс обработки загрузки последовательно. Я предлагаю использовать ThreadPool в процессе загрузки для асинхронной обработки запросов на загрузку, помещенных в очередь. – sirfz

Смежные вопросы