2016-08-21 3 views
3

Обычно я загружаю файлы csv, а затем запускаю разные типы агрегатов, например, «group by» с помощью Spark. Мне было интересно, можно ли запускать такие операции во время загрузки файла (как правило, несколько миллионов строк) вместо того, чтобы секвенировать их, и если это может быть достойным (как экономия времени).Искра: группировка во время загрузки

Пример:

val csv = sc.textFile("file.csv") 
val data = csv.map(line => line.split(",").map(elem => elem.trim)) 
val header = data.take(1) 
val rows = data.filter(line => header(0) != "id") 
val trows = rows.map(row => (row(0), row)) 
trows.groupBy(//row(0) etc.) 

Для моего понимания того, как работает искра, то groupBy (или aggregate) будет «отложено» до загрузки в память весь файл CSV. Если это правильно, может ли загрузка и группировка выполняться в «одно и то же» время вместо последовательности двух шагов?

ответ

3

groupBy (или совокупность) будет «отложен» на загрузку в память всего файла csv.

Это не тот случай. На локальном уровне (один раздел) Spark работает на ленивых последовательностях, поэтому операции, относящиеся к одной задаче (включая агрегирование на стороне карты), могут раздаваться вместе.

Другими словами, если у вас есть цепочка методов, операции выполняются по очереди, а не по трансформации. Другими словами, первая строка будет отображена, отфильтрована, отображена еще раз и передана в агрегатор до следующего доступа.

+0

Хорошо спасибо. Если это так, любая дальнейшая оптимизация на моей стороне бесполезна. Функции высокого порядка от Spark все вообще ленивы или есть исключения? – Randomize

+0

Большая часть этого материала не зависит от искры. Это просто свойство структур данных, которые используются для реализации внутренней логики. Но в целом я бы сказал, что Спарк такой ленивый, что имеет смысл на практике. – zero323

1

Чтобы запустить группу, на операции загрузки Вы можете продолжить 2-х вариантов:

  1. Написать свой собственный загрузчик и сделать собственную группу, внутри что + aggregationByKey. Недостатки этого - написать больше кода & больше обслуживания.
  2. Использование Паркет Формат файлы в качестве входных данных + DataFrames, из-за это столбчатый он будет читать только нужные столбцы, используемые в вашем GroupBy. поэтому он должен быть быстрее. - DataFrameReader

    df = spark.read.parquet('file_path') 
    df = df.groupBy('column_a', 'column_b', '...').count() 
    df.show() 
    

Благодаря Спарк Ленивый он не будет загружать файл, пока вы не вызовете методы действий как шоу/собирать/записи. Поэтому Spark будет знать, какие столбцы считываются и которые игнорируют процесс загрузки.

Смежные вопросы