2013-07-16 5 views
2

В настоящее время у меня есть следующий код, вдохновленный ответом на Non-blocking read on a subprocess.PIPE in python. Кажется, что он работает корректно, выводя строки на экран, однако он делает это только для первого созданного процесса, все остальные процессы (которые запущены) не получают никаких данных.Неблокируемое чтение из нескольких подпроцессов (Python)

Как я могу убедиться, что я могу читать данные (неблокирующим способом) из нескольких подпроцессов?

#!/usr/bin/env python 
import sys 
import os 
import subprocess 
from threading import Thread 
from Queue import Queue, Empty 

STREAMER_URL = 'rtmp://127.0.0.1/app' 
RTMPDUMP_EXECUTEABLE = 'rtmpdump' 

def enqueue_output(out, queue): 
    for line in iter(lambda: out.read(16), b''): 
     queue.put(line) 
    out.close() 

def download_rtmp(media, filename): 
    # Create parameters 
    args=[RTMPDUMP_EXECUTEABLE] 
    args.extend(['-r',media[0],'-y',media[1]]) 

    # Set output file 
    OUTPUT_FILE = filename 
    args.extend(['-o',OUTPUT_FILE]) 

    # Send rtmpdump any extra arguments 
    if len(sys.argv) > 2: 
    args.extend(sys.argv[2:]) 

    # Execute rtmpdump 
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 
    q = Queue() 
    t = Thread(target=enqueue_output, args=(p.stdout, q)) 
    t.daemon = True # thread dies with the program 
    t.start() 
    return (p, q, t) 

def main(): 

    # actual data is from somewhere else on the internet 
    for (name, playpath, filepath) in data: 
    print 'Spawning %s download...' % name 
    PROCESSES.append(download_rtmp((STREAMER_URL, playpath), filepath)) 

    BUFS = dict() 

    # infinite loop checking if all processes have finished 
    while True: 
    done = True 
    for (process, queue, thread) in PROCESSES: 
     try: 
     readdata = queue.get_nowait() 
     except Empty: 
     pass 
     else: 
     if process in BUFS: 
      readdata = BUFS[process] + readdata 
     lines = readdata.split('\n') 
     if len(lines) > 1: 
      for line in lines[:-1]: 
      print 'Line: %s' % line 
     if '\r' in lines[-1]: 
      lines = readdata.split('\r') 
      for line in lines[:-1]: 
      print 'Line2: %s' % line 
     BUFS[process] = lines[-1] 

     process.poll() 

     if process.returncode is None: 
     done = False 
     break 
    if done: 
     break 

    print "Done" 

if __name__ == "__main__": 
    main() 
+0

Тема не очень * тема * в питон ... http://wiki.python.org/moin/GlobalInterpreterLock –

+0

@notbad .jpeg: Threads - настоящие потоки ОС в python. GIL не имеет ничего общего с вопросом в вопросе. – jfs

ответ

1

Я не понял, все это, но перерыв в if process.returncode is None: означает, что вы не будете смотреть на других очередей процесса, пока первый процесс не выйдет полностью. И я не знаю, откуда у вас эта многопользовательская опросная вещь, но она абсолютно ужасна.

Эта проблема лучше всего решить с помощью одной очереди возврата, используемой всеми рабочими потоками. Рабочие передают кортежи (процесс, линия), а основной поток блокирует ожидание данных от всех работников.

Это псевдокод действительно, но это будет выглядеть так:

STREAMER_URL = 'rtmp://127.0.0.1/app' 
RTMPDUMP_EXECUTEABLE = 'rtmpdump' 

def enqueue_output(process, queue): 
    """read process stdout in small chunks and queue for processing""" 
    for line in iter(lambda: out.read(16), b''): 
     queue.put((process, line)) 
    process.wait() 
    queue.put((process, None)) 

def download_rtmp(media, filename): 
    # Create parameters 
    args=[RTMPDUMP_EXECUTEABLE, '-r', media[0], '-y', media[1], '-o', filename] 

    # Send rtmpdump any extra arguments 
    # if len(sys.argv) > 2: no need for the if in list comprehension 
    args.extend(sys.argv[2:]) 

    # Execute rtmpdump 
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 
    t = Thread(target=enqueue_output, args=(p, return_q)) 
    t.daemon = True # thread dies with the program 
    t.start() 
    return (p, t) 

def main(): 
    THREADS = [] 
    BUFS = dict() 

    # actual data is from somewhere else on the internet 
    for (name, playpath, filepath) in data: 
    print 'Spawning %s download...' % name 
    process, thread = download_rtmp((STREAMER_URL, playpath), filepath) 
    BUFS[process] = '' 
    THREADS.append(thread) 

    # all processes write to return_q and we process them here 
    while BUFS: 
    process, line = return_q.get() 
    readdata = BUFS[process] + (line or '') 
    if line is None: 
     del BUFS[process] 
    # I didn't try to figure this part out... basically, when line is 
    # None, process is removed from BUFS so you know your end condition 
    # and the following stuff should do its final processing. 
    lines = readdata.split('\n') 
    if len(lines) > 1: 
     for line in lines[:-1]: 
     print 'Line: %s' % line 
    if '\r' in lines[-1]: 
     lines = readdata.split('\r') 
     for line in lines[:-1]: 
     print 'Line2: %s' % line 
    if line is not None: 
     BUFS[process] = lines[-1] 
+0

Проблема в том, что я хочу данные перед завершением команды, т.е. Я пытаюсь разобрать и отобразить ход загрузки, поэтому блокировка с помощью wait() не помогает достичь этой цели. –

+1

@adam - если вы говорите об enqueue_output, то это не проблема. Цикл for работает до тех пор, пока субпроцесс stdout не будет закрыт при завершении процесса. Это ожидание просто очищает процесс зомби и извлекает код возврата. – tdelaney

+0

@adam - если вы говорите о return_q.get(), он просыпается каждый раз, когда рабочий поток отправляет данные. – tdelaney

Смежные вопросы