Это не должно быть слишком сложно с ZeroMQ. Вот что я буду делать.
Запишите HTTP-сервер торнадо в облаке в сочетании с двумя портами для сокетов с нулевым разрешением MQPUB и SUB. Сервер предоставляет конечные точки REST для пользователей. На малиновом пироге подключите к облачному серверу два сокета с нулевой суммой PUB SUB.
Когда вы получаете запрос на сервер облака, отправьте команду на сокет PUB. Установите обратный вызов полученных данных через сокет NOMQ SUB. На малиновом пироге, когда вы получаете данные, запустите свой код датчика. Если есть ответ, отправьте его через канал PUB с малиновым пирогом. На сервере обратный вызов будет отвечать пользователю.
Ниже приведены фрагменты кода для сервера (облака) и клиента (малиновый пирог) для этого.
На сервере облака запуска.
#!/usr/bin/env python
import json
import tornado
import tornado.web
import zmq
from tornado import httpserver
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
tornado.ioloop = ioloop
import sys
def ping_remote():
"""callback to keep the connection with remote server alive while we wait
Network routers between raspberry pie and cloud server will close the socket
if there is no data exchanged for long time.
"""
pub_inst.send_json_data(msg="Ping", req_id="##")
sys.stdout.write('.')
sys.stdout.flush()
pending_requests = {}
class ZMQSub(object):
def __init__(self, callback):
self.callback = callback
context = zmq.Context()
socket = context.socket(zmq.SUB)
# socket.connect('tcp://127.0.0.1:5559')
socket.bind('tcp://*:8081')
self.stream = ZMQStream(socket)
self.stream.on_recv(self.callback)
socket.setsockopt(zmq.SUBSCRIBE, "")
def shutdown_zmq_sub(self):
self.stream.close()
class ZMQPub(object):
def __init__(self):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://*:8082')
self.publish_stream = ZMQStream(socket)
def send_json_data(self, msg, req_id):
topic = str(req_id)
self.publish_stream.send_multipart([topic, msg])
def shutdown_zmq_sub(self):
self.publish_stream.close()
def SensorCb(msg):
# decode message from raspberry pie and the channel ID.
key, msg = (i for i in msg)
if not key == "##":
msg = json.loads(msg)
if key in pending_requests.keys():
req_inst = pending_requests[key]
req_inst.write(msg)
req_inst.finish()
del pending_requests[key]
else:
print "no such request"
print pending_requests
else:
print "received ping"
class Handler(tornado.web.RequestHandler):
def __init__(self, *args, **kwargs):
super(Handler, self).__init__(*args, **kwargs)
# get the unique req id
self.req_id = str(self.application.req_id) + "#"
self.application.req_id += 1
# set headers
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Headers", "x-requested-with")
self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS, PUT')
@tornado.web.asynchronous
def get(self):
print self.request
if self.req_id not in pending_requests.keys():
pending_requests[self.req_id] = self
else:
print "WTF"
pub_inst.send_json_data(msg=json.dumps({"op": "ServiceCall"}), req_id=self.req_id)
if __name__ == "__main__":
pub_inst = ZMQPub()
sub_inst = ZMQSub(callback=SensorCb)
application = tornado.web.Application(
[(r'/get_sensor_data', Handler), (r'/(.*)')])
application.req_id = 0
server = httpserver.HTTPServer(application,)
port = 8080
server.listen(port)
print "Sensor server ready on port: ", port
ping = ioloop.PeriodicCallback(ping_remote, 3000)
ping.start()
tornado.ioloop.IOLoop.instance().start()
На малиновый пирог запустите следующий клиент.
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
import sys
ioloop.install()
def ping_remote():
"""keep pinging the server so that the connection between server and client is not lost"""
pub_inst.send_json_data(msg="Ping", req_id="##")
sys.stdout.write('.')
sys.stdout.flush()
pending_requests = {}
class ZMQSub(object):
def __init__(self, callback):
self.callback = callback
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:8082') # replace by cloud server's IP address
self.stream = ZMQStream(socket)
self.stream.on_recv(self.callback)
socket.setsockopt(zmq.SUBSCRIBE, "")
def shutdown_zmq_sub(self):
self.stream.close()
class ZMQPub(object):
def __init__(self):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect('tcp://127.0.0.1:8081') # replace by cloud server's IP address
self.publish_stream = ZMQStream(socket)
def send_json_data(self, msg, req_id):
topic = str(req_id)
self.publish_stream.send_multipart([topic, msg])
def shutdown_zmq_sub(self):
self.publish_stream.close()
def SensorCb(msg):
# decode image and the channel ID.
key, msg = (i for i in msg)
if not key == "##":
print key, msg
#####
# Do your sensor specific work here. And built a reply for remote server.
#####
resp = "response from raspi"
pub_inst.send_json_data(msg=resp, req_id=key)
else:
print "received ping"
if __name__ == "__main__":
pub_inst = ZMQPub()
sub_inst = ZMQSub(callback=SensorCb)
ioloop.PeriodicCallback(ping_remote, 3000).start()
ioloop.IOLoop.instance().start()
Мнение: торнадо может легко выполнять то, что вам нужно. Данные могут быть переданы в URL-адресе или POST-данных, а возвратом могут быть произвольные данные (например, обычно json-словари). – mdurant
Да, я согласен с вами. но связь должна быть очень двунаправленной, чтобы сервер также выдавал команды датчиков. Похоже, что Tornado не был предназначен для истинного - всегда на двунаправленных трубопроводах. –
Я бы сказал, что все, что вам нужно для двунаправленности, в этом случае - это регулярный опрос конечной точки get-command. – mdurant