2016-05-25 2 views
3

Я создаю скрипт для загрузки и анализа информации о преимуществах для планов медицинского страхования на обменах Obamacare. Часть этого требует загрузки и разбора плана, используя JSON-файлы от каждой отдельной страховой компании. Для этого я использую concurrent.futures.ThreadPoolExecutor с 6 работниками для загрузки каждого файла (с urllib), разбора и петли через JSON и извлечения соответствующей информации (которая хранится во вложенном словаре в скрипте).Использование памяти с concurrent.futures.ThreadPoolExecutor в Python3

(работает Python 3.5.1 (v3.5.1: 37a07cee5969, 6 Декабря 2015, 1:38:48) [MSC v.1900 32 бит (Intel)] на win32)

Проблема заключается в том, что, когда Я делаю это одновременно, сценарий, похоже, не освобождает память после того, как он загрузил \ parsed \ looped через файл JSON, и через некоторое время он сработает с malloc, создавая ошибку памяти.

Когда я делаю это серийно - с простой петлей for in - однако программа не разбивается и не требует особого объема памяти.

def load_json_url(url, timeout): 
    req = urllib.request.Request(url, headers={ 'User-Agent' : 'Mozilla/5.0' }) 
    resp = urllib.request.urlopen(req).read().decode('utf8') 
    return json.loads(resp) 



with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: 
     # Start the load operations and mark each future with its URL 
     future_to_url = {executor.submit(load_json_url, url, 60): url for url in formulary_urls} 
     for future in concurrent.futures.as_completed(future_to_url): 
      url = future_to_url[future] 
      try: 
       # The below timeout isn't raising the TimeoutError. 
       data = future.result(timeout=0.01) 
       for item in data: 
         if item['rxnorm_id']==drugid: 
          for row in item['plans']: 
           print (row['drug_tier']) 
           (plansid_dict[row['plan_id']])['drug_tier']=row['drug_tier'] 
           (plansid_dict[row['plan_id']])['prior_authorization']=row['prior_authorization'] 
           (plansid_dict[row['plan_id']])['step_therapy']=row['step_therapy'] 
           (plansid_dict[row['plan_id']])['quantity_limit']=row['quantity_limit'] 

      except Exception as exc: 
       print('%r generated an exception: %s' % (url, exc)) 


      else: 
       downloaded_plans=downloaded_plans+1 

ответ

3

Это не твоя вина. as_complete() не выпускает свои фьючерсы до завершения. Уже зарегистрирован вопрос: https://bugs.python.org/issue27144

На данный момент я считаю, что подход большинства состоит в том, чтобы обернуть as_complete() внутри другого цикла, который блокирует разумное количество фьючерсов, в зависимости от того, сколько оперативной памяти вы хотите потратить и насколько велики ваши результат будет. Он будет блокироваться на каждом фрагменте, пока вся работа не исчезнет до перехода к следующему фрагменту, поэтому будьте медленнее или потенциально застряли в середине в течение длительного времени, но пока я не вижу другого пути, хотя буду держать этот ответ в курсе, когда умнее путь.

1

В качестве альтернативного решения вы можете позвонить по телефону add_done_callback со своим фьючерсом, а не использовать as_completed. Ключ НЕ ведет ссылки на фьючерсы. Итак, future_to_url список в оригинальном вопросе - плохая идея.

Что я сделал в основном:

def do_stuff(future): 
    res = future.result() # handle exceptions here if you need to 

f = executor.submit(...) 
f.add_done_callback(do_stuff) 
Смежные вопросы