2015-05-15 2 views
1

Я использую Spark 1.3.1 и пытаюсь сохранить RDD для mongodb, используя версию mongo-hadoop connector версии 1.3.2 и версию mongo-java версии 3.0.1. Когда я запускаю приложение ниже в отдельном кластере, драйвер помечен как FAILURE.RDD частично написан на mongo

Вот код, я использую, чтобы воспроизвести проблему,

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 

import org.apache.hadoop.conf.Configuration 
import org.apache.spark.rdd.RDD 

import org.bson.BasicBSONObject 
import org.bson.BSONObject 

object TestApp { 

    def testSaveRddToMongo() { 
    val sparkConf = new SparkConf().setAppName("Test") 
    val sc = new SparkContext(sparkConf) 

    val mongoConfig = new Configuration() 
    mongoConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat") 
    mongoConfig.set("mongo.input.uri", "mongodb://some.local.ip:27017/mydb.input") 

    val bsonRDD: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(mongoConfig, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject]) 

    val reasons: RDD[String] = bsonRDD.map(tuple => { 
     tuple._2.asInstanceOf[BasicBSONObject].getString("fieldName").trim 
     }).distinct().cache() 

    val out: RDD[(String,Int)] = reasons.zipWithIndex().map { case (k,v) => (k,v.toInt)} 

    println (s"Saving ${out.count} elements") 
    val outputConfig = new Configuration() 
    outputConfig.set("mongo.job.output.format","com.mongodb.hadoop.MongoOutputFormat") 
    outputConfig.set("mongo.output.uri", "mongodb://some.local.ip:27017/mydb.garbage") 
    out.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], outputConfig) 
    } 

    def main(args: Array[String]) { 
    testSaveRddToMongo() 
    } 
} 

В STDERR от водителя, я вижу это

15/05/15 14:18:43 INFO DAGScheduler: Job 2 failed: saveAsNewAPIHadoopFile at Test.scala:39, took 6.491961 s 
    Exception in thread "main" java.lang.reflect.InvocationTargetException 
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
      at java.lang.reflect.Method.invoke(Method.java:497) 
      at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:59) 
      at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) 
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 5.0 failed 4 times, most recent failure: Lost task 3.3 in stage 5.0 (TID 275, largo-ubuntu): 
java.lang.IllegalStateException: The pool is closed 
      at com.mongodb.internal.connection.ConcurrentPool.get(ConcurrentPool.java:123) 
      at com.mongodb.connection.DefaultConnectionPool.getPooledConnection(DefaultConnectionPool.java:243) 
      at com.mongodb.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:90) 
      at com.mongodb.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:80) 
      at com.mongodb.connection.DefaultServer.getConnection(DefaultServer.java:69) 
      at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.getConnection(ClusterBinding.java:86) 
      at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:184) 
      at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:177) 
      at com.mongodb.operation.BaseWriteOperation.execute(BaseWriteOperation.java:106) 
      at com.mongodb.operation.BaseWriteOperation.execute(BaseWriteOperation.java:58) 
      at com.mongodb.Mongo.execute(Mongo.java:745) 
      at com.mongodb.Mongo$2.execute(Mongo.java:728) 
      at com.mongodb.DBCollection.executeWriteOperation(DBCollection.java:327) 
      at com.mongodb.DBCollection.replaceOrInsert(DBCollection.java:405) 
      at com.mongodb.DBCollection.save(DBCollection.java:394) 
      at com.mongodb.DBCollection.save(DBCollection.java:367) 
      at com.mongodb.hadoop.output.MongoRecordWriter.write(MongoRecordWriter.java:105) 
      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000) 
      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979) 
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
      at org.apache.spark.scheduler.Task.run(Task.scala:64) 
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
      at java.lang.Thread.run(Thread.java:745) 

    Driver stacktrace: 
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) 
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
      at scala.Option.foreach(Option.scala:236) 
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) 
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) 
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

Почему соединение закрыто? Было ли исключение в другом месте, которое я не вижу?

FIX

Согласно maasg ниже, используя Casbah для записи результатов работы. Я обновил код следующим образом,

import com.mongodb.casbah.Imports._ 
... 
    println (s"Saving ${out.count} elements") 
    val uri = MongoClientURI("mongodb://some.local.ip:27017/mydb.garbage") 
    val mongoClient = MongoClient(uri) 
    val collection = mongoClient(uri.database.get)(uri.collection.get) 
    collection.drop() 
    val builder = collection.initializeUnorderedBulkOperation 
    for ((value, index) <- out.collect()) { builder.insert(MongoDBObject(("_id" -> value), ("value" -> index))) } 
    builder.execute() 

ЛУЧШЕ FIX

Вот лучший вариант, который будет делать одну пакетную запись на раздел

... 
    def dropCollection(uriString: String) { 
    val uri = MongoClientURI(uriString) 
    val mongoClient = MongoClient(uri) 
    val collection = mongoClient(uri.database.get)(uri.collection.get) 
    mongoClient.close() 
    } 

    def saveReultsToMongo(out: RDD[(String,Int)], uriString: String) { 
    out.foreachPartition(itr => { 
     val uri = MongoClientURI(uriString) 
     val mongoClient = MongoClient(uri) 
     val collection = mongoClient(uri.database.get)(uri.collection.get) 
     val builder = collection.initializeUnorderedBulkOperation 
     for ((value, index) <- itr){ builder.insert(MongoDBObject(("_id" -> value), ("value" -> index))) } 
     builder.execute 
     mongoClient.close 
     }) 
    } 
... 
    println (s"Saving ${out.count} elements") 
    dropCollection("mongodb://10.22.128.84:27017/Minerva.garbage") 
    saveReultsToMongo(out, "mongodb://10.22.128.84:27017/Minerva.garbage") 

Пара отмечает,

  • out.foreach{ case (value, index) => builder.insert(MongoDBObject(("_id" -> value), ("value" -> index))) } не работает, потому что BulkWriteOperation является не сериализуемым
    • однако, out.foreachPartition могут быть использованы в соответствии с maasg и ЛУЧШЕГО FIX
  • Casbah 1.8.1 не совместим с 3.0.x Монго-Java-драйвер использует 2.13.1
+0

У исполнителей есть свои отдельные журналы. Вы должны определенно взглянуть туда, но неясно, найдете ли вы более информативную трассировку стека. –

+0

Я проверил журналы, здесь: 'spark/work/app-20150515141801-0008 $ cat */stderr | less' Но, я не вижу никакой дополнительной информации. – Russell

+0

@Russell использует 'rdd.foreachPartition {...}'. вместо 'foreach' .. Вы также должны установить и закрыть соединение db в пределах закрытия foreachPartition. Затем он будет выполняться параллельно на каждом узле, содержащем раздел этого rdd. – maasg

ответ

2

До версии 1.4 отсутствует соединитель hadoop-mongo ненадежный, работающий с Spark. Высокая параллельная загрузка приведет к утечке клиентских соединений, что приведет к сбою. В нашем случае эта ошибка была критической точкой: https://jira.mongodb.org/browse/HADOOP-143 Как вы можете видеть, она объединена в 1.4 rel.

В качестве обходного пути я могу порекомендовать использовать операционные операции casbah client + (оболочка scala вокруг клиента Java).

Смежные вопросы