Я запускаю искровое задание, и в какой-то момент я хочу подключиться к эластичному поисковому серверу, чтобы получить некоторые данные и добавить их в RDD. Таким образом, код, я использую выглядит следующим образомSpark, mapPartitions, сетевое подключение закрыто до завершения операции карты
input.mapParitions(records=>{
val elcon=new ElasticSearchConnection
val client:TransportClient=elcon.openConnection()
val newRecs=records.flatMap(record=>{
val response = client.prepareGet("index" "indexType",
record.id.toString).execute().actionGet()
val newRec=processRec(record,reponse)
newRec
})//end of flatMap
client.close()
newRecs
})//end of mapPartitions
Моя проблема заключается в том, что команда client.close()
вызывается до flatMap
завершения операции, что приводит, естественно, в виде исключения. Код работает, если я перемещаю генерацию и закрываю соединение внутри flatMap, но это создаст огромное количество соединений. Можно ли вызывать после завершения операции flatMap, что client.close
?
Ваша проблема решена? –
Спасибо за ваше предложение и помощь. Я рассмотрел альтернативу, которую вы предложили, но я звоню в другую службу, поэтому я не уверен, как я буду использовать предложенную вами инфраструктуру. В настоящее время я нашел субоптимальное обходное решение, используя цикл while вместо карты внутри mapPartitions. Хотя это, как правило, медленно, узким местом в моем случае являются сетевые вызовы, поэтому параллелизм на данном этапе не имеет решающего значения. – orestis
Это, похоже, решает эту проблему: http://stackoverflow.com/questions/36545579/spark-how-to-use-mappartition-and-create-close-connection-per-partition – Stanislav