2015-09-18 2 views
1

Мне часто приходится сортировать коллекцию файлов, содержащих заголовки. Поскольку сортировка зависит от содержимого заголовка, этот пример использования более сложный, чем аналогичные вопросы (например, Is there a way to ignore header lines in a UNIX sort?).потоковая передача данных в команду с подпроцессом.Popen

Я надеялся использовать Python для чтения файлов, вывода заголовка первого файла, а затем передать хвосты в сортировку. Я попытался это как доказательство концепции:

#!/usr/bin/env python 

import io 
import subprocess 
import sys 

header_printed = False 

sorter = subprocess.Popen(['sort'], stdin=subprocess.PIPE) 

for f in sys.argv[1:]: 
    fd = io.open(f,'r') 
    line = fd.readline() 
    if not header_printed: 
     print(line) 
     header_printed = True 
    sorter.communicate(line) 

Когда называется header-sort fileA fileB с FiLea и FILEB, содержащие линии, как

c float int 
Y 0.557946  413 
F 0.501935  852 
F 0.768102  709 

я получаю:

# sort file 1 
Traceback (most recent call last): 
    File "./archive/bin/pipetest", line 17, in <module> 
    sorter.communicate(line) 
    File "/usr/lib/python2.7/subprocess.py", line 785, in communicate 
    self.stdin.write(input) 
ValueError: I/O operation on closed file 

Проблема заключается в том связь берет строку, и труба закрывается после записи. Это означает, что содержимое должно быть полностью прочитано в памяти. связь не принимает генератор (я пробовал).

Еще более простой демонстрацией этого является:

>>> import subprocess 
>>> p = subprocess.Popen(['tr', 'a-z', 'A-Z'], stdin=subprocess.PIPE) 
>>> p.communicate('hello') 
HELLO(None, None) 
>>> p.communicate('world') 
Traceback (most recent call last): 
    File "<ipython-input-14-d6873fd0f66a>", line 1, in <module> 
    p.communicate('world') 
    File "/usr/lib/python2.7/subprocess.py", line 785, in communicate 
    self.stdin.write(input) 
ValueError: I/O operation on closed file 

Итак, вопрос в том, что правильный путь (с Popen или иным образом) для потоковой передачи данных в трубу в Python?

+0

связанные: [Сортировка текстового файла с помощью Python] (http://stackoverflow.com/q/14465154/4279) – jfs

ответ

1

Просто напишите в трубу непосредственно:

#!/usr/bin/env python2 
import fileinput 
import subprocess 

process = subprocess.Popen(['sort'], stdin=subprocess.PIPE) 
with process.stdin as pipe, fileinput.FileInput() as file: 
    for line in file: 
     if file.isfirstline(): # print header 
      print line, 
     else: # pipe tails 
      pipe.write(line) 
process.wait() 
+0

Я начал схожий путь (ваш гораздо более элегантный), но затем я увидел это предостережение в документах: «Предупреждение Используйте связь(), а не .stdin.write, .stdout.read или .stderr.read, чтобы избежать взаимоблокировки из-за того, что любой из других буферов операционной системы заполняет и блокирует дочерний процесс ». Я понимаю тупиковый потенциал, когда сценарий записывает на подпроцесс «stdin» и читает его со своего stdout, но я не понимаю потенциал взаимоблокировки, когда сценарий и подпроцесс передаются в stdout (как в вашем ответе). Комментарии? – Reece

+0

@Reece: здесь оно не применяется. Правило, чтобы избежать тупика в общем случае, прост: никогда не используйте PIPE, если вы не потребляете соответствующую трубку. – jfs

+0

Это была моя интерпретация. Я ценю последующие действия. – Reece

0

Вы можете использовать запись/чтение от stdin и stdout, однако в зависимости от вашего подпроцесса вам нужен «промывочный механизм» для подпроцесса для обработки вашего ввода. Нижеприведенный код работает для первой части, но поскольку он закрывает stdin, он также убивает подпроцесс. Если вы измените его на flush(), или если вы можете добавить некоторые завершающие символы для подталкивания вашего подпроцесса, вы можете его использовать. Еще бы, я бы рекомендовал взглянуть на Multithreading in Python, особенно pipes.

p=subprocess.Popen(['tr','a-z','A-Z'],stdin=subprocess.PIPE,stdout=subprocess.PIPE) 
p.stdin.write("hello\n") 
p.stdin.close() 
p.stdout.readline() 
'HELLO\n' 
+0

Это действительно не безопасно либо ' flush' или 'close'; если вы отправляете достаточно данных в подпроцесс, который заполняет его собственный выходной канал, он будет блокироваться. Если вы заполните свой входной канал, вы заблокируете его. И потому что он ждет вас, чтобы читать, и вы ждете его, чтобы прочитать, вы зашли в тупик и никогда не доходите до 'readline'. Кроме того, если вы 'flush' вместо' close', подпроцесс может быть блочным буферизацией собственного вывода, поэтому 'readline' может блокироваться навсегда (и вы никогда не вернетесь из' readline', чтобы отправить больше данных, чем может быть это для сброса его буфера). – ShadowRanger

2

Для вашего конкретного случая, если вы только прошли subprocess.PIPE для одной стандартной ручки (в вашем случае, stdin), то в вашем примере, вы можете смело называть sorter.stdin.write(line) снова и снова. Когда вы закончите запись вывода, вызовите sorter.stdin.close(), так что sort знает, что вход завершен, и он может выполнять фактические операции сортировки и вывода (sorter.communicate() без аргументов, вероятно, тоже будет работать, в противном случае после закрытия stdin вы, вероятно, захотите позвонить sorter.wait(), чтобы закончить).

Если вам нужно иметь дело с более чем одной стандартной рукояткой, подходящей по порядку, то есть правильный путь: threading с выделенной нитью для каждой трубы, которая должна обрабатываться за пределами первой (относительно простая в концепции, но в супертяжелом и вводит все головные боли потоковой передачи) или с использованием модуля select (или в Python 3.4+, selectors), что довольно сложно получить, но может (при некоторых обстоятельствах) быть более эффективным. Наконец, есть creating temporary files for output, поэтому вы можете напрямую писать в процесс stdin, пока процесс записывается в файл (и поэтому не будет блокироваться); вы можете прочитать файл в свободное время (обратите внимание, что подпроцесс не обязательно будет очищать свои собственные выходные буферы до тех пор, пока он не выйдет, поэтому выход может не прибыть незамедлительно в ответ на ваш вход до тех пор, пока дополнительные входы и выходы не будут заполнены и не будут сброшены буфер).

subprocess.Popen «s .communicate() метода использует либо тему или select сам модуль примитивов (в зависимости от поддержки ОС, реализация находится под various _communicate methods here) всякий раз, когда вы передаете subprocess.PIPE более чем одной из стандартных ручек; это то, как вы должны это делать.

+0

есть одна труба (подпроцесс 'stdin). Зачем вам здесь несколько потоков? – jfs

+0

Да, моя ошибка. Я предположил, что это один из тех, «я пытаюсь сделать то, что« общаюсь », не используя случаи« общаться », и ответил. Я отредактировал, чтобы объяснить, как это работает для конкретного случая только с одним стандартным дескриптором «PIPE». – ShadowRanger

Смежные вопросы