2016-11-03 4 views
1

Я написал программу, которая будет публиковать события, используя asyncio и aiohttp. Эта программа работает, когда я запускаю ее локально. Я могу опубликовать 10 тыс. Событий без проблем. Тем не менее, я SCPed все кодовый к удаленной машине и в этой машине я не могу отправить более чем 15 событий, не получает эту ошибку:Невозможно сделать большое количество сообщений на удаленной машине по сравнению с локальным использованием asynio и aiohttp

RuntimeError: Event loop is closed 
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a53989410> 
Traceback (most recent call last): 
    File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__ 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed 
RuntimeError: Event loop is closed 
Exception ignored in: <coroutine object Poster.async_post_event at 0x7f4a5397ffc0> 
Traceback (most recent call last): 
    File "/home/bli1/qe-trinity/tracer/utils/poster.py", line 63, in async_post_event 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__ 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/client.py", line 198, in _request 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 316, in connect 
    File "/home/bli1/py/python3.5/lib/python3.5/site-packages/aiohttp/connector.py", line 349, in _release_waiter 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 332, in set_result 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon 
    File "/home/bli1/py/python3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed 
RuntimeError: Event loop is closed 

Как я могу отладить это или выяснить причину этой проблемы?

Вот класс, который я создал, и я использую метод post() для запуска:

import uuid 
import os 
import asyncio 
import time 
import random 
import json 
import aiohttp 
from tracer.utils.phase import Phase 

class Poster(Phase): 
    def __init__(self, log, endpoint, num_post, topic, datafile, timeout, oracles, secure=False, thru_proxy=True): 
     Phase.__init__(self, log, "post", oracles, secure, thru_proxy) 
     self.log = log 
     self.num_post = int(num_post) 
     self.datafile = datafile.readlines() 
     self.topic = topic 
     self.endpoint = self.set_endpoint(endpoint, self.topic) 
     self.response = None 
     self.timeout = timeout 

    def random_line(self): 
     """ Returns random line from file and converts it to JSON """ 
     return json.loads(random.choice(self.datafile)) 

    @staticmethod 
    def change_uuid(event): 
     """ Creates new UUID for event_id """ 
     new_uuid = str(uuid.uuid4()) 
     event["event_header"]["event_id"] = new_uuid 
     return event 

    @staticmethod 
    def wrapevent(event): 
     """ Wrap event with metadata for analysis later on """ 
     return { 
      "tracer": { 
       "post": { 
        "statusCode": None, 
        "timestamp": None, 
       }, 
       "awsKafkaTimestamp": None, 
       "qdcKakfaTimestamp": None, 
       "hdfsTimestamp": None 
      }, 
      "event": event 
     } 

    def gen_random_event(self): 
     random_event = self.random_line() 
     event = self.change_uuid(random_event) 
     dataspec = self.wrapevent(event) 
     return dataspec 

    async def async_post_event(self, event, session): 
     async with session.post(self.endpoint, data=event, proxy=self.proxy) as resp: 
      event["tracer"]["post"]["timestamp"] = time.time() * 1000.0 
      event["tracer"]["post"]["statusCode"] = resp.status 
      unique_id = event["event"]["event_header"]["event_id"] 
      oracle_endpoint = os.path.join(self.oracle, unique_id) 
     async with session.put(oracle_endpoint, data=json.dumps(event), proxy=self.proxy) as resp: 
      if resp.status != 200: 
       self.log.debug("Post to ElasticSearch not 200") 
       self.log.debug(event["event"]["event_header"]["event_id"]) 
       self.log.debug("Status code: " + str(resp.status)) 
      return event["event"]["event_header"]["event_id"], resp.status 

    async def async_post_events(self, events): 
     coros = [] 
     conn = aiohttp.TCPConnector(verify_ssl=self.secure) 
     async with aiohttp.ClientSession(connector=conn) as session: 
      for event in events: 
       coros.append(self.async_post_event(event, session)) 
      return await asyncio.gather(*coros) 

    def post(self): 
     event_loop = asyncio.get_event_loop() 
     try: 
      events = [self.gen_random_event() for i in range(self.num_post)] 
      start_time = time.time() 
      results = event_loop.run_until_complete(self.async_post_events(events)) 
      print("Time taken: " + str(time.time() - start_time)) 
     finally: 
      event_loop.close() 

ответ

2

Вы не можете повторно использовать цикл, когда он закрыт. Из AbstractEventLoop.close документации:

This is idempotent and irreversible. No other methods should be called after this one.

Либо удалите loop.close вызов или создать новый цикл для каждой записи.

Мой совет был бы избежать этих проблем, запустив все внутри цикла и ожидая async_post_events при необходимости.

Смежные вопросы