2012-02-12 1 views
5

Несколько дней назад я задал вопрос о SO о помощи мне разработать парадигму для структурирования несколько HTTP запросовразличия пропускной способности при использовании сопрограмм против пронизывающие

Вот сценарий. Я бы хотел, чтобы у вас была многопроцессорная многопользовательская система. Мои продюсеры сканируют и очищают несколько сайтов и добавляют ссылки, которые он находит в очередь. Поскольку я буду сканировать несколько сайтов, я хотел бы иметь несколько продюсеров/сканеров.

Потребители/работники передают эту очередь, делают запросы TCP/UDP на эти ссылки и сохраняют результаты в моем Django DB. Я также хотел бы иметь нескольких сотрудников, поскольку каждый элемент очереди полностью независим друг от друга.

Люди предложили использовать библиотеку сопрограммы для этого, например, Gevent или Eventlet. Никогда не работая с сопрограммами, я читал, что, хотя парадигма программирования похожа на потоковые парадигмы, активно выполняется только один поток, но когда происходит блокировка вызовов - например, вызовы ввода-вывода - стеки переключаются в памяти, а другая зеленая нить берет на себя, пока не встретит какой-то блокирующий вызов ввода-вывода. Надеюсь, у меня все получилось? Вот код из одного из моих постов SO:

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 


def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 


def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 


for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

# This doesn't work. 
for j in range(2): 
    producers.append(gevent.spawn(producer)) 

# Uncommenting this makes this script work. 
# producer() 

q.join() 

Это хорошо работает, потому что sleep вызовы блокирующие вызовы и когда происходит событие sleep, другой зеленый нить берет на себя. Это намного быстрее, чем последовательное выполнение. Как вы можете видеть, у меня нет кода в моей программе, который преднамеренно дает выполнение одного потока в другой поток. Я не вижу, как это вписывается в описанный выше сценарий, поскольку я хотел бы, чтобы все потоки выполнялись одновременно.

Все работает отлично, но я чувствую, что пропускная способность, которую я достиг с помощью Gevent/Eventlets, выше, чем исходная программа, работающая последовательно, но значительно ниже, чем можно было бы достичь с помощью реальной потоковой передачи.

Если бы я должен был выполнить повторную реализацию моей программы с использованием механизмов потоковой обработки, каждый из моих производителей и потребителей мог бы одновременно работать без необходимости свопить стеки внутрь и наружу как сопрограммы.

Должно ли это быть реализовано с использованием резьбы? Является ли мой дизайн неправильным? Я не видел реальных преимуществ использования сопрограмм.

Возможно, мои понятия немного грязные, но это то, что я усвоил. Любая помощь или разъяснение моей парадигмы и концепций будет отличной.

Благодаря

+0

Почему бы не использовать несколько процессов? –

+0

Я не знаю плюсов и минусов многопоточности против многопроцессорной обработки, поэтому я не знаю, хорошо ли это или нет. –

+1

нет такой вещи, как «настоящая потоковая передача» (только один фактический поток ОС выполняется в любой момент времени) в программах Python, не прибегая к C-расширениям (или тяжелым ОС ОС) из-за глобальной блокировки интерпретатора. –

ответ

5

Как вы можете видеть, у меня нет никакого кода в моей программе, которая намеренно дает выполнение одного потока к другому потоку. Я не вижу , как это вписывается в описанный выше сценарий, так как я хочу, чтобы все потоки выполнялись одновременно.

Существует одна нить ОС, но несколько зеленых. В вашем случае gevent.sleep() позволяет рабочим выполнять одновременно. Блокирование вызовов ввода-вывода, таких как urllib2.urlopen(url).read(), делает то же самое, если вы используете urllib2, исправленный для работы с gevent (по телефону gevent.monkey.patch_*()).

См. Также A Curious Course on Coroutines and Concurrency, чтобы понять, как код может работать одновременно в однопоточной среде.

Для сравнения пропускной различия между GEvent, нарезания резьбы, многопроцессорная вы могли бы написать код, который совместим со всеми aproaches:

#!/usr/bin/env python 
concurrency_impl = 'gevent' # single process, single thread 
##concurrency_impl = 'threading' # single process, multiple threads 
##concurrency_impl = 'multiprocessing' # multiple processes 

if concurrency_impl == 'gevent': 
    import gevent.monkey; gevent.monkey.patch_all() 

import logging 
import time 
import random 
from itertools import count, islice 

info = logging.info 

if concurrency_impl in ['gevent', 'threading']: 
    from Queue import Queue as JoinableQueue 
    from threading import Thread 
if concurrency_impl == 'multiprocessing': 
    from multiprocessing import Process as Thread, JoinableQueue 

Остальная часть сценария одинакова для всех реализаций параллелизм:

def do_work(wid, value): 
    time.sleep(random.randint(0,2)) 
    info("%d Task %s done" % (wid, value)) 

def worker(wid, q): 
    while True: 
     item = q.get() 
     try: 
      info("%d Got item %s" % (wid, item)) 
      do_work(wid, item) 
     finally: 
      q.task_done() 
      info("%d Done item %s" % (wid, item)) 

def producer(pid, q): 
    for item in iter(lambda: random.randint(1, 11), 10): 
     time.sleep(.1) # simulate a green blocking call that yields control 
     info("%d Added item %s" % (pid, item)) 
     q.put(item) 
    info("%d Signal Received" % (pid,)) 

не выполнять код на уровне модуля поместить его в main():

def main(): 
    logging.basicConfig(level=logging.INFO, 
         format="%(asctime)s %(process)d %(message)s") 

    q = JoinableQueue() 
    it = count(1) 
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)] 
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)] 
    for t in producers+workers: 
     t.daemon = True 
     t.start() 

    for t in producers: t.join() # put items in the queue 
    q.join() # wait while it is empty 
    # exit main thread (daemon workers die at this point) 

if __name__=="__main__":  
    main() 
+0

Привет, Себастьян, я просмотрел свой код и увидел, что мои производители и потребители работают одновременно. Когда операция блокировки происходит в одной из моих зеленых, она дает контроль над другими зелеными. Я добавил отсутствующий вызов 'monkey_patch', так что модуль сокета не блокируется, но я не могу получить достаточный хруст моего процессора. У обычного ПК достаточно сока, чтобы иметь больше одновременных соединений и больше зеленых, но я не получаю достаточной скорости. Я очень потерял и смутил, почему он не использует больше процессора и работает быстрее. Не могли бы вы помочь мне понять, пожалуйста? Я очень потерян. Благодарю. –

+0

@Mridang Agarwalla: Я прокомментировал код, который вы разместили в своем вопросе. «производители» * не работают одновременно. – jfs

+1

@Mridang Agarwalla: если ваша проблема связана с IO (диск, сеть), то не имеет значения, насколько быстр ваш процессор, например, если вы можете писать на диск только со скоростью 50 МБ/с, то не имеет значения, что ваш процессор может процесс 1 ГБ/с. Также ваша программа может потреблять другие конечные ресурсы, такие как количество открытых файлов. Если вы используете 'gevent', убедитесь, что все блокирующие вызовы являются« зелеными », то есть они не блокируются, например, ваш драйвер базы данных может быть несовместим с' gevent'. – jfs

1

GEvent это здорово, когда у вас есть очень много (зеленый) темы. Я тестировал его с тысячами, и он работал очень хорошо. вы убедитесь, что все библиотеки, которые вы используете как для очистки, так и для сохранения в db, становятся зелеными. afaik, если они используют сокет python, gentent-инъекция должна работать. расширения, написанные на языке C (например, mysqldb), будут блокироваться, и вместо этого вам нужно будет использовать зеленые эквиваленты.

Если вы используете gevent, вы могли бы в большинстве своем уйти с очередями, порождать новую (зеленую) нить для каждой задачи, код для потока такой же простой, как db.save(web.get(address)). gevent позаботится об упреждении, когда какая-либо библиотека в db или веб-блоках. он будет работать до тех пор, пока ваши задачи будут соответствовать памяти.

0

В этом случае проблема заключается не в скорости программы (например, при выборе gevent или threading), а в пропускной способности сети IO. Это (должно быть) узкое место, определяющее скорость выполнения программы.

Gevent - отличный способ убедиться, что является узким местом, а не вашей архитектурой вашей программы.

Это своего рода процесс вы хотите:

import gevent 
from gevent.queue import Queue, JoinableQueue 
from gevent.monkey import patch_all 


patch_all() # Patch urllib2, etc 


def worker(work_queue, output_queue): 
    for work_unit in work_queue: 
     finished = do_work(work_unit) 
     output_queue.put(finished) 
     work_queue.task_done() 


def producer(input_queue, work_queue): 
    for url in input_queue: 
     url_list = crawl(url) 
     for work in url_list: 
      work_queue.put(work) 
     input_queue.task_done() 


def do_work(work): 
    gevent.sleep(0) # Actually proces link here 
    return work 


def crawl(url): 
    gevent.sleep(0) 
    return list(url) # Actually process url here 

input = JoinableQueue() 
work = JoinableQueue() 
output = Queue() 

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)] 
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)] 


list_of_urls = ['foo', 'bar'] 

for url in list_of_urls: 
    input.put(url) 

# Wait for input to finish processing 
input.join() 
print 'finished producing' 
# Wait for workers to finish processing work 
work.join() 
print 'finished working' 

# We now have output! 
print 'output:' 
for message in output: 
    print message 
# Or if you'd like, you could use the output as it comes! 

Вам не нужно ждать ввода и очереди работ до конца, я только что показал, что здесь.

Смежные вопросы