0

Я ищу мощный и быстрый способ обработки обработки большого файла в Google App Engine.Как обрабатывать обработку большого файла на GAE?

Он работает в качестве следующего (упрощенного процесса в конце):

  1. клиент отправить файл CSV, что наш сервер будет обращаться, строка за строкой.
  2. После загрузки файла в хранилище NDB Uploads добавляется запись с именем CSV, файловым путем (в хранилище Google) и некоторыми основными сведениями. Затем создается задача, называемая «предварительная обработка».
  3. Задача предварительной обработки будет проходить по всем строкам файла CSV (может быть миллионы) и добавит запись NDB в модель UploadEntries для каждой строки с идентификатором CSV, линией, данными для извлечения/обрабатывать и некоторые индикаторы (логические), если эта строка начала обработку, и завершила обработку («is_treating», «is_done»)
  4. Как только задача предварительной обработки завершена, она обновляет информацию клиенту «XXX строки» будут обработаны "
  5. Запрашивается Uploads.next(). Метод next будет:
    • Поиск в UploadEntries, который имеет is_treating и is_done на ложь,
    • Будет ли добавить задачу в датасторе Redis для следующей строки найдено. (Хранилище данных Redis используется, потому что работа здесь выполняется на серверах, которые не управляются Google)
    • Также создаст новую запись в задаче Process-healthcheck (эта задача запускается через 5 минут и проверяется, что 7) была выполнена правильно. Если нет, он считает, что сервер Redis/Outside потерпел неудачу и сделал то же самое, что и 7), без результата (вместо этого «ошибка»)).
    • Затем он обновляет UploadEntries.is_treating до True для этой записи.
  6. Внешний сервер обрабатывает данные и возвращает результаты, отправив запрос POST на конечную точку на сервере.
  7. Эта конечная точка обновляет запись UploadEntries в хранилище данных (включая «is_treating» и «is_done») и звоните Uploads.next(), чтобы начать следующую строку.
  8. В Uploads.next при поиске следующих записей ничего не возвращает, я считаю, что файл будет окончательно обработан, и вызовет задачу post-process, которая будет перестраивать CSV с обработанными данными и возвращает ее клиенту.

Вот несколько вещей, чтобы иметь в виду:

  1. Серверы, делает реальной работы находятся за пределами Google AppEngine, поэтому мне пришлось придумать Redis.
  2. Текущий способ делать вещи дает мне гибкость в отношении количества параллельных записей для обработки: в 5) методы Uploads.next() содержат аргумент limit, который позволяет мне параллельно искать процесс n. Может быть 1, 5, 20, 50.
  3. Я не могу просто добавить все строки из задачи pre-processing прямо в Redis, потому что в этом случае следующему клиенту придется дождаться завершения первого файла, и это будет слишком долго, чтобы занять слишком много времени

Но эта система имеет различные проблемы, и именно поэтому я поворачиваюсь к вам на помощь:

  1. Иногда эта система настолько быстро, что Datastore еще не обновляется правильно и когда вызывая Uploads.next(), записи возвратили (только entry.is_treating = True еще не нажата в базу данных)
  2. Redis или мой сервер (я действительно не знаю) когда-нибудь потерял задание или запрос POST после того, как обработка не была выполнена, поэтому задача никогда не идет до is_done = True. Вот почему мне пришлось внедрить систему Healcheck, чтобы убедиться, что линия правильно обработана независимо от того, что. Это имеет двойное преимущество: имя этой задачи содержит идентификатор csv и строку. Сделать его уникальным для каждого файла. Если я хранилище данных не обновляется, и одна и та же задача выполняется дважды, создание проверки работоспособности завершится неудачно, потому что одно и то же имя уже существует, что позволяет мне знать, что есть проблема совпадения, поэтому я игнорирую эту задачу, потому что это означает, что Datastore еще не обновлен.

Я initiall думал о запуске файла в одном процессе независим, строка за строкой, но это имеет большой недостаток, не будучи в состоянии запустить несколько линию параллельно. Кроме того, Google ограничивает выполнение задачи до 24 часов для выделенных целей (не по умолчанию), а когда файл действительно большой, он может работать дольше 24 часов.

Для получения дополнительной информации, если это поможет, я использую Python


и упростить рабочий процесс, вот что я пытаюсь добиться в лучшем виде:

  • Обработать большой файл, запустить несколько процессов paralllel, по одному на строку.
  • Отправьте работу на внешний сервер, используя Redis. После этого, что за пределами сервер возвращает результат с помощью запроса POST к главному серверу
  • Основной сервер затем обновить информацию об этой линии, и переходит к следующей строке

Я бы очень признателен, если кто-то имел лучший способ сделать это. Я действительно верю, что я не первый, кто сделал такую ​​работу, и я уверен, что я не делаю это правильно.

(Я считаю, что Stackoverflow - это лучший раздел Stack Exchange для публикации такого вопроса, поскольку это вопрос с алгоритмом, но также возможно, что я не видел лучшей сети для этого. Если это так, я сожалею о что).

+1

Я думаю, что вы можете использовать для этого приложение map map для этого, он может читать CSV из GCS по строкам с буферами, запуская его на нескольких экземплярах. Он будет обрабатывать N строк на запрос, в зависимости от ваших настроек. Но, примеры GAE дороги. –

ответ

1

серверы, которые действительно реальная работа находятся за пределами Google AppEngine

Рассматривали ли вы с помощью Google Cloud Dataflow для обработки больших файлов, а? Это управляемая служба, которая будет обрабатывать разделение и обработку файлов для вас.

На основании исходных мыслей здесь является процесс очертания:

  • добавление пользователя файлы прямо в Google облако хранения, используя signed urls или Blobstore API
  • Запрос от AppEngine запускает небольшой вычислительный экземпляр двигателя, инициирующий (BlockingDataflowPipelineRunner), чтобы запустить задачу потока данных. (Я боюсь, что это должен быть экземпляр вычисления из-за песочницы и блокировки проблем ввода-вывода).
  • Когда задача потока данных завершена, экземпляр экземпляра вычислителя разблокируется и отправляет сообщение в pubsub.
  • Сообщение pubsub вызывает webhook в службе AppEngine, которая изменяет состояние задач с 'in progress' на 'complete', чтобы пользователь мог получить свои результаты.
+0

Удивительный, Dataflow кажется правильным для меня. Знаете ли вы, работает ли он с входами XLS/XLSX, с чего я начинаю (поэтому я могу не только читать строки по строкам)? –

+0

Возможно, вам придется создать собственный источник. Для разработки пользовательских читателей файлов есть [классы удобства] (https://cloud.google.com/dataflow/model/custom-io-python#convenience-source-base-classes). Возможно, это можно было бы сочетать с [одной из многих библиотек python для чтения XLS] (http://www.python-excel.org/). –

+0

Хорошо, вот что я подозревал. Я начал читать идеи, связанные с Dataflow, но меня все еще беспокоит вопрос: как я могу применить метод Transform, который, как мне кажется, должен ждать на внешнем сервере ответа? Я блокирую текущий процесс и запрашиваю состояние ответа каждые n секунд, или есть лучший способ? –

Смежные вопросы