2016-01-13 2 views
0

Предположим, что я добавляю 100 задач push (как группа 1) в свой tasks-queue. Затем я добавляю еще 200 задач (как группа 2) в одну очередь. Как я могу понять, завершены ли все задачи группы 1?Есть ли способ понять, когда все задачи закончены?

Похоже, QueueStatistics не поможет. tag работает только с протяженными очередями.

И у меня не может быть отдельных очередей (так как у меня могут быть сотни групп).

+0

Конечно, это будет непросто. Вероятно, вам понадобится отстрочный счетчик в хранилище данных, который увеличивается каждый раз, когда задача в своей группе завершается. вы можете проверить количество выполненных заданий на основе группы из подсвеченного счетчика и посмотреть, равно ли оно количеству поставленных задач. Это все еще может быть не идеальным, поскольку я считаю, что в некоторых случаях задачам разрешается дважды запускать задачи, поэтому ваш осколочный счетчик также должен иметь надежный ключ, чтобы одна и та же задача выполнялась дважды, установив один и тот же объект-счетчик. – mgilson

ответ

0

Я бы, вероятно, решил это, используя заштрихованный счетчик в хранилище данных, например, @mgilson, и украсил мои отложенные функции, чтобы запустить обратный вызов, когда задачи выполняются.

Я думаю, что что-то вроде этого - это то, что вы ищете, если включить код в https://cloud.google.com/appengine/articles/sharding_counters?hl=en и написать функцию уклона, чтобы дополнить приращение.

import random 
import time 
from google.appengine.ext import deferred 

def done_work(): 
    logging.info('work done!') 

def worker(callback=None): 
    def fst(f): 
    def snd(*args, **kwargs): 
     key = kwargs['shard_key'] 
     del kwargs['shard_key'] 

     retval = f(*args, **kwargs) 

     decriment(key) 
     if get_count(key) == 0: 
     callback() 

     return retval 
    return snd 
    return fst 

def func(n): 
    # do some work 
    time.sleep(random.randint(1, 10)/10.0) 
    logging.info('task #{:d}'.format(n)) 

def make_some_tasks(): 
    func = worker(callback=done_work)(func) 
    key = random.randint(0, 1000) 
    for n in xrange(0, 100): 
    increment(key) 
    deferred.defer(func, n, shard_key=key) 
0

Задачи не могут запускаться только один раз, иногда даже успешно выполняемые задачи могут повторяться. Вот такой пример: GAE deferred task retried due to "instance unavailable" despite having already succeeded.

Из-за этого использование счетчика, увеличенного при постановке задачи и уменьшении при выполнении задачи, не будет работать - оно будет уменьшаться дважды в таком дублированном случае выполнения, выкидывая все вычисление.

Достоинство Способ отслеживания завершения задачи (что я могу придумать) - это независимо отслеживать каждую отдельную задачу. Вы можете сделать это, используя task names (либо заданные, либо автоматически назначенные после успешной очереди) - они уникальны для данной очереди. Названия задач, подлежащие отслеживанию, могут храниться в списках задач, сохраняемых, например, в хранилище данных.

Примечание: это только теоретический ответ, который я получил, когда задал себе тот же вопрос, но не смог его проверить.

Смежные вопросы