2015-02-15 3 views
0

Я пытаюсь создать сервер в Twisted, который позволит клиентам подключаться с использованием событий, отправленных сервером. Я хотел бы, чтобы этот сервер также прослушал Redis, и если появится сообщение, то нажмите его на подключенные SSE-клиенты.Закрученный SSE-сервер, подписанный на Redis через pubsub

У меня работает сервер SSE. Я знаю, как подписаться на Redis. Я не могу понять, как обе части работают, не блокируя друг друга.

Я знаю https://github.com/leporo/tornado-redis и https://github.com/fiorix/txredisapi, которые были рекомендованы в соответствующих вопросах. Не знаю, как это помогает:/

Как это решить? Не могли бы вы помочь с обоими: концептуальные советы и фрагменты кода?

код сервера My Twisted SSE:

# coding: utf-8 
from twisted.web import server, resource 
from twisted.internet import reactor 


class Subscribe(resource.Resource): 
    isLeaf = True 
    sse_conns = set() 

    def render_GET(self, request): 
     request.setHeader('Content-Type', 'text/event-stream; charset=utf-8') 
     request.write("") 
     self.add_conn(request) 
     return server.NOT_DONE_YET 

    def add_conn(self, conn): 
     self.sse_conns.add(conn) 
     finished = conn.notifyFinish() 
     finished.addBoth(self.rm_conn) 

    def rm_conn(self, conn): 
     self.sse_conns.remove(conn) 

    def broadcast(self, event): 
     for conn in self.sse_conns: 
      event_line = "data: {}'\r\n'".format(event) 
      conn.write(event_line + '\r\n') 


if __name__ == "__main__": 
    sub = Subscribe() 
    reactor.listenTCP(9000, server.Site(sub)) 
    reactor.run() 

Мои Redis подписаться Код:

import redis 


redis = redis.StrictRedis.from_url('redis://localhost:6379') 


class RedisSub(object): 
    def __init__(self): 
     self.pubsub = redis.pubsub() 
     self.pubsub.subscribe('foobar-channel') 

    def listen(self): 
     for item in self.pubsub.listen(): 
      print str(item) 

ответ

0

Это то, что работает для меня.

Я закончил использование txredis lib с небольшим изменением в RedisClient (добавлено минимальное количество подписчиков).

# coding: utf-8 
import os 
import sys 
import weakref 

from txredis.client import RedisClient 

from twisted.web import server, resource 
from twisted.internet import reactor, protocol, defer 
from twisted.python import log 

from utils import cors, redis_conf_from_url 


log.startLogging(sys.stdout) 

PORT = int(os.environ.get('PORT', 9000)) 
REDIS_CONF = redis_conf_from_url(os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379')) 
REDIS_SUB_CHANNEL = 'votes' 


class RedisBroadcaster(RedisClient): 
    def subscribe(self, *channels): 
     self._send('SUBSCRIBE', *channels) 

    def handleCompleteMultiBulkData(self, reply): 
     if reply[0] == u"message": 
      message = reply[1:][1] 
      self.sse_connector.broadcast(message) 
     else: 
      super(RedisClient, self).handleCompleteMultiBulkData(reply) 


@defer.inlineCallbacks 
def redis_sub(): 
    clientCreator = protocol.ClientCreator(reactor, RedisBroadcaster, password=REDIS_CONF.get('password')) 
    redis = yield clientCreator.connectTCP(REDIS_CONF['host'], REDIS_CONF['port']) 
    redis.subscribe(REDIS_SUB_CHANNEL) 


class Subscribe(resource.Resource): 
    isLeaf = True 
    sse_conns = weakref.WeakSet() 

    @cors 
    def render_GET(self, request): 
     request.setHeader('Content-Type', 'text/event-stream; charset=utf-8') 
     request.write("") 
     self.sse_conns.add(request) 
     return server.NOT_DONE_YET 

    def broadcast(self, event): 
     for conn in self.sse_conns: 
      event_line = "data: {}\r\n".format(event) 
      conn.write(event_line + '\r\n') 


if __name__ == "__main__": 
    sub = Subscribe() 
    reactor.listenTCP(PORT, server.Site(sub)) 

    RedisBroadcaster.sse_connector = sub  
    reactor.callLater(0, redis_sub) 

    reactor.run()