2014-02-10 2 views
1

Im используя следующий код для многопоточного urlib2. Однако каков наилучший способ ограничить количество потоков, которые он потребляет?Ограничение потоков в Python Threading, Queue

class ApiMultiThreadHelper: 

    def __init__(self,api_calls): 
     self.q = Queue.Queue() 
     self.api_datastore = {} 
     self.api_calls = api_calls 
     self.userpass = '#####' 

    def query_api(self,q,api_query): 
     self.q.put(self.issue_request(api_query)) 

    def issue_request(self,api_query): 

     self.api_datastore.update({api_query:{}}) 

     for lookup in ["call1","call2"]: 
      query = api_query+lookup 

      request = urllib2.Request(query) 
      request.add_header("Authorization", "Basic %s" % self.userpass) 
      f = urllib2.urlopen(request) 
      response = f.read() 
      f.close() 

      self.api_datastore[api_query].update({lookup:response}) 

     return True 

    def go(self): 
     threads = [] 
     for i in self.api_calls: 
      t = threading.Thread(target=self.query_api, args = (self.q,i)) 
      t.start() 
      threads.append(t) 

     for t in threads: 
      t.join() 

ответ

1

Вы должны использовать пул потоков. Вот моя реализация я сделал лет назад (Python 3.x дружественных):

import traceback 
from threading import Thread 
try: 
    import queue as Queue # Python3.x 
except ImportError: 
    import Queue 

class ThreadPool(object): 
    def __init__(self, no=10): 
     self.alive = True 
     self.tasks = Queue.Queue() 
     self.threads = [] 
     for _ in range(no): 
      t = Thread(target=self.worker) 
      t.start() 
      self.threads.append(t) 

    def worker(self): 
     while self.alive: 
      try: 
       fn, args, kwargs = self.tasks.get(timeout=0.5) 
      except Queue.Empty: 
       continue 
      except ValueError: 
       self.tasks.task_done() 
       continue 

      try: 
       fn(*args, **kwargs) 
      except Exception: 
       # might wanna add some better error handling 
       traceback.print_exc() 

      self.tasks.task_done() 

    def add_job(self, fn, args=[], kwargs={}): 
     self.tasks.put((fn, args, kwargs)) 

    def join(self): 
     self.tasks.join() 

    def deactivate(self): 
     self.alive = False 
     for t in self.threads: 
      t.join() 

Вы также можете найти подобный класс в multiprocessing.pool модуле (не спрашивайте меня, почему именно там). Затем вы можете реорганизовать свой код следующим образом:

def go(self): 
    tp = ThreadPool(20) # <-- 20 thread workers 
    for i in self.api_calls: 
     tp.add_job(self.query_api, args=(self.q, i)) 
    tp.join() 
    tp.deactivate() 

Число потоков определено априори.

Смежные вопросы