2014-09-21 3 views
1

Я пытаюсь написать скрипт python, который отправляет несколько DNS-запросов, используя другой сервер имен для каждого запроса.Python: Реализация параллельных запросов DNS (конвейерная обработка)

Реализация последовательного решения легко с помощью dnspython, но слишком медленная для меня. Добавление параллелизма с помощью пула потоков невозможно, так как в моем конкретном случае все запросы используют один и тот же исходный порт (здесь также не поможет REUSE_ADDRESS).

По вышеуказанным причинам я думал об использовании следующего решения (канав использования модуля распознавателя dnspython, но, воспользовавшись его здание сообщения и разбора модулей):

  • позволяет до запросов X, чтобы быть в ходе
  • запросы Send X одновременно (только отправка Dns пакетов запросов с использованием ПДПА. Возможно, с добавлением задержки между посылает, чтобы избежать очередей)
  • различного ожидания потока для ответов
  • Когда ответ приходит сравнить его с запрос (по адресу) и разрешить новый запрос запустить
  • Если ответ на запрос не поступает в течение TIMEOUT секунд пометить его как завершенное и разрешить новый запрос запустить

Мои основные вопросы здесь:

  • Как реализовать тайм-аут задачи легко
  • можно ли реализовать без использования синхронизации потоков (например, используя цикл событий?)
  • Есть ли какая-либо существующая библиотека, которая может помочь ее реализовать (мне кажется, что я пытаюсь изобрести колесо здесь, я заглянул в модуль asycnio, но не мог понять, как воспользоваться это для моей проблемы). Обратите внимание: я не хочу использовать существующие dns или сетевые библиотеки, поскольку мне нужна гибкость изменения основных функций (например, использование сырых сокетов, изменение полей заголовка DNS и т. Д.).

ответ

0

Вы попробовали aiodns? https://pypi.python.org/pypi/aiodns/

Для тайм-аутов asyncio имеет стандарт wait_for сопрограммы (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for).

+0

Мне нужно использовать другой сервер имен для каждого запроса, и, похоже, для него не разработаны aiodns. – roee88

+0

Вы можете создать экземпляр 'DNSResolver' для сервера имен и указать нужный DNS-ip через параметр' nameservers'. Instantiatig 'DNSResolver' - относительная легкая задача. –

0

Использование простого цикла выбора хорошо работает здесь. Ниже приводится фрагмент кода для завершения:

def run(self, resolvers_iter): 
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) 
    sock.setblocking(False) 

    try: 
     pending = [] 

     # For rate limiting 
     limit = float(self.timeout)/self.max_pending # delay between sends 
     last_sent = clock() - limit 

     # Work as long as there are more resolvers to query 
     completed_sending = False 
     while not completed_sending or pending: 

      # Can I send more requests 
      want_to_write = False 
      if not completed_sending and len(pending) < self.max_pending: 
       want_to_write = True 

      # Calculate nearest timeout time to make sure select returns on time 
      timeout = None 
      if pending: 
       timeout = self.timeout - clock() + pending[0][0] + 0.001 
       timeout = max(timeout, 0) 

      # Rate limit 
      time_passed_since_send = clock() - last_sent 
      if want_to_write and time_passed_since_send + 0.001 < limit: 
       timeout = min(timeout, limit-time_passed_since_send) 
       timeout = max(timeout, 0) 
       want_to_write = False 

      # Poll socket - uses internally the select module 
      readable, writable = self._select(readable=True, writable=want_to_write, timeout=timeout) 

      # Can read 
      if readable: 
       # Read as many as possible 
       while True: 
        try: 
         # Get response 
         response, from_address = DnsFacilities.read_response(sock) 

         # Check if not duplicate or already timed out 
         sent_time = None 
         for i, (t, ip) in enumerate(pending): 
          if ip == from_address[0]: 
           sent_time = t 
           del pending[i] 
           break 

         if sent_time is not None: 
          self.response_received((response, from_address, clock()-sent_time)) 

        except socket.error, e: 
         if e[0] in (socket.errno.EWOULDBLOCK, socket.errno.EAGAIN): 
          break 
         elif e[0] in (socket.errno.WSAECONNRESET, socket.errno.WSAENETRESET): 
          pass 
         else: 
          raise 

      # Can write 
      if writable: 
       try: 
        last_sent = clock() 
        resolver_address = resolvers_iter.next() 
        DnsFacilities.send_query(resolver_address) 
        pending.append((clock(), resolver_address) 
       except StopIteration: 
        completed_sending = True 

      # Check for timed out tasks 
      now = clock() 
      while pending and now - pending[0][0] > self.timeout: 
       self.response_timeout(pending[0][1]) 
       del pending[0] 

    finally: 
     sock.close() 
Смежные вопросы