2016-12-09 4 views
0

Я новичок в Spark 1.6. Я бы хотел прочитать файл паркета и обработать его. Для упрощающие предположим иметь паркет с этой структурой:Искра: прочитайте файл паркета и обработайте его

id, amount, label 

и у меня есть 3 правила:

amount < 10000 => label=LOW 
10000 < amount < 100000 => label=MEDIUM 
amount > 1000000 => label = HIGH 

Как это сделать в искре и Скале?

Я пытаюсь что-то вроде этого:

case class SampleModels(
    id: String, 
    amount: Double, 
    label: String, 
) 

val sc = SparkContext.getOrCreate() 
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._ 

val df = sqlContext.read.parquet("/path/file/") 
val ds = df.as[SampleModels].map(row=> 
    MY LOGIC 
    WRITE OUTPUT IN PARQUET 
) 

правильно ли это подход? Это эффективно? «MYLOGIC» может быть более сложным.

Thanks

ответ

1

Да, это правильный способ работы с искровым зажиганием. Если ваша логика проста, вы можете попытаться использовать встроенные функции для непосредственного использования в dataframe (например, when в вашем случае), это будет немного быстрее, чем отображение строк в класс case и выполнение кода в jvm, и вы будете способный легко сохранить результаты обратно в паркет.

+0

Это правильный ответ. В общем, предпочитают использовать операции искробезопасных данных/наборов данных. Операции передаются в Оптимизатор Catalyst Spark SQL (https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html), механизм оптимизации, который запускается под капотом, чтобы лучше выполнять ваши запросы. –

0

Да, это правильный подход. Он выполнит один проход над вашими полными данными, чтобы построить дополнительную колонку, в которой вы нуждаетесь.

Если вы хотите SQL путь, это путь,

val df = sqlContext.read.parquet("/path/file/") 
df.registerTempTable("MY_TABLE") 
val df2 = sqlContext.sql("select *, case when amount < 10000 then LOW else HIGH end as label from MY_TABLE") 

Не забывайте использовать hiveContext вместо sparkContext хотя.

0

Благодарим за ответ. я пытался реализовать первый вариант, но я получаю эту ошибку:

Error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases. 
    df.as[AmountModel].map(row => { 
    ^

я использую класс дела и импорт sqlContext.implicits._, как я писал в посте

Смежные вопросы