2013-02-19 3 views
11

У меня есть ситуация, аналогичная ситуации, описанной в here, за исключением того, что вместо цепочки задач с несколькими аргументами я хочу сгруппировать задачи, которые возвращают словарь с несколькими записями.Целевая цепочка сельдерея и доступ к ** kwargs

Это - очень слабо и абстрактно --- то, что я пытаюсь сделать:

tasks.py

@task() 
def task1(item1=None, item2=None): 
    item3 = #do some stuff with item1 and item2 to yield item3 
    return_object = dict(item1=item1, item2=item2, item3=item3) 
    return return_object 

def task2(item1=None, item2=None, item3=None): 
    item4 = #do something with item1, item2, item3 to yield item4 
    return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4) 
    return return_object 

Работа с IPython, я могу назвать TASK1 индивидуально, так и асинхронно , без проблем.

Я также могу назвать TASK2 индивидуально с результатом возвращенного task1 как двойной звезды аргумент:

>>res1 = task1.s(item1=something, item2=something_else).apply_async() 
>>res1.status 
'SUCCESS' 
>>res2 = task2.s(**res1.result).apply_async() 
>>res2.status 
'SUCCESS 

Однако, что я в конечном счете хочу достичь такой же, конечный результат, как указано выше, но с помощью цепи, и здесь, я не могу понять, как иметь tASK2 инстанцированный не с (позиционными) аргументами возвращенных task1, но с task1.result как ** kwargs:

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async() #THIS DOESN'T WORK! 

Я подозреваю, что я могу вернуться и переписать мои задачи так, чтобы t hey возвращает позиционные аргументы вместо словаря, и это может прояснить ситуацию, но мне кажется, что должен быть какой-то способ доступа к возвращенному объекту task1 в task2 с эквивалентной функциональностью ** двойной звезды. Я также подозреваю, что я пропустил кое-что здесь достаточно очевидным в отношении реализации подзадачи Celery или * args vs. ** kwargs.

Надеюсь, что это имеет смысл. И спасибо заранее за любые советы.

ответ

1

chain и другие холсты примитивы в семье функциональных утилит, как map и reduce.

E.g. где map(target, items) звонки target(item) для каждого элемента в списке, Python имеет редко используемую версию карты, называемую itertools.starmap, , которая вместо этого называет target(*item).

Хотя мы могли бы добавить starchain и даже kwstarchain на панель инструментов, эти были бы очень специализированными и, вероятно, не использовались бы так часто.

Интересно Python сделал это ненужным с перечнем и генераторных выражений, так, что карта заменяется [target(item) for item in item] и Starmap с [target(*item) for item in item].

Так что вместо того, чтобы внедрять несколько альтернатив для каждого примитива, я думаю, что мы должны сосредоточиться на поиске более гибкого способа поддержки этого, например. например, с использованием выражений генератора на основе сельдерея (если возможно, и если не что-то подобное)

+0

Понял. Благодарю. Я в конечном итоге решил это, слегка изменив входные/выходные данные в свою задачу. T2 теперь просто ищет один объект dict в качестве входа, а затем извлекает ожидаемые пары k/value из dict для выполнения задачи. –

+0

@BenjaminWhite я до сих пор не понимаю. можете ли вы рассказать мне, как вы это делали? – ashim888

1

Поскольку это не встроено в сельдерей, я написал функцию декоратора для чего-то подобного.

# Use this wrapper with functions in chains that return a tuple. The 
# next function in the chain will get called with that the contents of 
# tuple as (first) positional args, rather than just as just the first 
# arg. Note that both the sending and receiving function must have 
# this wrapper, which goes between the @task decorator and the 
# function definition. This wrapper should not otherwise interfere 
# when these conditions are not met. 

class UnwrapMe(object): 
    def __init__(self, contents): 
     self.contents = contents 

    def __call__(self): 
     return self.contents 

def wrap_for_chain(f): 
    """ Too much deep magic. """ 
    @functools.wraps(f) 
    def _wrapper(*args, **kwargs): 
     if type(args[0]) == UnwrapMe: 
      args = list(args[0]()) + list(args[1:]) 
     result = f(*args, **kwargs) 

     if type(result) == tuple and current_task.request.callbacks: 
      return UnwrapMe(result) 
     else: 
      return result 
    return _wrapper 

Mine разворачивает как starchain концепции, но вы можете легко изменить его разворачивать kwargs вместо этого.

5

Это мое взятие на проблему, используя абстрактный класс задач:

from __future__ import absolute_import 
from celery import Task 
from myapp.tasks.celery import app 


class ChainedTask(Task): 
    abstract = True  

    def __call__(self, *args, **kwargs): 
     if len(args) == 1 and isinstance(args[0], dict): 
      kwargs.update(args[0]) 
      args =() 
     return super(ChainedTask, self).__call__(*args, **kwargs) 

@app.task(base=ChainedTask) 
def task1(x, y): 
    return {'x': x * 2, 'y': y * 2, 'z': x * y}  


@app.task(base=ChainedTask) 
def task2(x, y, z): 
    return {'x': x * 3, 'y': y * 3, 'z': z * 2} 

Теперь можно определить и выполнить вашу цепочку, как, например:

from celery import chain 

pipe = chain(task1.s(x=1, y=2) | task2.s()) 
pipe.apply_async() 
Смежные вопросы