2012-02-12 2 views
8

Я работаю над функцией appengine-mapreduce и модифицировал демо, чтобы соответствовать моей цели. В основном у меня есть миллион строк в следующем формате: userid, time1, time2. Моя цель - найти разницу между time1 и time2 для каждого пользователя.Ограничение памяти с помощью appengine-mapreduce

Однако, как я бегу это на Google App Engine, я столкнулся это сообщение об ошибке в разделе журналов:

Превышен мягкий частный предел памяти с 180.56 МБ после обслуживания 130 запросов Всего При обработке этого запроса, было обнаружено, что процесс, обрабатывающий этот запрос, использует слишком много памяти и был прерван. Вероятно, это приведет к тому, что новый процесс будет использоваться для следующего запроса вашего приложения. Если вы часто видите это сообщение, у вас может быть утечка памяти в приложении.

def time_count_map(data): 
    """Time count map function.""" 
    (entry, text_fn) = data 
    text = text_fn() 

    try: 
    q = text.split('\n') 
    for m in q: 
     reader = csv.reader([m.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
    logging.debug(e) 


def time_count_reduce(key, values): 
    """Time count reduce function.""" 
    time = 0.0 
    for subtime in values: 
    time += float(subtime) 
    realtime = int(time) 
    yield "%s: %d\n" % (key, realtime) 

Может кто-нибудь подскажет, как еще я могу оптимизировать свой код лучше? Благодаря!!

Отредактировано:

Вот обработчик трубопровода:

class TimeCountPipeline(base_handler.PipelineBase): 
    """A pipeline to run Time count demo. 

    Args: 
    blobkey: blobkey to process as string. Should be a zip archive with 
     text files inside. 
    """ 

    def run(self, filekey, blobkey): 
    logging.debug("filename is %s" % filekey) 
    output = yield mapreduce_pipeline.MapreducePipeline(
     "time_count", 
     "main.time_count_map", 
     "main.time_count_reduce", 
     "mapreduce.input_readers.BlobstoreZipInputReader", 
     "mapreduce.output_writers.BlobstoreOutputWriter", 
     mapper_params={ 
      "blob_key": blobkey, 
     }, 
     reducer_params={ 
      "mime_type": "text/plain", 
     }, 
     shards=32) 
    yield StoreOutput("TimeCount", filekey, output) 

Mapreduce.yaml:

mapreduce: 
- name: Make messages lowercase 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.lower_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 
- name: Make messages upper case 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.upper_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 

Остальные файлы точно так же, как демо.

Я загрузил копию моих кодов на раздаточном: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

+0

Можете ли вы показать свою конфигурацию mapreduce? По какой-то причине похоже, что вы передаете весь файл в mapper, вместо того, чтобы сопоставлять его по строкам. –

+0

Привет, Даниэль, мой вопрос отредактирован. Спасибо, очень ценю! – autumngard

ответ

2

Это, скорее всего, ваш входной файл превышает мягкое ограничение памяти в размере. Для больших файлов используйте либо BlobstoreLineInputReader, либо BlobstoreZipLineInputReader.

Эти считыватели ввода передают что-то другое в функции map, они передают start_position в файл и строку текста.

Ваша функция map может выглядеть примерно так:

def time_count_map(data): 
    """Time count map function.""" 
    text = data[1] 

    try: 
     reader = csv.reader([text.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
     logging.debug(e) 

Использование BlobstoreLineInputReader позволит задание работать намного быстрее, так как он может использовать более чем один осколок, до 256, но это означает, что вам нужно загрузить файлы несжатые, что может быть болью. Я обрабатываю его, загружая сжатые файлы на сервер Windows EC2, а затем распаковываю и загружаю оттуда, поскольку пропускная способность восходящего канала настолько велика.

+0

Это работало очень хорошо для меня! Большое спасибо! :) – autumngard

6

Также рассмотрите возможность вызова gc.collect() в обычных точках во время вашего кода. Я видел несколько вопросов о превышении пределов мягкой памяти, которые были смягчены вызовом gc.collect(), причем большинство из них связано с blobstore.

+0

вызывает gc.collect() только для blobstore или вообще? – marcadian

Смежные вопросы