Я использую следующий код для хранения вывода Spark-Streaming
по адресу ElasticSearch
. Я хочу сопоставить вывод искрового потока с соответствующим именем i.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count)
. Но, как вы можете видеть, в настоящее время он отображается в ES как _1 или _2 и т. Д. Кроме того, я хочу поставить некоторый фильтр i.e (if PlatFormName = "ubuntu" then index the data)
перед индексированием данных в ES. Итак, как мне это сделать?Отображение имен полей вывода из Spark-Streaming to Elastic Search
val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_)
val pageCounts = realTimeAgg.map
pageCounts.foreachRDD{ x =>
if (x.toLocalIterator.nonEmpty) {
EsSpark.saveToEs(x, "spark/ElasticSearch")
}
}
ssc.start()
ssc.awaitTermination()
Выход в ElasticSearch:
{
"_index": "spark",
"_type": "ElasticSearch",
"_id": "AVTH0JPgzgtrAOUg77qq",
"_score": 1,
"_source": {
"_1": {
"_3": "Amiga",
"_2": "AmigaOS 1.3",
"_6": "SeaMonkey",
"_1": "Usedcar",
"_4": 0,
"_5": 0
},
"_2": 1013
}
}
Благодарю вас. Я выполнил ваше второе предложение. Не могли бы вы, пожалуйста, подробнее рассказать о том, что вы имеете в виду в своем первом предложении с примером, которого я не понял. Моровер, эта ошибка, похоже, не возникает, когда вы отправляете свою работу в искровой clsuter, верно? – Naresh
@Naresh, в первом варианте я имел в виду переопределение методов equals & hashCode в вашем существующем классе (если требуется), как (этот поток предлагает) [http://stackoverflow.com/questions/7681183/how-can- я-определения-а-обычая-равенство-операционно-что-будет-быть использованы-на-неизменного-набор]. И да, ошибка только в искровой оболочке не при запуске на кластере. –
Не могли бы вы помочь мне в этом. Я застрял здесь 'http: // stackoverflow.com/questions/39363586/issue-while-storage-data-from-spark-streaming-to-cassanadra' – Naresh