2016-01-04 2 views
0

Я пытаюсь создать простой RPC-сервер с SimpleXMLRPCServer и Celery. В принципе, идея состоит в том, что удаленный клиент (client.py) может вызывать задачи через xmlrpc.client на сервер (server.py), который включает функции, зарегистрированные как задачи Celery (runnable.py).SimpleXMLRPCServer вызывает задачи Celery

Проблема в том, что когда функция RPC зарегистрирована через register_function, я могу назвать ее по имени, поэтому она будет выполнена правильно, но без использования Celery. То, что я хотел бы достичь, - это вызвать его через name.delay() в client.py, как он будет выполняться Celery, но без блокировки потока сервера. Таким образом, server.py должен действовать как прокси-сервер и позволяет нескольким клиентам вызывать полный набор функций, как:

for task in flow: 
    job = globals()[task] 
    job.delay("some arg") 
    while True: 
     if job.ready(): 
      break 

Я попытался с помощью register_instance с allow_dotted_names = True, но я пришел к ошибке:

xmlrpc.client.Fault: <Fault 1: "<class 'TypeError'>:cannot marshal <class '_thread.RLock'> objects"> 

что привело меня к вопросу - если это вообще возможно, чтобы сделать что-то вроде этого

упрощенный код:

server.py

# ...runnable.py import 
# ...rpc init 
def register_tasks(): 
    for task in get_all_tasks(): 
     setattr(self, task, globals()[task]) 
     self.server.register_function(getattr(self, task), task) 

runnable.py

app = Celery("tasks", backend="amqp", broker="amqp://") 

@app.task() 
def say_hello(): 
    return "hello there" 

@app.task() 
def say_goodbye(): 
    return "bye, bye" 

def get_all_tasks(): 
    tasks = app.tasks 
    runnable = [] 

    for t in tasks: 
     if t.startswith("modules.runnable"): 
      runnable.append(t.split(".")[-1]) 

    return runnable 

Наконец, client.py

s = xmlrpc.client.ServerProxy("http://127.0.0.1:8000") 
print(s.say_hello()) 

ответ

0

Я придумал идею, которая создает дополнительные обертки для задержки сельдерея функции. Они регистрируются так, как клиент RPC может вызвать rpc.the_remote_task.delay (* args). Это возвращает идентификатор задания Celery, а затем клиент спрашивает, готово ли задание через rpc.ready (job_id) и получает результаты с помощью rpc.get (job_id). На данный момент есть очевидное ядро ​​безопасности, так как вы можете получить результаты, когда знаете идентификатор задания, но все же - он отлично работает.

регистрирующие задачи (server.py)

def register_tasks(): 
    for task in get_all_tasks(): 
     exec("""def """ + task + """_runtime_task_delay(*args): 
return celery_wrapper(""" + task + """, "delay", *args) 
setattr(self, task + "_delay", """ + task + """_runtime_task_delay) 
      """) 

     f_delay = task + "_delay" 
     self.server.register_function(getattr(self, f_delay), task + ".delay") 

    def job_ready(jid): 
     return celery_wrapper(None, "ready", jid) 

    def job_get(jid): 
     return celery_wrapper(None, "get", jid) 

    setattr(self, "ready", job_ready) 
    setattr(self, "get", job_get) 

    self.server.register_function(job_ready, "ready") 
    self.server.register_function(job_get, "get") 

Обертка (server.py)

def celery_wrapper(task, method, *args): 
    if method == "delay": 
     job = task.delay(*args) 
     job_id = job.id 

     return job_id 
    elif method == "ready": 
     res = app.AsyncResult(args[0]) 
     return res.ready() 
    elif method == "get": 
     res = app.AsyncResult(args[0]) 
     return res.get() 
    else: 
     return "0" 

И вызов RPC (client.py)

jid = s.the_remote_task.delay("arg1", "arg2") 
is_running = True 
while is_running: 
     is_running = not s.ready(jid) 

     if not is_running: 
       print(s.get(jid)) 
     time.sleep(.01) 
Смежные вопросы