2016-05-23 2 views
1

Я пытаюсь обрабатывать JSON файлы в ведре и записать результаты в ведро:GCP потока данных - обработка JSON занимает слишком много времени

DataflowPipelineOptions options = PipelineOptionsFactory.create() 
      .as(DataflowPipelineOptions.class); 
    options.setRunner(BlockingDataflowPipelineRunner.class); 
    options.setProject("the-project"); 
    options.setStagingLocation("gs://some-bucket/temp/"); 

    Pipeline p = Pipeline.create(options); 

    p.apply(TextIO.Read.from("gs://some-bucket/2016/04/28/*/*.json")) 
    .apply(ParDo.named("SanitizeJson").of(new DoFn<String, String>() { 
     @Override 
     public void processElement(ProcessContext c) { 
      try { 
       JsonFactory factory = JacksonFactory.getDefaultInstance(); 
       String json = c.element(); 
       SomeClass e = factory.fromString(json, SomeClass.class); 
       // manipulate the object a bit... 
       c.output(factory.toString(e)); 
      } catch (Exception err) { 
       LOG.error("Failed to process element: " + c.element(), err); 
      } 
     } 
    })) 
    .apply(TextIO.Write.to("gs://some-bucket/output/")); 
    p.run(); 

У меня есть около 50000 файлов под ОШ пути: // some- ковш/2016/04/28/(в подкаталогах). Мой вопрос: имеет ли смысл, что для этого требуется больше часа? Выполнение чего-то подобного на кластере Spark в амазонке занимает около 15-20 минут. Я подозреваю, что могу делать что-то неэффективно.

EDIT:

В моей работе я Спарк агрегировать все результаты в DataFrame и только потом записать вывод, все сразу. Я заметил, что мой конвейер здесь пишет каждый файл отдельно, я полагаю, поэтому он занимает гораздо больше времени. Есть ли способ изменить это поведение?

+0

Спасибо за отчет, не могли бы вы привести пример ID задания, чтобы мы могли исследовать? – jkff

+0

@jkff Вы имеете в виду это? 'Добавлено: 2016-05-23_03_33_05-6586037759045185050' – Nira

+0

Спасибо. Еще один вопрос: это задание Dataflow настроено на использование по умолчанию 3 виртуальных машин типа n1-standard-1, т. Е. Всего 3 потока. Является ли это преднамеренным/вашей работой Spark также использование 3 потоков, но завершение через 15-20 минут? Вы можете настроить число рабочих с помощью '--numWorkers' или включить автомасштабирование через https://cloud.google.com/dataflow/service/dataflow-service-desC# autotuning-features – jkff

ответ

0

Ваши рабочие места сталкиваются с несколькими проблемами производительности в потоке данных, вызванные тем, что он более оптимизирован для выполнения работы с большими приращениями, в то время как ваша работа обрабатывает множество очень маленьких файлов. В результате в некоторых аспектах выполнения задания преобладают служебные данные для каждого файла. Вот некоторые подробности и предложения.

  • Задача ограничена, например, записыванием вывода, чем чтением ввода (хотя чтение также является значительной частью). Вы можете значительно сократить эту накладную, указав withNumShards на TextIO.Write, в зависимости от того, сколько файлов вы хотите получить. Например. 100 может быть разумным. По умолчанию вы получаете неуказанное количество файлов, которое в этом случае, учитывая текущее поведение оптимизатора потока данных, соответствует количеству входных файлов: обычно это хорошая идея, потому что это позволяет нам не материализовывать промежуточные данные, но в этом случай это не очень хорошая идея, потому что входные файлы настолько малы, и накладные расходы на файл более важны.
  • Я рекомендую установить maxNumWorkers на значение, например, например. 12 - в настоящее время вторая работа - автомасштабирование чрезмерно большого числа рабочих. Это вызвано автоматическим масштабированием Dataflow, которое в настоящее время ориентировано на задания, которые обрабатывают данные с большими приращениями - в настоящее время он не учитывает накладные расходы на каждый файл и ведет себя не так хорошо в вашем случае.
  • Вторая работа также попадает в ошибку, из-за которой она не может завершить запись. Мы расследуем, однако установка maxNumWorkers также должна быть успешной.

Выражаясь коротко:

  • набор maxNumWorkers=12
  • набор TextIO.Write.to("...").withNumShards(100)

и он должен работать гораздо лучше.

+0

Спасибо @jkff. Я выполнил задание с вашей предлагаемой конфигурацией и закончил примерно через 23 минуты, прочитав + письмо (см. 2016-05-29_23_50_44-12310242971206854729). – Nira

+0

Отлично, спасибо. Эта работа выглядит намного здоровее. – jkff

Смежные вопросы