2016-05-19 5 views
1

Я использую следующий код для хранения вывода Spark-Streaming по адресу ElasticSearch. Я хочу сопоставить вывод искрового потока с соответствующим именем i.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count). Но, как вы можете видеть, в настоящее время он отображается в ES как _1 или _2 и т. Д. Кроме того, я хочу поставить некоторый фильтр i.e (if PlatFormName = "ubuntu" then index the data) перед индексированием данных в ES. Итак, как мне это сделать?Отображение имен полей вывода из Spark-Streaming to Elastic Search

val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_) 

      val pageCounts = realTimeAgg.map  
      pageCounts.foreachRDD{ x => 
        if (x.toLocalIterator.nonEmpty) {  
         EsSpark.saveToEs(x, "spark/ElasticSearch") 
        } 
       } 

      ssc.start() 
      ssc.awaitTermination() 

Выход в ElasticSearch:

{ 
      "_index": "spark", 
      "_type": "ElasticSearch", 
      "_id": "AVTH0JPgzgtrAOUg77qq", 
      "_score": 1, 
      "_source": { 
       "_1": { 
        "_3": "Amiga", 
        "_2": "AmigaOS 1.3", 
        "_6": "SeaMonkey", 
        "_1": "Usedcar", 
        "_4": 0, 
        "_5": 0 
       }, 
       "_2": 1013 
      } 
     } 

ответ

1

Ключи упругим поиска документа являются _1, _2 и т.д., потому что вы хранения PairRDD с (Tuple6, Long) типы данных.

Чтобы сохранить ключи, вы должны использовать класс case в качестве ключа.

val realTimeAgg = lines.map{ x => (x, 1)}.reduceByKey(_+_) 

Я предполагаю, что класс объект х является классом случая, и вы хотите использовать все поля этого класса для выполнения сокращения (т.е. для проверки равенства 2 экземпляров случая класса). Если все поля этого класса не являются естественным ключом класса, который будет использоваться для равенства, то у вас есть два варианты -

  1. Override равно и хэш-код для Вашего случая класса
  2. Создайте еще один класс случая, который имеет только ключевые поля (поля, которые вы использовали в кортеже - (x.key, x.os, x.platform, x.mobile, x.browser)) и сопоставляете класс этого случая вместо Tuple в строках первой строки. map {x => ...}.

Вы можете добавить фильтр, который хотите перед тем, как писать в ElasticSearch.

pageCounts.foreachRDD { x => 
         if (x.toLocalIterator.nonEmpty) { 
          val y = x.filter(z => z._1.platform == "ubuntu")  
          EsSpark.saveToEs(y, "spark/ElasticSearch") 
        } 
       } 

PS:. Если вы проверяете пару RDD с (случай класса, Long) случай класса как ключ, как я предложил lines.map (х => (х, 1)) reduceByKey (_ + _). Существует ошибка, специально связанная с Spark Shell, что классы классов не работают как ключевые классы правильно для операций сокращения - jira issue

+0

Благодарю вас. Я выполнил ваше второе предложение. Не могли бы вы, пожалуйста, подробнее рассказать о том, что вы имеете в виду в своем первом предложении с примером, которого я не понял. Моровер, эта ошибка, похоже, не возникает, когда вы отправляете свою работу в искровой clsuter, верно? – Naresh

+0

@Naresh, в первом варианте я имел в виду переопределение методов equals & hashCode в вашем существующем классе (если требуется), как (этот поток предлагает) [http://stackoverflow.com/questions/7681183/how-can- я-определения-а-обычая-равенство-операционно-что-будет-быть использованы-на-неизменного-набор]. И да, ошибка только в искровой оболочке не при запуске на кластере. –

+0

Не могли бы вы помочь мне в этом. Я застрял здесь 'http: // stackoverflow.com/questions/39363586/issue-while-storage-data-from-spark-streaming-to-cassanadra' – Naresh

Смежные вопросы