2013-09-19 8 views
0

Я читаю большой объем данных от поставщика API. Получив ответ, мне нужно просмотреть и переупаковать данные и поместить их в хранилище данных App Engine. Особая большая учетная запись будет содержать записи ~ 50 тыс. Записей.Google App Engine (Python) обрабатывает большое количество задач записи

Каждый раз, когда я получаю некоторые записи из API, я буду хранить 500 записей в виде пакета в таблице temp и отправлять задачу обработки в очередь. В случае слишком много задач, застревают внутри одной очереди, я использую 6 очередей в общей сложности:

count = 0 
worker_number = 6 
for folder, property in entries: 
        data[count] = { 
         # repackaging data here 
        } 

        count = (count + 1) % 500 

        if count == 0: 
         cache = ClientCache(parent=user_key, data=json.dumps(data)) 
         cache.put() 
         params = { 
          'access_token': access_token, 
          'client_key': client.key.urlsafe(), 
          'user_key': user_key.urlsafe(), 
          'cache_key': cache.key.urlsafe(), 
         } 
         taskqueue.add(
          url=task_url, 
          params=params, 
          target='dbworker', 
          queue_name='worker%d' % worker_number) 
         worker_number = (worker_number + 1) % 6 

И task_url приведет к следующему:

logging.info('--------------------- Process File ---------------------') 
     user_key = ndb.Key(urlsafe=self.request.get('user_key')) 
     client_key = ndb.Key(urlsafe=self.request.get('client_key')) 
     cache_key = ndb.Key(urlsafe=self.request.get('cache_key')) 

     cache = cache_key.get() 
     data = json.loads(cache.data) 
     for property in data.values(): 
      logging.info(property) 
      try: 
       key_name = '%s%s' % (property['key1'], property['key2']) 
       metadata = Metadata.get_or_insert(
        key_name, 
        parent=user_key, 
        client_key=client_key, 
        # ... other info 
       ) 
       metadata.put() 
      except StandardError, e: 
       logging.error(e.message) 

Все задачи выполняются в интерфейсе.

С такой структурой он отлично работает. хорошо ... много времени. Но иногда я получаю эту ошибку:

2013-09-19 15:10:07.788 
suspended generator transaction(context.py:938) raised TransactionFailedError(The transaction could not be committed. Please try again.) 
W 2013-09-19 15:10:07.788 
suspended generator internal_tasklet(model.py:3321) raised TransactionFailedError(The transaction could not be committed. Please try again.) 
E 2013-09-19 15:10:07.789 
The transaction could not be committed. Please try again. 

Казалось бы, проблема записи в хранилище слишком часто? Я хочу узнать, как я могу сбалансировать темп и позволить работнику работать гладко ... Также есть ли другой способ улучшить производительность? Моя конфигурация очереди примерно такая:

- name: worker0 
    rate: 120/s 
    bucket_size: 100 
    retry_parameters: 
    task_retry_limit: 3 

ответ

2

Вы пишете отдельные объекты за раз.

Как насчет модификации кода для записи партиями с использованием ndb.put_multi, что сократит время в оба конца для каждой транзакции.

И почему вы используете get_or_insert при перезаписи записи каждый раз. Вы могли бы просто написать. Оба из них значительно уменьшат рабочую нагрузку

+0

Я просто попробовал put_multi, похоже, что это помогает! Причина, по которой я использую get_or_insert, заключается в том, чтобы избежать дублирования данных. Я признаю каждую запись уникальной по ключевому слову .. если вы не знаете лучшего способа сохранить уникальное свойство. – xialin

+0

относительно о уникальном id. Если в списке сущностей есть дубликат, как будет обрабатывать put_multi? – xialin

+0

Вы можете легко удалить дубликаты из списка. Однако вы никогда не получите дубликаты записей, потому что используете имя ключа, и в этом случае вы просто напишете один и тот же объект. –

Смежные вопросы