Я пытаюсь создать простой RPC-сервер с SimpleXMLRPCServer и Celery. В принципе, идея состоит в том, что удаленный клиент (client.py) может вызывать задачи через xmlrpc.client на сервер (server.py), который включает функции, зарегистрированные как задачи Celery (runnable.py).SimpleXMLRPCServer вызывает задачи Celery
Проблема в том, что когда функция RPC зарегистрирована через register_function, я могу назвать ее по имени, поэтому она будет выполнена правильно, но без использования Celery. То, что я хотел бы достичь, - это вызвать его через name.delay() в client.py, как он будет выполняться Celery, но без блокировки потока сервера. Таким образом, server.py должен действовать как прокси-сервер и позволяет нескольким клиентам вызывать полный набор функций, как:
for task in flow:
job = globals()[task]
job.delay("some arg")
while True:
if job.ready():
break
Я попытался с помощью register_instance с allow_dotted_names = True, но я пришел к ошибке:
xmlrpc.client.Fault: <Fault 1: "<class 'TypeError'>:cannot marshal <class '_thread.RLock'> objects">
что привело меня к вопросу - если это вообще возможно, чтобы сделать что-то вроде этого
упрощенный код:
server.py
# ...runnable.py import
# ...rpc init
def register_tasks():
for task in get_all_tasks():
setattr(self, task, globals()[task])
self.server.register_function(getattr(self, task), task)
runnable.py
app = Celery("tasks", backend="amqp", broker="amqp://")
@app.task()
def say_hello():
return "hello there"
@app.task()
def say_goodbye():
return "bye, bye"
def get_all_tasks():
tasks = app.tasks
runnable = []
for t in tasks:
if t.startswith("modules.runnable"):
runnable.append(t.split(".")[-1])
return runnable
Наконец, client.py
s = xmlrpc.client.ServerProxy("http://127.0.0.1:8000")
print(s.say_hello())