2017-01-23 3 views
0

У меня есть многолетняя вычислительная модель, которую я хочу контролировать, передавать данные и читать данные с использованием STDIN и STDOUT. Внутри этого внешнего кода есть контур обратной связи управления, который нуждается в новых данных от STDIN каждые 100 мс или около того.пишите в stdin и читайте из stdout по длительному дочернему процессу в python

по этой причине subprocess.communicate() не подходит, так как он ждет завершения процесса. Оценочная продолжительность выполнения процесса составляет порядка нескольких недель.

Код ниже не работает, потому что управление зависает на stdout.read() и никогда не возвращается.

Каков правильный способ разговора по STDIN и STDOUT?

import subprocess as sb 


class Foo: 
    def process_output(self,values: str) ->(): 
     """ gets 7 comma separated floats back from ADC.elf and returns them as a tuple of two vectors """ 
     floats = [float(f) for f in values.split(',') if values and f] 
     # if len(floats) == 7: 
     mag = (floats[0], floats[1], floats[2]) 
     quat = (floats[3], floats[4], floats[5], floats[6]) 
     return (mag, quat) 

    def format_input(self,invals: {}) -> bytes: 
     """ takes a dict of arrays and stuffs them into a comma-separated bytestring to send to ADC.elf with a trailing newline""" 
     concat = lambda s, f: ''.join([f % x for x in s]) 
     retval = '' 
     retval += concat(invals['mag_meas'], '%3.2f,') 
     retval += concat(invals['euler_angle'], '%3.2f,') 
     retval += concat(invals['sun_meas'], '%3.2f,') 
     retval += concat(invals['epoch'], '%02.0f,') 
     retval += concat(invals['lla'], '%3.2f,') 
     retval += concat([invals['s_flag']], '%1.0f,') 
     retval = retval[:-1] 
     retval += '\n' 
     return retval.encode('utf-8') 

    def page(self,input: {}) ->(): 
     """ send a bytestring with 19 floats to ADC.elf. Process the returned 7 floats into a data struture""" 
     formatted = self.format_input(input) 
     self.pid.stdin.write(formatted) 
     response = self.pid.stdout.read() 

     return self.process_output(response.decode()) 

    def __init__(self): 
     """ start the long-running process ADC.elf that waits for input and performs some scientific computation""" 
     self.pid = sb.Popen(args=['./ADC.elf'], stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE) 

    def exit(self): 
     """ send SIGTERM to ADC.elf""" 
     self.pid.terminate() 



if __name__ == "__main__": 
    # some dummy data 
    testData = {'mag_meas': [1, 2, 3], 
       'euler_angle': [4, 5, 6], 
       'sun_meas': [7, 8, 9], 
       'epoch': [0, 1, 2, 3, 4, 5], 
       'lla': [6, 7, 8], 
       's_flag': 9 
       } 
    #initialize 
    runner = Foo() 
    # send and receive once. 
    result = runner.page(testData) 
    print(result) 
    #clean up 
    runner.exit() 
+0

[ 'общаться()'] (https://github.com/python/cpython /blob/eb70a87363193a22a2ee36a44efd8372f97efeae/Lib/subprocess.py#L1051) сам использует потоки. ['select'] (https://docs.python.org/2/library/select.html) также является опцией. Как и неблокирующий IO. – dhke

+0

@dhke это правда, но communication() не вернется, пока процесс не завершится; Я хочу многократно запрашивать его во время его однократного выполнения. – gvoysey

+0

Следовательно, мое предложение: посмотрите, как общается и адаптируется к вашим потребностям. 'communication()' явно ждет поток читателя. Бросьте это, и у вас есть непрерывный опрос. – dhke

ответ

0

Понятия не имею, как это сделать с подпроцесс напрямую, но pexpect сделал именно то, что нужно:

import pexpect, os 
from time import sleep 

class Foo: 
    def process_output(self,values: str) ->(): 
     floats = [float(f) for f in values.split(',') if values and f] 
     # if len(floats) == 7: 
     mag = (floats[0], floats[1], floats[2]) 
     quat = (floats[3], floats[4], floats[5], floats[6]) 
     return (mag, quat) 

    def format_input(self,invals: {}) -> bytes: 
     concat = lambda s, f: ''.join([f % x for x in s]) 
     retval = '' 
     retval += concat(invals['mag_meas'], '%3.2f,') 
     retval += concat(invals['euler_angle'], '%3.2f,') 
     retval += concat(invals['sun_meas'], '%3.2f,') 
     retval += concat(invals['epoch'], '%02.0f,') 
     retval += concat(invals['lla'], '%3.2f,') 
     retval += concat([invals['s_flag']], '%1.0f,') 
     retval = retval[:-1] 
     retval += '\n' 
     return retval.encode('utf-8') 

    def page(self,input: {}) ->(): 
     formatted = self.format_input(input) 
     self.pid.write(formatted) 
     response = self.pid.readline() 

     return self.process_output(response.decode()) 

    def __init__(self): 

     self.pid = pexpect.spawn('./ADC.elf') 
     self.pid.setecho(False) 

    def exit(self): 
     self.pid.terminate() 



if __name__ == "__main__": 
    testData = {'mag_meas': [1, 2, 3], 
       'euler_angle': [4, 5, 6], 
       'sun_meas': [7, 8, 9], 
       'epoch': [0, 1, 2, 3, 4, 5], 
       'lla': [6, 7, 8], 
       's_flag': 9 
       } 
    runner = Foo() 
    i = 0 
    while i < 100: 
     result = runner.page(testData) 
     print(result) 
     i += 1 
     sleep(.1) 



    runner.exit() 
Смежные вопросы