2013-10-07 3 views
1

Я пытаюсь создать простую распределенную систему клиент/сервер для работы в Twisted. В основном шаги:сплетенные проблемы связи с клиентами/сервером

  1. Пустите JobServer с несколькими рабочих мест и связанных с ними файлов
  2. Запуск экземпляров JobClient, они подключаются к JobServer и попросить Джобса
  3. Сервер дает JobClient работу и отправляет сериализованную JSON над TCP
  4. После, возможно, много вычислений, то JobClient отправляет результат и ждет новой работы
  5. полоскание и повторить

Но мне не удается отладить мой протокол на локальном компьютере.

JobServer.py

from twisted.application import internet, service 
from twisted.internet import reactor, protocol, defer 
from twisted.protocols import basic 
from twisted.protocols.basic import Int32StringReceiver 
from twisted.web import client 
import random 
import json 
import base64 
from logger import JobLogger 

class JobServerProtocol(Int32StringReceiver): 

    log = JobLogger("server.log") 

    def connectionMade(self): 
     self.log.write("Connected to client") 
     self.sendJob(None) 

    def stringReceived(self, msg): 
     self.log.write("Recieved job from client: %s" % msg) 
     self.sendJob(msg) 

    def sendJob(self, msg): 
     d = self.factory.getJob(msg) 

     def onError(err): 
      self.transport.write("Internal server error") 
     d.addErrback(onError) 

     def sendString(newjob_dict): 
      encoded_str = json.dumps(newjob_dict) 
      self.transport.write(encoded_str) 
      self.log.write("Sending job to client: %s" % encoded_str) 
     d.addCallback(sendString) 

    def lengthLimitExceeded(self, msg): 
     self.transport.loseConnection() 

class JobServerFactory(protocol.ServerFactory): 
    protocol = JobServerProtocol 

    def __init__(self, jobs, files): 
     assert len(jobs) == len(files) 
     self.jobs = jobs 
     self.files = files 
     self.results = [] 

    def getJob(self, msg): 

     # on startup the client will not have a message to send 
     if msg: 
      # recreate pickled msg 
      msg_dict = json.loads(msg) 
      self.results.append((msg_dict['result'], msg_dict['jidx'])) 

     # if we're all done, let the client know 
     if len(self.jobs) == 0: 
      job = None 
      jidx = -1 
      encoded = "" 
     else: 
      # get new job for client to process 
      jidx = random.randint(0, len(self.jobs) - 1) 
      job = self.jobs[jidx] 
      del self.jobs[jidx] 

      # get file 
      with open(self.files[jidx], 'r') as f: 
       filecontents = f.read() 
      encoded = base64.b64encode(filecontents) 

     # create dict object to send to client 
     response_msg = { 
      "job" : job, 
      "index" : jidx, 
      "file" : encoded 
     } 

     return defer.succeed(response_msg) 

# args for factory 
files = ['test.txt', 'test.txt', 'test.txt'] 
jobs = ["4*4-5", "2**2-5", "2/9*2/3"] 

application = service.Application('jobservice') 
factory = JobServerFactory(jobs=jobs, files=files) 
internet.TCPServer(12345, factory).setServiceParent(
    service.IServiceCollection(application)) 

JobClient.py

from twisted.internet import reactor, protocol 
from twisted.protocols.basic import Int32StringReceiver 
import json 
import time 
from logger import JobLogger 

class JobClientProtocol(Int32StringReceiver): 

    log = JobLogger("client.log") 

    def stringReceived(self, msg): 

     # unpack job from server 
     server_msg_dict = json.loads(msg) 
     job = server_msg_dict["job"] 
     index = server_msg_dict["index"] 
     filestring = server_msg_dict["file"] 

     if index == -1: 
      # we're done with all tasks 
      self.transport.loseConnection() 

     self.log.write("Recieved job %d from server with file '%s'" % (index, filestring)) 

     # do something with file 
     # job from the server... 
     time.sleep(5) 
     result = { "a" : 1, "b" : 2, "c" : 3} 
     result_msg = { "result" : result, "jidx" : index } 

     self.log.write("Completed job %d from server with result '%s'" % (index, result)) 

     # serialize and tell server 
     result_str = json.dumps(result_msg) 
     self.transport.write(encoded_str) 

    def lengthLimitExceeded(self, msg): 
     self.transport.loseConnection() 

class JobClientFactory(protocol.ClientFactory): 

    def buildProtocol(self, addr): 
     p = JobClientProtocol() 
     p.factory = self 
     return p 

reactor.connectTCP("127.0.0.1", 12345, JobClientFactory()) 
reactor.run() 

logging.py

class JobLogger(object): 
    def __init__(self, filename): 
     self.log = open(filename, 'a') 

    def write(self, string): 
     self.log.write("%s\n" % string) 

    def close(self): 
     self.log.close() 

Запуск, тестирование локально только с одним клиентом:

$ twistd -y JobServer.py -l ./jobserver.log --pidfile=./jobserver.pid 
$ python JobClient.py 

Проблемы Я имею:

  1. клиент и сервер .log файлы не пишутся надежно - иногда не до после того как я убить процесс.
  2. Протокол застревает после подключения клиента, и сервер отправляет сообщение. Сообщение, похоже, никогда не доходит до клиента.

В целом, я надеюсь, что эти протоколы гарантируют, что операции с обеих сторон могут занимать какое-то время, но, возможно, я не правильно их разработал.

ответ

3

Клиентские и серверные файлы .log не могут быть написаны надежно - иногда только после того, как я убью процесс.

Если вы хотите, чтобы на диске отображались байты своевременно, вам может потребоваться позвонить flush в ваш файловый объект.

Протокол застревает после подключения клиента, и сервер отправляет сообщение. Сообщение, похоже, никогда не доходит до клиента.

Сервер не отправляет клиенту int32 строки: он напрямую вызывает transport.write. Клиент запутывается, потому что в итоге они выглядят как очень длинные строки int32. Например, первые четыре байта «Внутренняя ошибка сервера» декодируются как целое число 1702129225, поэтому, если на сервере есть ошибка, и эти байты отправляются клиенту, клиент будет ждать примерно 2 ГБ данных перед продолжением.

Используйте вместо этого Int32StringReceiver.sendString.

Смежные вопросы