2016-01-07 7 views
0

Я пишу что-то в Python 3, чтобы получить прокси с сайтов и проверить, действительны ли прокси. Я использовал модуль очереди и потоковой передачи, чтобы ускорить процедуру проверки. Однако следствие было странным.queue and thread, while loop in thread

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join() 

Это пример из документа очереди. Мой код основан на этом примере.

Итак, мой вопрос: Когда будет цикл while в рабочем() конце?

Когда число элементов в очереди превышает 200, q сохраняет блок-код и 1 элемент в очереди не может обрабатываться, а 1 поток продолжает делать q.get(), в то время как другие потоки говорят, что q было пустой.

Пожалуйста, помогите мне. Благодарю. И сожалею о моем бедном английском. Я все еще работаю над этим.

---- Обновление ----------------------------------------- ----------------------------

Я пробовал ThreadPoolExecutor, и он работал, как потоки и очереди. Но ситуация с блокировкой не изменилась.

После 20-минутной игры один пробный запуск кода закончился и распечатал ожидаемый результат.

Я обнаружил, что процедура проверки заканчивается через 2 или 3 минуты (для 100 прокси), а код просто блокируется в течение примерно 10 минут до его окончания.

И второй вопрос: Что может быть причиной этого?

Спасибо!)

---- Обновление --------------------------------------- -------------------------------

Проблема решена!

Я думал, что это была нить, которая вызывает блок, но оказывается, что соединение и время передачи - это причинность.

Поскольку я использую pycurl для проверки прокси, и pycurl По умолчанию TIMEOUT 300.

Я только установить ConnectTimeout 5 и игнорировали АУТ, который ограничивает все время передачи.

И это новый код я использую для проверки прокси:

c = pycurl.Curl() 

c.setopt(c.URL, url) 
c.setopt(c.HTTPHEADER, headers) 
c.setopt(c.PROXY, proxy) 
c.setopt(c.WRITEFUNCTION, lambda x: None) 
c.setopt(c.CONNECTTIMEOUT, 5) 
*c.setopt(c.TIMEOUT, 5)* 

c.perform() 
c.close() 

Однако установка ТАЙМОУТ 5 уменьшено число действительных прокси значительно. Я буду продолжать пытаться получить лучшее значение TIMEOUT.

+0

Цикл wile не закончится, так как True имеет всегда значение true. Используйте, например. 'while not q.empty()' – Finwood

+0

@Finwood Спасибо за ваш ответ. Поскольку while цикл не закончится, я полагаю, что нити не закончится. Это правильно? Или потоки заканчиваются, когда q пуст? Или просто продолжайте делать цикл? – CSSer

+0

Поскольку ваши потоки настроены как _daemon_, они заканчиваются, когда заканчивается основная программа. ['q.join()'] (https://docs.python.org/3.5/library/queue.html#queue.Queue.join) в конце ждет, что очередь станет пустой, поэтому все ваши потоки завершатся в то время. Однако вы можете взглянуть на ['ThreadPoolExecutor'] (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example). – Finwood

ответ

1

A while True петля без этого никогда не закончится, и ваши потоки никогда не прекратятся. Когда выходите, вы должны явно указать свои потоки.

способ сделать это с помощью дозорных, как это:

end_of_queue = object() 

def worker(): 
    while True: 
     item = q.get() 
     if item is end_of_queue: 
      q.task_done() 
      break 
     do_work(item) 
     q.task_done() 

q = Queue() 

for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

for i in range(num_worker_threads): 
    q.put(end_of_queue) 

q.join() 

Что я сделал здесь, добавив несколько end_of_queue элементов в очереди, по одному для каждого потока. Когда поток видит этот объект end_of_queue, это означает, что он должен выйти и может выйти из цикла.

Если вы предпочитаете другой подход, вы можете рассмотреть возможность использования Event object уведомить нити, когда они должны бросить курить, как это:

quit_event = Event() 

def worker(): 
    while not q.empty() or not quit_event.is_set(): 
     try: 
      item = q.get(timeout=.1) 
     except Empty: 
      continue 
     do_work(item) 
     q.task_done() 

q = Queue() 

for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

quit_event.set() 
q.join() 

Недостатком этого решения является то, что вы должны get() с тайм-аут.

И последнее, но не в последнюю очередь, ваш код, кажется, мог бы извлечь выгоду из использования thread pool, как это:

with ThreadPoolExecutor(max_workers=num_worker_threads) as executor: 
    executor.map(do_work, source()) 

(Для справки, ThreadPoolExecutor uses the end_of_queue approach, только два различия, что end_of_queue является None и каждый поток отвечает для уведомления других из них.)

+0

Благодарим вас за ответ. Я попробовал пул потоков, и он работал отлично, на некоторое время. Ознакомьтесь с обновлением. – CSSer

0

просто еще один пример использования потоков, очереди и цикла из класса

import threading 
import Queue 

q = Queue.Queue() 

class listener(object): 
    def __init__(self): 
     thread = threading.Thread(target=self.loop) 
     # thread.daemon = True 
     thread.start() 

    def loop(self): 
     for i in xrange(0,13): 
      q.put(i) 

class ui(object): 
    def __init__(self): 
     listener() 
     while True: 
      item = q.get() 
      print item 
      if item == 10: 
       break 
ui()