2016-03-23 1 views
1

У меня возникает странная проблема: я запускаю большое количество команд utils.getProcessOutputAndValue('cmd', [args]), и результат зависит от того, начал ли я запуск реактора с использованием task.react() или reactor.run()OSError: [Errno 24] Слишком много открытых файлов при использовании реактора.run() в Twisted

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

from progress.bar import IncrementalBar 
from twisted.internet import defer 
from twisted.internet import task 
from twisted.internet import utils 
from twisted.python import usage 


class Options(usage.Options): 
    optFlags = [['reactor', 'r', 'Use reactor.run().'], 
       ['task', 't', 'Use task.react().'], 
       ['cwr', 'w', 'Use callWhenRunning().']] 
    optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'], 
        ['cmd', 'c', 'echo Testing {i}...', 'Command to run.']] 


def run(opt): 
    limit = int(opt['limit']) 
    cmd, args = opt['cmd'].split(' ', 1) 
    bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit) 
    requests = [] 
    for i in range(0, limit): 
     try: 
      _args = args.format(i=i) 
      args = _args 
     except KeyError: 
      pass 
     requests.append(utils.getProcessOutputAndValue('echo', [args])) 
     bar.next() 
    bar.finish() 
    return defer.gatherResults(requests) 


@defer.inlineCallbacks 
def main(reactor, opt): 
    d = defer.Deferred() 
    limit = int(opt['limit']) 
    cmd, args = opt['cmd'].split(' ', 1) 
    bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit) 
    for i in range(0, limit): 
     try: 
      _args = args.format(i=i) 
      args = _args 
     except KeyError: 
      pass 
     yield utils.getProcessOutputAndValue('echo', [args]) 
     bar.next() 
    bar.finish() 
    defer.returnValue(d.callback(True)) 


if __name__ == '__main__': 
    opt = Options() 
    opt.parseOptions() 

    if opt['reactor']: 
     from twisted.internet import reactor 
     task.deferLater(reactor, 0, run, opt) 
     reactor.run() 

    elif opt['task']: 
     from twisted.internet.task import react 
     react(main, [opt]) 

    elif opt['cwr']: 
     from twisted.internet import reactor 
     reactor.callWhenRunning(run, opt) 
     reactor.run() 

при использовании limit выше 400 (в моем случае) я получаю следующее сообщение об ошибке:

Upon execvpe echo ['echo', 'Testing 0...'] in environment id 42131264 
:Traceback (most recent call last): 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 428, in _fork 
    self._setupChild(**kwargs) 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 803, in _setupChild 
    for fd in _listOpenFDs(): 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 638, in _listOpenFDs 
    return detector._listOpenFDs() 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 553, in _listOpenFDs 
    self._listOpenFDs = self._getImplementation() 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 576, in _getImplementation 
    after = impl() 
    File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 606, in _procFDImplementation 
    return [int(fd) for fd in self.listdir(dname)] 
OSError: [Errno 24] Too many open files: '/proc/23421/fd' 
Unhandled error in Deferred: 

Какие не происходит если я U петь task.react()

В резюме:

  • python pyerr.py -l100 -r: OK
  • python pyerr.py -l100 -t: OK
  • python pyerr.py -l100 -w: OK
  • python pyerr.py -l400 -r: OSERR
  • python pyerr.py -l400 -t: OK
  • python pyerr.py -l400 -w: OSERR

Проблема заключается в том, что у меня есть большое приложение, которое использует реактор, так как его применение в ответ на SMTP-соединений (поэтому не может использовать task.react, I не хотят останавливать реактор).

Я всегда думал, что task.react лишь остановки реактора после того, как отложенный делается, но я думаю, делает больше, чем это ...


редактировать: Здесь pstree comparaison для task.react против reactor.run

reactor.run (Python pyerr.py -l400 -r):

init-+-VBoxService---7*[{VBoxService}] 
    |-acpid 
    |-atd 
    |-cron 
    |-dbus-daemon 
    |-dhclient 
    |-6*[getty] 
    |-master-+-pickup 
    |  `-qmgr 
    |-mysqld---18*[{mysqld}] 
    |-nginx---4*[nginx] 
    |-php5-fpm---2*[php5-fpm] 
    |-puppet---{puppet} 
    |-rpc.idmapd 
    |-rpc.statd 
    |-rpcbind 
    |-rsyslogd---3*[{rsyslogd}] 
    |-ruby---{ruby} 
    |-sshd-+-3*[sshd---sshd---sftp-server] 
    |  |-sshd---sshd---2*[sftp-server] 
    |  |-sshd---sshd---bash---pstree 
    |  `-sshd---sshd---bash---python-+-323*[echo] 
    |         `-5*[python] 
    |-systemd-logind 
    |-systemd-udevd 
    |-upstart-file-br 
    |-upstart-socket- 
    `-upstart-udev-br 

task.react (Python pyerr.py -l400 -t):

init-+-VBoxService---7*[{VBoxService}] 
    |-acpid 
    |-atd 
    |-cron 
    |-dbus-daemon 
    |-dhclient 
    |-6*[getty] 
    |-master-+-pickup 
    |  `-qmgr 
    |-mysqld---18*[{mysqld}] 
    |-nginx---4*[nginx] 
    |-php5-fpm---2*[php5-fpm] 
    |-puppet---{puppet} 
    |-rpc.idmapd 
    |-rpc.statd 
    |-rpcbind 
    |-rsyslogd---3*[{rsyslogd}] 
    |-ruby---{ruby} 
    |-sshd-+-3*[sshd---sshd---sftp-server] 
    |  |-sshd---sshd---2*[sftp-server] 
    |  |-sshd---sshd---bash---pstree 
    |  `-sshd---sshd---bash---python---echo 
    |-systemd-logind 
    |-systemd-udevd 
    |-upstart-file-br 
    |-upstart-socket- 
    `-upstart-udev-br 

Обратите внимание на разницу между этим

|  `-sshd---sshd---bash---python-+-323*[echo] 
|         `-5*[python] 

и этот

|  `-sshd---sshd---bash---python---echo 

в одном КАС кажется, что процессы не закрываются, как только они завершаются.

Я испытал эту проблему на 4 разных машинах:

  • Ubuntu 14.04
  • CentOS 6
  • Centos 7

Вопрос заключается в точности то же самое.

Чтобы сделать снимок, попробуйте запустить watch -n 0.1 "pstree", чтобы узнать, как развиваются процессы.


редактировать: Я получаю это, почему это происходит благодаря Glyph ответ, но как адаптировать это к моей реальной жизни дела?

Приложение Я разрабатываю с Twisted является SMTP фильтр на основе Milter, вот как это работает (предположим, что мы хотим, чтобы проверить подпись электронной почты):

  • соединение открывает порт 25
  • Milter протокол получить все электронные письма детали
  • Milter вызывает удаленный сервер «модуль», который будет обрабатывать проверку подписи с /usr/bin/openssl mime вызовом
  • модуль будет возвращать ответ, указывающим, является ли действительной подписью

В этом случае моя проблема заключается в том, что я получаю 150 одновременных соединений, будет 150 вызовов модуля (TCP-протокол), и этот модуль будет вызывать команду openssl один раз для каждого соединения.

Модуль полностью агностик, поэтому не будет знать, работают ли другие вызовы. Где, по вашему мнению, поставить DeferredSemaphore?

Моя проблема в том, что соединения smtp также являются агностиками и не знают о других возможных открытиях соединений.

Каков правильный способ обращения с этой параллелизмом, на ваш взгляд?

+0

это * есть * все 'задача.react'. Вставив ваш пример на сервер Linux, я смог успешно запустить ваш пример для всех значений, которые вы предоставили, поэтому я не уверен, что происходит не так. Возможно, проблема с локальной конфигурацией? – Glyph

+0

Я запустил два примера с помощью 'pstree', и при использовании переключателя' -r' я увидел подпроцесс 300 python, я вставлю вам подробный образец утром. У меня создалось впечатление, что процессы зависают после завершения и вызывают ошибку limit/proc – kitensei

+0

. Я добавил график 'pstree' и немного детализировал мои тесты, этот вопрос был протестирован на 4 разных машинах с 3 различными ОС. Обратите внимание, что я скорректировал результаты: это ** - r ** переключатель 'python pyerr.py -l400 -r', который терпит неудачу, а не ** - t ** – kitensei

ответ

2

Проблема здесь не имеет ничего общего с различием между task.react и reactor.run, а скорее, тонкое, но существенное различие между реализацией ваших run и main функций.

Разница заключается в том, что run является нерест limit процессов параллельно, мучая тысячи одновременно открытых дескрипторов файлов, легко дует через ограничение вашей системы. Тем не менее, main ожидает, что каждый процесс полностью завершит выполнение, прежде чем запускать следующий, что означает, что он никогда не использует больше 4 или 5 за раз.

Причина заключается в том, что main украшен inlineCallbacks и дает каждые getProcessOutputAndValueDeferred, который приостанавливает выполнение main до этого Deferred завершен.

В реальных приложениях ни один из этих подходов не идеален. Вы хотите некоторый параллелизм, но не неограниченный. Twisted поставляется с некоторыми утилитами, такими как DeferredSemaphore, чтобы облегчить ограниченный параллелизм, не ограничивая все, чтобы запускать только одну задачу за раз. Жан-Поль Кальдерон написал статью - 10 лет назад! - это объясняет, как использовать это, here.

Однако, как раз, чтобы продемонстрировать, что этот вопрос не имеет ничего общего с task.react, вот модифицированной версией вашего примера, который устраняет функцию run и делает сравнение яблок с яблоками с помощью main:

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

from progress.bar import IncrementalBar 
from twisted.internet import defer 
from twisted.internet import task 
from twisted.internet import utils 
from twisted.python import usage 


class Options(usage.Options): 
    optFlags = [['reactor', 'r', 'Use reactor.run().'], 
       ['task', 't', 'Use task.react().'], 
       ['cwr', 'w', 'Use callWhenRunning().']] 
    optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'], 
        ['cmd', 'c', 'echo Testing {i}...', 'Command to run.']] 


@defer.inlineCallbacks 
def main(reactor, opt): 
    d = defer.Deferred() 
    limit = int(opt['limit']) 
    cmd, args = opt['cmd'].split(' ', 1) 
    bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit) 
    for i in range(0, limit): 
     try: 
      _args = args.format(i=i) 
      args = _args 
     except KeyError: 
      pass 
     yield utils.getProcessOutputAndValue('echo', [args]) 
     bar.next() 
    bar.finish() 
    defer.returnValue(d.callback(True)) 


if __name__ == '__main__': 
    opt = Options() 
    opt.parseOptions() 

    if opt['reactor']: 
     from twisted.internet import reactor 
     task.deferLater(reactor, 0, main, reactor, opt) 
     reactor.run() 

    elif opt['task']: 
     from twisted.internet.task import react 
     react(main, [opt]) 

    elif opt['cwr']: 
     from twisted.internet import reactor 
     reactor.callWhenRunning(main, reactor, opt) 
     reactor.run() 

редактировать, реагируя на изменения в вопросе:

с вашей реальной проблемы с входящими соединениями, а не только for петли, вместо использования DeferredSemaphore вам может понадобиться поддерживать счетчик и воспользоваться тем фактом, что объект, возвращенный с listenTCP, или результат Deferred, который возвращается с TCP4ServerEndpoint, реализует IPushProducer и вызывает на нем pauseProducing(), когда слишком много параллельные соединения делают работу, и resumeProducing() когда эта работа делается.

+0

Спасибо, я полностью понял это, чем больше я копаю, тем больше люблю Твист;). Не могли бы вы проверить мое последнее изменение, чтобы посмотреть, как я должен действовать в моем реальном случае? – kitensei

+0

Добавлено редактирование для вас. – Glyph

+0

Отличный ответ, спасибо за ваше время;) – kitensei

Смежные вопросы