2013-09-13 5 views
8

Я хотел бы использовать куски внутри цепочки сельдерея.Кусочки сельдерея внутри цепи

chain = task1.s(arg1) | task2.chunks(?,CHUNK_SIZE) | task3.chunks(?, CHUNK_SIZE) 

Основном то, что я хотел бы сделать, это запустить TASK1, порций это результат и отправить куски в TASK2, которые должны затем вызвать Task3, которые также должны получать Chunked результаты от TASK2, чтобы завершить процесс. Зачем? Поскольку task1 и task2 могут возвращать справедливое количество элементов, которые я хотел бы обрабатывать в большем количестве партий.

Код, указанный выше не работает, так как я не совсем уверен, что поставить вместо вопросительных знаков, чтобы заставить его работать.

Я не совсем уверен, что это возможно, так как поиск не принес много результатов, поэтому в случае, если такой рабочий процесс невозможно, меня бы заинтересовали разумные альтернативы.

+0

У вас есть решение? –

ответ

0

Я не уверен, возможно ли это даже с выходными примитивами.

Я думаю, что если две альтернативы/обходные:

  1. Используйте куски/аккорды инициировать новые задачи из задачи.

    Возможно, вы уже подумали об этом. Идея состоит в том, чтобы вызвать task1 обычно с apply_async. После выполнения этой задачи, генерирующей массивный вывод, который требует chunking, просто используйте примитив chunks для дальнейшего создания кусков для task2. Аналогичным образом выполните тот же шаг для перехода между task2 и task3. Вызов задач из задач - это только плохая идея, когда вы ожидаете получения результатов внутренней задачи. Поэтому помните, если вы ждете результатов задачи, тогда это не будет рекомендуемым подходом.

    @task 
    def task1(some_input): 
        # Do stuff 
        # Create a list of lists where the inner list represent the *args to send to an individual task 
        task2.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async() 
    
    @task 
    def task2(a, b): 
        # Do stuff 
        # Create a list of lists where the inner list represent the *args to send to an individual task 
        task3.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async() 
    
    @task 
    def task3(a, b): 
        # Do stuff 
    
  2. Это решение немного интересно. Я наткнулся на конкретную просьбу на странице сельдерей Github. Посмотрите этот запрос на растяжение от steeve: https://github.com/celery/celery/pull/817 Из того, что я понял, он создал динамический декоратор задач (существует дискуссия о том, должно ли это имя быть), которое понимает, возвращает ли задача подзадачу. Если это так, сначала применяется эта подзадача. Он утверждает, что успешно использует его в производстве Veezio. Я сам не пробовал. Я бы предложил перейти к этой теме и задать несколько вопросов. Или даже прослушивание Steeve об этом в Twitter или IRC или что-то в этом роде.

Смежные вопросы