
Spark RDD blocks removed before being used

I am using a Future to perform a blocking operation on an RDD, like this:

dStreams.foreach(_.foreachRDD { rdd => 

    Future{ writeRDD(rdd) } 

}) 

Sometimes I get this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[820] at actorStream at Tests.scala:149 after its blocks have been removed! 

It looks like Spark does not know when this RDD should be removed.

Why does this happen, and what is the solution?

Update:

I think the RDDs may be getting cleaned up before they are used. The only working solution so far involves setting

conf.set("spark.streaming.unpersist", "false") 

and calling unpersist() manually.
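
For context, here is a rough sketch of that workaround as I understand it. The config key is a real Spark Streaming setting; dStreams and writeRDD are the names from my snippet above, and calling unpersist() on Future completion is just one way to do the manual cleanup:

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global 

// Keep Spark Streaming from unpersisting batch RDDs automatically, 
// so their blocks are still around when the async write runs. 
conf.set("spark.streaming.unpersist", "false") 

dStreams.foreach(_.foreachRDD { rdd => 
    val write = Future { writeRDD(rdd) } 
    // Release the batch manually once the write has finished. 
    write.onComplete(_ => rdd.unpersist()) 
}) 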

Full stack trace in case this is a bug:

15/10/12 23:57:23 ERROR org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation: Aborting job. 
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[765] at actorStream at NxCoreSparkTests.scala:168 after its blocks have been removed! 
org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83) 
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251) 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251) 
scala.Option.getOrElse(Option.scala:120) 
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:250) 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1394) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404) 
scala.collection.immutable.List.foreach(List.scala:318) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402) 
scala.collection.immutable.List.foreach(List.scala:318) 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404) 
scala.collection.immutable.List.foreach(List.scala:318) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402) 
scala.collection.immutable.List.foreach(List.scala:318) 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402) 
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1368) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:829) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:827) 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
scala.collection.Iterator$class.foreach(Iterator.scala:727) 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
scala.collection.AbstractTraversable.map(Traversable.scala:105) 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:827) 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772) 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757) 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463) 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:836) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772) 
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:927) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:927) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304) 
    at vscan.NxCoreSparkDbUtil$.writeToParquetByDay(NxCoreSparkTapeReader.scala:210) 
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(NxCoreSparkTests.scala:190) 
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(NxCoreSparkTests.scala:188) 
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(NxCoreSparkTests.scala:188) 
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:217) 
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:219) 
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:219) 
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:219) 
    at vscan.NxCoreSparkGoogleHDFS$.retry(NxCoreSparkTests.scala:219) 
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1.apply$mcV$sp(NxCoreSparkTests.scala:188) 
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1.apply(NxCoreSparkTests.scala:185) 
    at vscan.NxCoreSparkGoogleHDFS$$anonfun$6$$anonfun$apply$3$$anonfun$1.apply(NxCoreSparkTests.scala:185) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[765] at actorStream at NxCoreSparkTests.scala:168 after its blocks have been removed! 
    at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83) 
    at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56) 
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251) 
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:250) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1394) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402) 
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1368) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:829) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:827) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:827) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772) 
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
15/10/12 23:57:24 ERROR org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer: Job job_201510122357_0000 aborted. 

I think you shouldn't use async ops there, IMHO. Even if your disk writes block, they shouldn't block the streaming of data or its processing. – mehmetminanc


mehmetminanc is right. @BAR: Can you share the full stack trace? – tuxdna


@mehmetminanc It definitely blocks for me in local mode, with more than enough cores and parallelism. The next write does not start until the previous write finishes. When I add the Future, the writes run concurrently. Maybe you can suggest a different approach? – BAR

Answer


I think the problem is that before the code inside writeRDD(rdd) runs (since it is in a Future), the RDD (the micro-batch RDD) has already been cleared by Apache Spark's memory management / the BlockManager.

Hence this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[820] at actorStream at Tests.scala:149 after its blocks have been removed!

You can fix this by first collecting the micro-batch and then passing the collection to your write function. Something like this:

dStreams.foreach(_.foreachRDD { rdd => 

    val coll = rdd.collect() 
    Future{ writeCollection(coll) } 

}) 
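
Not part of the fix above, but following the same reasoning about the race: another option is to keep the Future and simply wait for it before foreachRDD returns, so Spark Streaming cannot clean up the batch's blocks while writeRDD is still reading them. This gives up some of the concurrency you wanted, and the timeout below is arbitrary:

import scala.concurrent.{Await, Future} 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits.global 

dStreams.foreach(_.foreachRDD { rdd => 
    val write = Future { writeRDD(rdd) } 
    // Other per-batch work could overlap with the write here. 
    // Then block until the write is done, so the batch is not unpersisted mid-write. 
    Await.result(write, 10.minutes) 
}) 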

Of course, 'collect' will create a collection, NOT an RDD. Where is the error you are getting now? – tuxdna


Same error. Nothing changed, which is surprising, because the collect definitely happens outside the Future. Perhaps the collection is lazily evaluated, so something needs to be done with it to execute its DAG. Might be as simple as using 'coll' right after it is declared. – BAR
