2010-09-06 3 views
0

как хорошо этот код на Python? нужна критика) В этом коде есть ошибка, иногда сценарий делает печать «ALL WAIT - CAN FINISH!» и заморозить (больше никаких действий не происходит ..), но я не могу найти причину, почему это произошло?критика этот код python (искатель с threadpool)

сайт гусеничный с ThreadPool:

import sys 
from urllib import urlopen 
from BeautifulSoup import BeautifulSoup, SoupStrainer 
import re 
from Queue import Queue, Empty 
from threading import Thread 

W_WAIT = 1 
W_WORK = 0 

class Worker(Thread): 
    """Thread executing tasks from a given tasks queue""" 
    def __init__(self, pool, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.start() 
     self.pool = pool 
     self.state = None 

    def is_wait(self): 
     return self.state == W_WAIT 


    def run(self): 
     while True: 
      #if all workers wait - time to exsit 
      print "CHECK WAIT: !!! ",self.pool.is_all_wait() 
      if self.pool.is_all_wait(): 
       print "ALL WAIT - CAN FINISH!" 
       return 
      try: 
       func, args, kargs = self.tasks.get(timeout=3) 
      except Empty: 
       print "task wait timeout" 
       continue 

      self.state = W_WORK 
      print "START !!! in thread %s" % str(self) 
      #print args 

      try: func(*args, **kargs) 
      except Exception, e: print e 
      print "!!! STOP in thread %s", str(self) 
      self.tasks.task_done() 
      self.state = W_WAIT 
      #threads can fast empty it! 
      #if self.tasks.qsize() == 0: 
      # print "QUIT!!!!!!" 
      # break 

class ThreadPool: 
    """Pool of threads consuming tasks from a queue""" 
    def __init__(self, num_threads): 
     #self.tasks = Queue(num_threads) 
     self.tasks = Queue() 
     self.workers = [] 
     for _ in range(num_threads): 
      self.workers.append(Worker(self,self.tasks)) 


    def add_task(self, func, *args, **kargs): 
     """Add a task to the queue""" 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     """Wait for completion of all the tasks in the queue""" 
     self.tasks.join() 

    def is_all_wait(self): 
     for w in self.workers: 
      if not w.is_wait(): 
       return False 
     return True 

visited = set() 
queue = Queue() 
external_links_set = set() 
internal_links_set = set() 
external_links = 0 

def process(pool,host,url): 

    try: 

     content = urlopen(url).read() 
    except UnicodeDecodeError: 
     return 


    for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')): 
     try: 
      href = link['href'] 
     except KeyError: 
      continue 


     if not href.startswith('http://'): 
      href = 'http://%s%s' % (host, href) 
     if not href.startswith('http://%s%s' % (host, '/')): 
      continue 

     internal_links_set.add(href) 


     if href not in visited: 
      visited.add(href) 
      pool.add_task(process,pool,host,href) 

     else: 
      pass 

def start(host,charset): 
    pool = ThreadPool(20) 
    pool.add_task(process,pool,host,'http://%s/' % (host)) 
    pool.wait_completion() 

start('evgenm.com','utf8') 

Спасибо за помощь! Я делаю новую реализацию: Что вы можете сказать об этом коде # 2? = = = = ===========================

import sys 
    from urllib import urlopen 
    from BeautifulSoup import BeautifulSoup, SoupStrainer 
    import re 
    from Queue import Queue, Empty 
    from threading import Thread 


    W_STOP = 1 

class Worker(Thread): 
    """Thread executing tasks from a given tasks queue""" 
    def __init__(self, pool, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.pool = pool 
     self.state = None 
     self.start() 



    def stop(self): 
     self.state = W_STOP 

    def run(self): 
     while True: 
      if self.state == W_STOP: 
       print "\ncalled stop" 
       break 
      try: 
       func, args, kargs = self.tasks.get(timeout=3) 
      except Empty: 
       continue 
      print "\n***START*** %s" % str(self) 
      try: 
       func(*args, **kargs) 
      except Exception, e: 
       print e 
      print "\n***STOP*** %s", str(self) 
      self.tasks.task_done() 



class ThreadPool: 
    """Pool of threads consuming tasks from a queue""" 
    def __init__(self, num_threads): 
     #self.tasks = Queue(num_threads) 
     self.tasks = Queue() 
     self.workers = [] 
     for _ in range(num_threads): 
      self.workers.append(Worker(self,self.tasks)) 


    def add_task(self, func, *args, **kargs): 
     """Add a task to the queue""" 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     """Wait for completion of all the tasks in the queue""" 
     self.tasks.join() 

    def stop_threads(self): 
     for w in self.workers: 
      w.stop() 

    def wait_stop(self): 
     self.wait_completion() 
     self.stop_threads() 



    visited = set() 
    queue = Queue() 
    external_links_set = set() 
    internal_links_set = set() 
    external_links = 0 

    def process(pool,host,url): 

     try: 

      content = urlopen(url).read() 
     except UnicodeDecodeError: 
      return 


     for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')): 
      try: 
       href = link['href'] 
      except KeyError: 
       continue 


      if not href.startswith('http://'): 
       href = 'http://%s%s' % (host, href) 
      if not href.startswith('http://%s%s' % (host, '/')): 
       continue 

      internal_links_set.add(href) 


      if href not in visited: 
       visited.add(href) 
       pool.add_task(process,pool,host,href) 

      else: 
       pass 

    def start(host,charset): 
     pool = ThreadPool(20) 
     pool.add_task(process,pool,host,'http://%s/' % (host)) 
     pool.wait_stop() 

    start('evgenm.com','utf8') 
+0

Критикай? ... черт возьми, какой софтбол. Наверное, я буду сопротивляться желанию быть юмористическим, но ... – bmargulies

+0

, конечно, вы правы) с вашей точки зрения) – Evg

ответ

1

Вы используете состояние между потоками (то есть в is_all_wait) без синхронизации. Кроме того, тот факт, что все потоки «ждут», не является надежным индикатором того, что очередь пуста (например, все они могут быть в процессе получения задачи). Я подозреваю, что иногда потоки выходят до того, как очередь действительно пуста. Если это происходит достаточно часто, у вас останутся задачи в очереди, но нет потоков для их запуска. Так что queue.join() будет ждать вечно.

Моя рекомендация является:

  1. Избавьтесь от is_all_wait - это не является надежным показателем
  2. Избавиться от задачи state - это на самом деле не нужно
  3. Положитесь на queue.join, чтобы вы знали когда все обрабатывается

Если вам нужно убить потоки (например, это часть более крупной, долговременной программы), то сделайте это после queue.join().

+0

thanx для вас, я делаю реализацию # 2, что вы думаете об этом? – Evg

+0

Да, это выглядит разумно. В целом, код выглядит довольно чистым и простым в использовании. –

0

у меня есть базовые знания питона, но многопоточности в Python не бесполезно? Я видел множество статей, критикующих глобальный интерпретатор шлюзов.

+0

Как вы говорите, «нарезание резьбы на питоне не бесполезно» ;-). GIL специфичен для CPython. Например, Jython использует синхронизацию Java, чтобы сделать поток интерпретатора безопасным. Кроме того, GIL оказывает менее серьезное влияние на задачи, связанные с IO, чем задачи, связанные с CPU. Наконец, код в C-расширениях может освобождать GIL, когда им не нужно обращаться к структурам данных Python. Так что это действительно не такая большая сделка, как это сделано для большинства целей. –

Смежные вопросы