У меня есть многолетняя вычислительная модель, которую я хочу контролировать, передавать данные и читать данные с использованием STDIN
и STDOUT
. Внутри этого внешнего кода есть контур обратной связи управления, который нуждается в новых данных от STDIN
каждые 100 мс или около того.пишите в stdin и читайте из stdout по длительному дочернему процессу в python
по этой причине subprocess.communicate()
не подходит, так как он ждет завершения процесса. Оценочная продолжительность выполнения процесса составляет порядка нескольких недель.
Код ниже не работает, потому что управление зависает на stdout.read()
и никогда не возвращается.
Каков правильный способ разговора по STDIN и STDOUT?
import subprocess as sb
class Foo:
def process_output(self,values: str) ->():
""" gets 7 comma separated floats back from ADC.elf and returns them as a tuple of two vectors """
floats = [float(f) for f in values.split(',') if values and f]
# if len(floats) == 7:
mag = (floats[0], floats[1], floats[2])
quat = (floats[3], floats[4], floats[5], floats[6])
return (mag, quat)
def format_input(self,invals: {}) -> bytes:
""" takes a dict of arrays and stuffs them into a comma-separated bytestring to send to ADC.elf with a trailing newline"""
concat = lambda s, f: ''.join([f % x for x in s])
retval = ''
retval += concat(invals['mag_meas'], '%3.2f,')
retval += concat(invals['euler_angle'], '%3.2f,')
retval += concat(invals['sun_meas'], '%3.2f,')
retval += concat(invals['epoch'], '%02.0f,')
retval += concat(invals['lla'], '%3.2f,')
retval += concat([invals['s_flag']], '%1.0f,')
retval = retval[:-1]
retval += '\n'
return retval.encode('utf-8')
def page(self,input: {}) ->():
""" send a bytestring with 19 floats to ADC.elf. Process the returned 7 floats into a data struture"""
formatted = self.format_input(input)
self.pid.stdin.write(formatted)
response = self.pid.stdout.read()
return self.process_output(response.decode())
def __init__(self):
""" start the long-running process ADC.elf that waits for input and performs some scientific computation"""
self.pid = sb.Popen(args=['./ADC.elf'], stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE)
def exit(self):
""" send SIGTERM to ADC.elf"""
self.pid.terminate()
if __name__ == "__main__":
# some dummy data
testData = {'mag_meas': [1, 2, 3],
'euler_angle': [4, 5, 6],
'sun_meas': [7, 8, 9],
'epoch': [0, 1, 2, 3, 4, 5],
'lla': [6, 7, 8],
's_flag': 9
}
#initialize
runner = Foo()
# send and receive once.
result = runner.page(testData)
print(result)
#clean up
runner.exit()
[ 'общаться()'] (https://github.com/python/cpython /blob/eb70a87363193a22a2ee36a44efd8372f97efeae/Lib/subprocess.py#L1051) сам использует потоки. ['select'] (https://docs.python.org/2/library/select.html) также является опцией. Как и неблокирующий IO. – dhke
@dhke это правда, но communication() не вернется, пока процесс не завершится; Я хочу многократно запрашивать его во время его однократного выполнения. – gvoysey
Следовательно, мое предложение: посмотрите, как общается и адаптируется к вашим потребностям. 'communication()' явно ждет поток читателя. Бросьте это, и у вас есть непрерывный опрос. – dhke