2016-07-25 3 views
3

Я пытаюсь решить следующую задачу:Как определить приоритеты задач при использовании очереди задач в движке Google?

  1. У меня есть ряд «задачи», которые я хотел бы выполнить
  2. У меня есть определенное количество рабочих для выполнения этих работников (так они называют внешний API с использованием urlfetch и количество параллельных вызовов этого API ограничено)
  3. Я хотел бы, чтобы эти «задачи» выполнялись «как можно скорее» (то есть минимальная задержка)
  4. Эти задачи являются частями более крупных задач и может быть классифицирован в зависимости от размера исходной задачи (т. е. небольшая оригинальная задача может генерировать от 1 до 100 задач с, средний от 100 до 1000 и большой - более 1000).

Сложная часть: Я хотел бы сделать все это эффективно (то есть минимальную задержку и использовать как можно больше параллельных API-вызовов - без превышения предела), но в то же время попытаться предотвратить большой количество задач, генерируемых из «больших» исходных задач, для задержки задач, генерируемых из «малых» исходных задач.

Иными словами, я хотел бы иметь «приоритет», назначенный каждой задаче с «маленькими» задачами с более высоким приоритетом и тем самым предотвращать голодание от «больших» задач.

Некоторые искать вокруг, кажется, не показывают, что все готовые доступен, поэтому я придумал следующее:

  • создать три нажимные очереди: tasks-small, tasks-medium, tasks-large
  • множества а максимальное количество одновременных запросов для каждого из них, так что общее количество является максимальным количеством одновременных вызовов API (например, если максимальный вызов одновременных API составляет 200, я мог бы установить tasks-small, чтобы иметь max_concurrent_requests из 30, tasks-medium 60 и tasks-large 100)
  • при постановке на охрану задания, проверьте номер. ожидающую задачи в каждой очереди (используя что-то вроде класса QueueStatistics), и если другая очередь не используется на 100%, запустите там задачу, иначе просто установите задачу в очереди с соответствующим размером.

Например, если мы имеем задачу T1 которая является частью небольшой задачи, первая проверка, если tasks-small есть свободные «слоты» и епдиеие его там. В противном случае проверьте tasks-medium и tasks-large. Если ни один из них не имеет свободных слотов, в любом случае добавьте его на tasks-small, и он будет обработан после того, как задачи будут добавлены до его обработки (обратите внимание: это не оптимально, потому что если «слоты» освобождаются в других очередях, они все равно не будут обрабатывать незавершенные задачи из очереди tasks-small)

Другой вариант - использовать очередь PULL и иметь центральный «координатор», который вытаскивает из этой очереди на основе приоритетов и отправляет их, однако это, кажется, добавляет немного больше латентности.

Однако это кажется немного хакерским, и мне интересно, есть ли там лучшие альтернативы.


EDIT: после того, как некоторые мысли и обратной связи я имею в виду использование очереди PULL после того, как все следующим образом:

  • имеют два PULL очереди (medium-tasks и large-tasks)
  • имеют диспетчеру (PUSH) с параллелизмом 1 (так что только одна задача отправки запускается в любое время). Задачи диспетчеризации созданы несколькими способами:
    • по один раз в минуты хрон
    • после добавления средней/большой задачей толкающих очередей
    • после задания рабочего заканчивает
  • есть очередь работник (PUSH) с параллелизмом, равное числу рабочих

И рабочий процесс:

  • мелкие задачи добавляются непосредственно в очередь рабочего
  • диспетчер задач, когда он запускается, выполняет следующие действия:
    • оценивает количество свободных рабочих (смотря на количество запущенных задач в рабочая очередь)
    • для любых «свободных» слотов он берет задачу из средних/больших задач PULL queue и ставит ее в очередь на рабочем месте (точнее: добавляет его в очередь PUSH рабочего, что приведет к ее выполнению - в конечном счете - у рабочего).

Я доложу, как только это будет реализовано, и по крайней мере умеренно протестированы.

ответ

0

EDIT: теперь я мигрировал к более простому решению, подобно тому, что @ Эрик-Саймонтон описан:

  • У меня есть несколько очередей PULL, один для каждого приоритета
  • Многих рабочих тянут на конечной точке (обработчик)
  • Обработчик генерирует случайное число и делает простое «если меньше 0,6, попробуйте сначала небольшую очередь, а затем большую очередь, иначе наоборот (« большой »).
  • Если работники не получают заданий или ошибки, они выполняют полуслучайную экспоненциальную отсрочку до максимального времени (т.е. они начинают вытягивать каждую 1 секунду и приблизительно в два раза больше времени ожидания после каждой пустой тянуть до 30 секунд)

Этой конечная точка необходима - среди других причин - потому что количество извлечений/второй из очереди PULL ограниченно 10k/с: https://cloud.google.com/appengine/docs/python/taskqueue/overview-pull#Python_Leasing_tasks


Я реализовал решение, описанное в UPDATE:

  • две очереди PULL (среднее задачи и большая-задача)
  • диспетчер (PUSH) очередь с параллелизмом 1
  • очереди работника (PUSH) с параллелизмом, равным числу рабочих

Смотрите вопрос для более подробной информации. Некоторые примечания:

  • существует некоторая задержка видимости задачи из-за возможную последовательность (то есть диспетчеры задача иногда не видят задачи из очереди тянуть, даже если они вставлены вместе.) - Я работал вокруг, добавляя обратный отсчет в 5 секунд до задач диспетчера, а также добавление задания cron, которое каждую минуту добавляет задачу диспетчера (поэтому, если исходная задача диспетчера не «видит» задачу из очереди вытягивания, другая будет позже)
  • удостоверился, что он назвал каждую задачу, чтобы исключить возможность их двойного диспетчеризации.
  • Вы не можете арендовать 0 предметов из очередей PULL :-)
  • Операции пакетной обработки имеют верхний предел, поэтому вам необходимо выполнить свою собственную доработку над вызовами пакетной задачи
  • , похоже, нет возможности программно получить значение «максимального параллелизма» для очереди, поэтому мне пришлось жесткий код, который в диспетчере (вычислить, сколько еще задачи можно запланировать)
  • не добавлять диспетчерские задачи, если они уже немного (по крайней мере, 10) в очереди
1

Малые/средние/крупные очереди задач не помогут сами по себе - после того, как исходные задачи будут установлены в очередь, они будут нести нерестовые рабочие задачи, потенциально даже нарушая ограничение размера очереди рабочих задач. Таким образом, вам необходимо выполнить темп/контроль за выполнением исходных задач.

я бы следить за «TODO» оригинальных задач в хранилище данных/GCS и епдиеие эти первоначальные задачи только тогда, когда соответствующий размер очереди достаточно низка (1 или 2, может быть, отложенные задания), либо из повторяющейся задача, задание cron или отложенная задача (в зависимости от скорости, с которой вам нужно выполнить первоначальную задачу), которая будет реализовывать требуемую логику стимуляции и приоритета точно так же, как диспетчер очереди push, но без дополнительной задержки, о которой вы говорили.

1

Я не использовал вытяжные очереди, но, по моему мнению, они вполне могли удовлетворить ваш прецедент. Вы можете определить 3 очереди вытягивания и иметь X рабочих, все из которых вытаскивают задачи, сначала пробуя «маленькую» очередь, а затем переходим к «среднему», если она пуста (где X - ваш максимальный параллелизм). Вам не нужен центральный диспетчер.

Тем не менее, вы должны заплатить за X работников, даже если нет заданий (или X/threadsPerMachine?), Или уменьшите их вниз &.

Итак, вот еще одна мысль: сделайте одиночную очередь с правильной maximum concurrency. Когда вы получите новую задачу, нажмите ее info на datastore и поставите в очередь родовое. Затем эта общая работа будет обращаться к хранилищу данных, который ищет задачи в приоритетном порядке, выполняя первый, который он находит. Таким образом, следующая задача будет выполняться по следующей задаче, даже если это задание уже было выделено из большой задачи.

Смежные вопросы