Я работаю над функцией appengine-mapreduce и модифицировал демо, чтобы соответствовать моей цели. В основном у меня есть миллион строк в следующем формате: userid, time1, time2. Моя цель - найти разницу между time1 и time2 для каждого пользователя.Ограничение памяти с помощью appengine-mapreduce
Однако, как я бегу это на Google App Engine, я столкнулся это сообщение об ошибке в разделе журналов:
Превышен мягкий частный предел памяти с 180.56 МБ после обслуживания 130 запросов Всего При обработке этого запроса, было обнаружено, что процесс, обрабатывающий этот запрос, использует слишком много памяти и был прерван. Вероятно, это приведет к тому, что новый процесс будет использоваться для следующего запроса вашего приложения. Если вы часто видите это сообщение, у вас может быть утечка памяти в приложении.
def time_count_map(data):
"""Time count map function."""
(entry, text_fn) = data
text = text_fn()
try:
q = text.split('\n')
for m in q:
reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
for s in reader:
"""Calculate time elapsed"""
sdw = s[1]
start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
edw = s[2]
end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
time_difference = time.mktime(end_date) - time.mktime(start_date)
yield (s[0], time_difference)
except IndexError, e:
logging.debug(e)
def time_count_reduce(key, values):
"""Time count reduce function."""
time = 0.0
for subtime in values:
time += float(subtime)
realtime = int(time)
yield "%s: %d\n" % (key, realtime)
Может кто-нибудь подскажет, как еще я могу оптимизировать свой код лучше? Благодаря!!
Отредактировано:
Вот обработчик трубопровода:
class TimeCountPipeline(base_handler.PipelineBase):
"""A pipeline to run Time count demo.
Args:
blobkey: blobkey to process as string. Should be a zip archive with
text files inside.
"""
def run(self, filekey, blobkey):
logging.debug("filename is %s" % filekey)
output = yield mapreduce_pipeline.MapreducePipeline(
"time_count",
"main.time_count_map",
"main.time_count_reduce",
"mapreduce.input_readers.BlobstoreZipInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_key": blobkey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=32)
yield StoreOutput("TimeCount", filekey, output)
Mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
params:
- name: done_callback
value: /done
mapper:
handler: main.lower_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
- name: Make messages upper case
params:
- name: done_callback
value: /done
mapper:
handler: main.upper_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
Остальные файлы точно так же, как демо.
Я загрузил копию моих кодов на раздаточном: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
Можете ли вы показать свою конфигурацию mapreduce? По какой-то причине похоже, что вы передаете весь файл в mapper, вместо того, чтобы сопоставлять его по строкам. –
Привет, Даниэль, мой вопрос отредактирован. Спасибо, очень ценю! – autumngard