2016-05-06 4 views
1

Я запускаю искровое задание, и в какой-то момент я хочу подключиться к эластичному поисковому серверу, чтобы получить некоторые данные и добавить их в RDD. Таким образом, код, я использую выглядит следующим образомSpark, mapPartitions, сетевое подключение закрыто до завершения операции карты

input.mapParitions(records=>{ 
    val elcon=new ElasticSearchConnection 
    val client:TransportClient=elcon.openConnection() 
val newRecs=records.flatMap(record=>{ 
     val response = client.prepareGet("index" "indexType", 
     record.id.toString).execute().actionGet() 
     val newRec=processRec(record,reponse) 
     newRec 
    })//end of flatMap 
    client.close() 
    newRecs 
})//end of mapPartitions 

Моя проблема заключается в том, что команда client.close() вызывается до flatMap завершения операции, что приводит, естественно, в виде исключения. Код работает, если я перемещаю генерацию и закрываю соединение внутри flatMap, но это создаст огромное количество соединений. Можно ли вызывать после завершения операции flatMap, что client.close?

+0

Ваша проблема решена? –

+0

Спасибо за ваше предложение и помощь. Я рассмотрел альтернативу, которую вы предложили, но я звоню в другую службу, поэтому я не уверен, как я буду использовать предложенную вами инфраструктуру. В настоящее время я нашел субоптимальное обходное решение, используя цикл while вместо карты внутри mapPartitions. Хотя это, как правило, медленно, узким местом в моем случае являются сетевые вызовы, поэтому параллелизм на данном этапе не имеет решающего значения. – orestis

+0

Это, похоже, решает эту проблему: http://stackoverflow.com/questions/36545579/spark-how-to-use-mappartition-and-create-close-connection-per-partition – Stanislav

ответ

0

Выполнение блокирующего вызова для каждого элемента в вашем RDD для получения соответствующего документа ElasticSearch вызывает проблему. Обычно рекомендуется избегать блокировки вызовов.

Существует еще один альтернативный подход, используя ElasticSearch-for-Hadoop's Spark support.

Прочтите индекс ElasticSearch/введите в качестве другого RDD и присоедините его к вашему RDD.

Включите нужную версию ESHadoop dependency.

import org.elasticsearch.spark._ 
val esRdd = sc.esRDD("index/indexType") //This returns a pair RDD of (_id, Map of all key value pairs for all fields] 
input.map(record => (record.id, record)) //Convert your RDD of records to a pair rdd of (id, record) as we want to join based on the id 
input.join(esRdd).map(rec => processResponse(rec._2._1, rec._2._2)) // Join the two RDDs based on id column it returns a pair RDD with key=id & value=Pair of matching records (id,(inputrddrecord,esrddrecord)) 

Надеюсь, это поможет.

PS: Это все равно не уменьшит проблему отсутствия совместного размещения. (то есть каждый документ с _id будет поступать из разных осколков индекса). Лучшим подходом было бы обеспечить совместное размещение входных данных RDD и документов индекса ES во время создания индекса ES.

Смежные вопросы