2017-01-06 2 views
1

Sparks2/Java8 Cassandra2 Пытается прочитать некоторые данные из Cassandra, а затем запустить группу по запросу в искрых. У меня есть только две колонки в моей DF transdate (Дата), происхождения (String)Проблема с отображением Spark Sql

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY (origin,transdate) ORDER BY cnt DESC LIMIT 1"); ` 

Get Error:

`Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'origins.`origin`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value)` 

Группа по выпускам удалось решить удаление() в группе, как показано ниже

Полный код: (пытается получить максимальное количество транс на дату для происхождения/место)

JavaRDD<TransByDate> originDateRDD = javaFunctions(sc).cassandraTable("trans", "trans_by_date", CassandraJavaUtil.mapRowTo(TransByDate.class)) 
        .select(CassandraJavaUtil.column("origin"), CassandraJavaUtil.column("trans_date").as("transdate")).limit((long)100) ; 
Dataset<Row> originDF = sparks.createDataFrame(originDateRDD, TransByDate.class); 
String[] columns = originDF.columns(); 
System.out.println("originDF columns: "+columns[0]+" "+columns[1]) ; -> transdate origin 
originDF.createOrReplaceTempView("origins"); 

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY origin,transdate ORDER BY cnt DESC LIMIT 1"); 
List list = maxOrigindate.collectAsList(); -> Exception here 
int j = list.size(); 

originDF столбцы: transdate происхождения

`public static class TransByDate implements Serializable { 
     private String origin; 
     private Date transdate; 

     public TransByDate() { } 

     public TransByDate (String origin, Date transdate) { 
      this.origin = origin; 
      this.transdate= transdate; 

     } 

     public String getOrigin() { return origin; } 
     public void setOrigin(String origin) { this.origin = origin; } 

     public Date getTransdate() { return transdate; } 
     public void setTransdate(Date transdate) { this.transdate = transdate; } 

    } 

схемы

root 
|-- transdate: struct (nullable = true) 
| |-- date: integer (nullable = false) 
| |-- day: integer (nullable = false) 
| |-- hours: integer (nullable = false) 
| |-- minutes: integer (nullable = false) 
| |-- month: integer (nullable = false) 
| |-- seconds: integer (nullable = false) 
| |-- time: long (nullable = false) 
| |-- timezoneOffset: integer (nullable = false) 
| |-- year: integer (nullable = false) 
|-- origin: string (nullable = true) 

Исключение ОШИБКА Исполнитель: Исключение в задаче 0.0 в стадии 2.0 (TID 12) scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (из класса java.util.Date) по адресу org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (C atalystTypeConverters.scala: 256) на org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 251) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst (CatalystTypeConverters. scala: 103) .... .... Исключение в потоке «main» org.apache.spark.SparkException: Иск прерывается из-за срыва этапа: Задача 0 на этапе 2.0 не удалась 1 раз, последний сбой: потерянная задача 0.0 на этапе 2.0 (TID 12, localhost): scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (из класса java.util.Date) at org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters. scala: 256) ... Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler .org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1454) at org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1442) at org. apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1441) ... at org.apache.spark.sql.Dataset $$ anonfun $ collectAsList $ 1.apply (Dataset.scala: 2184) на org.apache.spark.sql.Dataset.withCallback (Dataset.scala: 2559) на org.apache.spark.sql.Dataset.collectAsList (Dataset.scala: 2184) в spark.SparkTest.sqlMaxCount (SparkTest.java:244) -> Список list = maxOrigindate.collectAsList();

Вызванный: scala.MatchError: Вс янв 01 00:00:00 PST 2012 (из класса java.util.Date) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter .toCatalystImpl (CatalystTypeConverters.scala: 256) at org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.Скала: 251)

+1

Просто удалите скобку из группы по выражению i.e. по группам, переведите –

+0

@Rajat - thx, прошло мимо этой ошибки, но в следующей строке 'List list = maxOrigindate.collectAsList(); 'целая куча исключений:' scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (из класса java.util.Date) \t at org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters .scala: 256) ..... 'Я думал, что HellowWorld должен был быть легким, но –

+0

Добавлена ​​информация о деталях, строка запроса теперь не дает исключений, но следующая строка collectAsList() –

ответ

1

У вас появляется ошибка.

Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at 

Эта ошибка происходит потому, что Спарк SQL поддерживает java.sql.Date типа. Пожалуйста, проверьте документацию Spark here. Вы также можете обратиться к SPARK-2562.

+0

Да, это работает. ** Изменен тип данных в классе до java.sql.Date ** –

+0

Только проблема, остающаяся открытой в длинной саге, представляет собой sql [основная проблема] [1] [1]: http://stackoverflow.com/questions/41473949/искровой SQL-запросы не может/41474803 # 41474803 –

1

изменить запрос к

Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, 
transdate, 
COUNT(*) AS cnt FROM origins GROUP BY origin,transdate 
ORDER BY cnt DESC LIMIT 1"); 

это будет работать.

Смежные вопросы