2015-04-20 4 views
1

Я пытаюсь представить данные из искры в Кибане. Тем не менее, создание RRD с помощью следующих действий:Cassandra, Spark, Elasticsearch: потоковые данные для визуализации в кибане

val test = sc.cassandraTable("test","data") 

Затем я использовал библиотеку Elasticsearch и Hadoop для потоковой передачи в Elasticsearch со следующими:

EsSpark.saveToEs(test, "spark/docs", Map("es.nodes" -> "192.168.1.88")) 

, но я получаю эту ошибку:

15/04/20 16:15:27 ERROR TaskSetManager: Task 0 in stage 12.0 failed 4 times; aborting job 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 36, 192.168.1.92): org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: Cannot handle type [class com.datastax.spark.connector.CassandraRow] 

Может ли кто-нибудь привести меня к потоку от искры до Elasticsearch. Есть ли лучший способ визуализации данных из cassandra, solr или spark. Я столкнулся с бананом, но у него нет возможности опубликовать дашаборды.

Благодаря

ответ

1

По Spark Cassandra Connector Guide, вы можете сначала определить класс случай, а затем преобразовать CassandraRow к объектам случай класса, затем сохраните объекты Elasticsearch. Ниже приведен пример кода из руководства:

case class WordCount(w: String, c: Int) 

object WordCount { 
    implicit object Mapper extends DefaultColumnMapper[WordCount](
     Map("w" -> "word", "c" -> "count")) 
} 

sc.cassandraTable[WordCount]("test", "words").toArray 
// Array(WordCount(bar,20), WordCount(foo,10)) 

sc.parallelize(Seq(WordCount("baz", 30), WordCount("foobar", 40))) 
    .saveToCassandra("test", "words", SomeColumns("word", "count")) 
Смежные вопросы