2013-11-06 2 views
7

Я хочу сделать много URL-запросов для веб-сайта REST. Обычно между 75-90k. Тем не менее, мне нужно ограничить количество одновременных подключений к веб-сервису.бассейн с множеством запросов.session?

Я начал играть с помощью grequests следующим образом, но быстро начал пережевывать открытые сокеты.

concurrent_limit = 30 
urllist = buildUrls() 
hdrs = {'Host' : 'hostserver'} 
g_requests = (grequests.get(url, headers=hdrs) for url in urls) 
g_responses = grequests.map(g_requests, size=concurrent_limit) 

Поскольку это длится минутку или около того, меня поражают «максимальное количество найденных сокетов». Насколько я могу судить, каждый из вызовов request.get в grequests использует собственный сеанс, что означает, что для каждого запроса открывается новый сокет.

Я нашел примечание к github, ссылаясь на то, как сделать grequests одним сеансом. Но это, по-видимому, эффективно устраняет все запросы в один общий пул. Это, похоже, превзошло цель асинхронных HTTP-запросов.

s = requests.session() 
rs = [grequests.get(url, session=s) for url in urls] 
grequests.map(rs) 

Можно ли использовать grequests или gevent.Pool, чтобы создать несколько сеансов?

Путь другой: Как я могу сделать много одновременных HTTP-запросов, используя либо путем очередей, либо объединения пулов?

ответ

6

В результате я не использовал grequests для решения моей проблемы. Я все еще надеюсь, что это возможно.

Я использовал резьб:

class MyAwesomeThread(Thread): 
    """ 
    Threading wrapper to handle counting and processing of tasks 
    """ 
    def __init__(self, session, q): 
     self.q = q 
     self.count = 0 
     self.session = session 
     self.response = None 
     Thread.__init__(self) 

    def run(self): 
     """TASK RUN BY THREADING""" 
     while True: 
      url, host = self.q.get() 
      httpHeaders = {'Host' : host} 
      self.response = session.get(url, headers=httpHeaders) 
      # handle response here 
      self.count+= 1 
      self.q.task_done() 
     return 

q=Queue() 
threads = [] 
for i in range(CONCURRENT): 
    session = requests.session() 
    t=MyAwesomeThread(session,q) 
    t.daemon=True # allows us to send an interrupt 
    threads.append(t) 


## build urls and add them to the Queue 
for url in buildurls(): 
    q.put_nowait((url,host)) 

## start the threads 
for t in threads: 
    t.start() 
+0

Я думаю, у меня есть аналогичная проблема: http://stackoverflow.com/questions/34593643/gevent-grequests-wont-spawn-more-greenlets, как ОС как 70 потоков? – domoarrigato

2

rs - это список AsyncRequest. У AsyncRequest есть своя сессия.

rs = [grequests.get(url) for url in urls] 
grequests.map(rs) 
for ar in rs: 
    print(ar.session.cookies) 
+2

Да, проблема была в том, что у каждого запроса была своя сессия. Это означало 70 тыс. Сеансов. OS это не очень нравится. Мне нужно ограничить количество сеансов. –

3

Что-то вроде этого:

NUM_SESSIONS = 50 
sessions = [requests.Session() for i in range(NUM_SESSIONS)] 
reqs = [] 
i = 0 
for url in urls: 
    reqs.append(grequests.get(url, session=sessions[i % NUM_SESSIONS] 
    i+=1 
responses = grequests.map(reqs, size=NUM_SESSIONS*5) 

Это должно распространяться на запросы более чем 50 различных сессий.

+0

Почему размер дроссельной заслонки '.map' установлен в 5 раз больше числа сеансов? Не будет ли количество параллельных соединений по существу узким местом по количеству сеансов? – Michael

Смежные вопросы