2016-10-14 2 views
6

Я новичок в Scala и Spark, поэтому, надеюсь, кто-то может сообщить мне, где я здесь не так.Spark «CodeGenerator: не удалось скомпилировать» с Dataset.groupByKey

У меня есть трехколоночный набор данных (id, name, year), и я хочу найти последний год для каждого имени. Другими словами:

BEFORE           AFTER 
| id_1 | name_1 | 2015 |      | id_2 | name_1 | 2016 | 
| id_2 | name_1 | 2016 |      | id_4 | name_2 | 2015 | 
| id_3 | name_1 | 2014 | 
| id_4 | name_2 | 2015 | 
| id_5 | name_2 | 2014 | 

Я думал groupByKey и reduceGroups бы получить работу:

val latestYears = ds 
    .groupByKey(_.name) 
    .reduceGroups((left, right) => if (left.year > right.year) left else right) 
    .map(group => group._2) 

Но это дает эту ошибку, и выкладывает много сгенерированного кода Java:

ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: 
File 'generated.java', Line 21, Column 101: Unknown variable or type "value4" 

Интересно, если я создаю набор данных с только столбцами имени и года, он будет работать так, как ожидалось.


Вот полный код, который я бегу:

object App { 

    case class Record(id: String, name: String, year: Int) 

    def main(args: Array[String]) { 
    val spark = SparkSession.builder().master("local").appName("test").getOrCreate() 
    import spark.implicits._ 

    val ds = spark.createDataset[String](Seq(
     "id_1,name_1,2015", 
     "id_2,name_1,2016", 
     "id_3,name_1,2014", 
     "id_4,name_2,2015", 
     "id_5,name_2,2014" 
    )) 
     .map(line => { 
     val fields = line.split(",") 
     new Record(fields(0), fields(1), fields(2).toInt) 
     }) 

    val latestYears = ds 
     .groupByKey(_.name) 
     .reduceGroups((left, right) => if (left.year > right.year) left else right) 
     .map(group => group._2) 

    latestYears.show() 
    } 


} 

EDIT: Я считаю, что это может быть ошибка с искровым v2.0.1. После понижения до версии 2.0, этого больше не происходит.

+0

Одинаковая проблема здесь, я работал над проблемой, преобразовывая reduceGroups(). Map (_._ 2) в mapGroups (_. Reduce (_._ 2)). Вы уже сообщали об этой проблеме в список рассылки/отправителя ошибок? –

+0

Это может быть ошибка, но больше _concerned_ с самим кодом. Почему вы не используете 'groupBy' и' max' в 'year'? Однако он использует нетипизированный API DataFrame (а не набор данных). Любая конкретная причина? –

ответ

0

Ваши groupBy и reduceGroups функции: experimental. Почему бы не использовать reduceByKey (api)?

Плюсы:

  • Это должно быть легко перевести из кода у вас есть.
  • Он более стабильный (не экспериментальный).
  • Он должен быть более эффективным, поскольку он не требует полного перетасовки всех элементов в каждой группе (что также может привести к замедлению работы сети и переполнению памяти в узле).