Я читаю большой объем данных от поставщика API. Получив ответ, мне нужно просмотреть и переупаковать данные и поместить их в хранилище данных App Engine. Особая большая учетная запись будет содержать записи ~ 50 тыс. Записей.Google App Engine (Python) обрабатывает большое количество задач записи
Каждый раз, когда я получаю некоторые записи из API, я буду хранить 500 записей в виде пакета в таблице temp и отправлять задачу обработки в очередь. В случае слишком много задач, застревают внутри одной очереди, я использую 6 очередей в общей сложности:
count = 0
worker_number = 6
for folder, property in entries:
data[count] = {
# repackaging data here
}
count = (count + 1) % 500
if count == 0:
cache = ClientCache(parent=user_key, data=json.dumps(data))
cache.put()
params = {
'access_token': access_token,
'client_key': client.key.urlsafe(),
'user_key': user_key.urlsafe(),
'cache_key': cache.key.urlsafe(),
}
taskqueue.add(
url=task_url,
params=params,
target='dbworker',
queue_name='worker%d' % worker_number)
worker_number = (worker_number + 1) % 6
И task_url приведет к следующему:
logging.info('--------------------- Process File ---------------------')
user_key = ndb.Key(urlsafe=self.request.get('user_key'))
client_key = ndb.Key(urlsafe=self.request.get('client_key'))
cache_key = ndb.Key(urlsafe=self.request.get('cache_key'))
cache = cache_key.get()
data = json.loads(cache.data)
for property in data.values():
logging.info(property)
try:
key_name = '%s%s' % (property['key1'], property['key2'])
metadata = Metadata.get_or_insert(
key_name,
parent=user_key,
client_key=client_key,
# ... other info
)
metadata.put()
except StandardError, e:
logging.error(e.message)
Все задачи выполняются в интерфейсе.
С такой структурой он отлично работает. хорошо ... много времени. Но иногда я получаю эту ошибку:
2013-09-19 15:10:07.788
suspended generator transaction(context.py:938) raised TransactionFailedError(The transaction could not be committed. Please try again.)
W 2013-09-19 15:10:07.788
suspended generator internal_tasklet(model.py:3321) raised TransactionFailedError(The transaction could not be committed. Please try again.)
E 2013-09-19 15:10:07.789
The transaction could not be committed. Please try again.
Казалось бы, проблема записи в хранилище слишком часто? Я хочу узнать, как я могу сбалансировать темп и позволить работнику работать гладко ... Также есть ли другой способ улучшить производительность? Моя конфигурация очереди примерно такая:
- name: worker0
rate: 120/s
bucket_size: 100
retry_parameters:
task_retry_limit: 3
Я просто попробовал put_multi, похоже, что это помогает! Причина, по которой я использую get_or_insert, заключается в том, чтобы избежать дублирования данных. Я признаю каждую запись уникальной по ключевому слову .. если вы не знаете лучшего способа сохранить уникальное свойство. – xialin
относительно о уникальном id. Если в списке сущностей есть дубликат, как будет обрабатывать put_multi? – xialin
Вы можете легко удалить дубликаты из списка. Однако вы никогда не получите дубликаты записей, потому что используете имя ключа, и в этом случае вы просто напишете один и тот же объект. –