Я пытаюсь обрабатывать JSON файлы в ведре и записать результаты в ведро:GCP потока данных - обработка JSON занимает слишком много времени
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("the-project");
options.setStagingLocation("gs://some-bucket/temp/");
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("gs://some-bucket/2016/04/28/*/*.json"))
.apply(ParDo.named("SanitizeJson").of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
try {
JsonFactory factory = JacksonFactory.getDefaultInstance();
String json = c.element();
SomeClass e = factory.fromString(json, SomeClass.class);
// manipulate the object a bit...
c.output(factory.toString(e));
} catch (Exception err) {
LOG.error("Failed to process element: " + c.element(), err);
}
}
}))
.apply(TextIO.Write.to("gs://some-bucket/output/"));
p.run();
У меня есть около 50000 файлов под ОШ пути: // some- ковш/2016/04/28/(в подкаталогах). Мой вопрос: имеет ли смысл, что для этого требуется больше часа? Выполнение чего-то подобного на кластере Spark в амазонке занимает около 15-20 минут. Я подозреваю, что могу делать что-то неэффективно.
EDIT:
В моей работе я Спарк агрегировать все результаты в DataFrame и только потом записать вывод, все сразу. Я заметил, что мой конвейер здесь пишет каждый файл отдельно, я полагаю, поэтому он занимает гораздо больше времени. Есть ли способ изменить это поведение?
Спасибо за отчет, не могли бы вы привести пример ID задания, чтобы мы могли исследовать? – jkff
@jkff Вы имеете в виду это? 'Добавлено: 2016-05-23_03_33_05-6586037759045185050' – Nira
Спасибо. Еще один вопрос: это задание Dataflow настроено на использование по умолчанию 3 виртуальных машин типа n1-standard-1, т. Е. Всего 3 потока. Является ли это преднамеренным/вашей работой Spark также использование 3 потоков, но завершение через 15-20 минут? Вы можете настроить число рабочих с помощью '--numWorkers' или включить автомасштабирование через https://cloud.google.com/dataflow/service/dataflow-service-desC# autotuning-features – jkff