2012-05-29 2 views
4

Здравствуйте! Я пытаюсь написать веб-искатель с помощью python. Я хотел использовать многопоточность python. Даже после прочтения ранее предложенных статей и учебников у меня все еще есть проблема. Мой код здесь (весь исходный код here):Python многопоточный гусеничный

class Crawler(threading.Thread): 

    global g_URLsDict 
    varLock = threading.Lock() 
    count = 0 

    def __init__(self, queue): 
     threading.Thread.__init__(self) 
     self.queue = queue 
     self.url = self.queue.get() 

    def run(self): 
     while 1: 
      print self.getName()+" started" 
      self.page = getPage(self.url) 
      self.parsedPage = getParsedPage(self.page, fix=True) 
      self.urls = getLinksFromParsedPage(self.parsedPage) 

      for url in self.urls: 

       self.fp = hashlib.sha1(url).hexdigest() 

       #url-seen check 
       Crawler.varLock.acquire() #lock for global variable g_URLs 
       if self.fp in g_URLsDict: 
        Crawler.varLock.release() #releasing lock 
       else: 
        #print url+" does not exist" 
        Crawler.count +=1 
        print "total links: %d"%len(g_URLsDict) 
        print self.fp 
        g_URLsDict[self.fp] = url 
        Crawler.varLock.release() #releasing lock 
        self.queue.put(url) 

        print self.getName()+ " %d"%self.queue.qsize() 
        self.queue.task_done() 
      #self.queue.task_done() 
     #self.queue.task_done() 


print g_URLsDict 
queue = Queue.Queue() 
queue.put("http://www.ertir.com") 

for i in range(5): 
    t = Crawler(queue) 
    t.setDaemon(True) 
    t.start() 

queue.join() 

он не работает по мере необходимости, она не дает никакого результата после того, как нить 1 и excutes иначе некоторое время дает эту ошибку:

Exception in thread Thread-2 (most likely raised during interpreter shutdown): 

Как это исправить? А также я не думаю, что это более эффективно, чем просто для цикла.

Я попытался исправить Run():

def run(self): 
    while 1: 
     print self.getName()+" started" 
     self.page = getPage(self.url) 
     self.parsedPage = getParsedPage(self.page, fix=True) 
     self.urls = getLinksFromParsedPage(self.parsedPage) 

     for url in self.urls: 

      self.fp = hashlib.sha1(url).hexdigest() 

      #url-seen check 
      Crawler.varLock.acquire() #lock for global variable g_URLs 
      if self.fp in g_URLsDict: 
       Crawler.varLock.release() #releasing lock 
      else: 
       #print url+" does not exist" 
       print self.fp 
       g_URLsDict[self.fp] = url 
       Crawler.varLock.release() #releasing lock 
       self.queue.put(url) 

       print self.getName()+ " %d"%self.queue.qsize() 
       #self.queue.task_done() 
     #self.queue.task_done() 
    self.queue.task_done() 

Я экспериментировал с командой task_done(), в разных местах, может кто-нибудь объяснить разницу?

+0

является то, что первый пример отсутствуют некоторые отступы? Похоже, что участники класса должны быть на одном уровне.? –

+0

обновлен первый пример – torayeff

+0

Можете ли вы опубликовать рабочий пример? Какие модули вы импортируете? –

ответ

3

Вызываете только self.url = self.queue.get(), когда инициализируются потоки. Вам нужно попробовать и повторно приобрести URL-адреса из своей очереди внутри цикла while, если вы хотите забрать новые URL-адреса для дальнейшей обработки по строке.

Попробуйте заменить self.page = getPage(self.url) на self.page = getPage(self.queue.get()). Имейте в виду, что функция get будет блокироваться бесконечно. Вероятно, вы захотите тайм-аута через некоторое время и добавьте какой-то способ, чтобы ваши фоновые потоки могли изящно выйти из запроса (что устранит исключение, которое вы видели).

Есть some good examples on effbot.org которые используют get() так, как я описал выше.

Редактировать - Ответы на ваши первоначальные комментарии:

Посмотрите the docs for task_done(); Для каждого звонка до get() (который не требует таймаута) вы должны позвонить task_done(), который сообщает любым блокирующим вызовам join(), что все в этой очереди теперь обработано. Каждый вызов get() будет блокировать (спящий режим), пока он ожидает, что новый URL-адрес будет отправлен в очередь.

Edit2 - Попробуйте альтернативную функцию запуска:

def run(self): 
    while 1: 
     print self.getName()+" started" 
     url = self.queue.get() # <-- note that we're blocking here to wait for a url from the queue 
     self.page = getPage(url) 
     self.parsedPage = getParsedPage(self.page, fix=True) 
     self.urls = getLinksFromParsedPage(self.parsedPage) 

     for url in self.urls: 

      self.fp = hashlib.sha1(url).hexdigest() 

      #url-seen check 
      Crawler.varLock.acquire() #lock for global variable g_URLs 
      if self.fp in g_URLsDict: 
       Crawler.varLock.release() #releasing lock 
      else: 
       #print url+" does not exist" 
       Crawler.count +=1 
       print "total links: %d"%len(g_URLsDict) 
       print self.fp 
       g_URLsDict[self.fp] = url 
       Crawler.varLock.release() #releasing lock 
       self.queue.put(url) 

       print self.getName()+ " %d"%self.queue.qsize() 

     self.queue.task_done() # <-- We've processed the url this thread pulled off the queue so indicate we're done with it. 
+0

и как насчет команды task_done(), где следует я так выразился, и как это влияет? поток спит и время передается другому потоку, когда вызывается команда task_done()? если да, то где параллелизм? Я запутался. – torayeff

+0

См. Мое редактирование для ответов –

+0

вызов task_done() будет просто указывать другие потоки, которые можно использовать в очереди? Я имею в виду, я получаю url из очереди, а затем сразу вызываю task_done(), а второй способ получения url, обрабатывать его, анализировать страницы (в это время я хочу, чтобы другие потоки использовали Queue, потому что обработка страниц занимает некоторое время), затем Я вызываю task_done(), в чем разница? Какой из них будет эффективен – torayeff

Смежные вопросы