0

Я пытаюсь получить данные из Cassandra 3.9 с использованием SparkCassandraConnector. У меня несколько Spark (1.6) Рабочие места, в которых используются одни и те же данные. Итак, я кэшировал его, используя следующий код. enter image description hereОшибка при извлечении блоков из другого искрового узла при кэшировании Rdds

Спарк Код:

sc.parallelize(partitions, 2*sc.defaultParallelism).map(x => new Partition(x)).joinWithCassandraTable("KEYSPACE","COLUMNFAMILY").on(SomeColumns("partitionkey")).select("partitionkey", "cookie", "query").cache() 

Но некоторые из задач, получить не удалось, и выдает следующее исключение:

org.apache.spark.storage.BlockFetchException: Failed to fetch block from 1 locations. Most recent failure cause: 
     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595) 
     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585) 
     at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570) 
     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630) 
     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
    Caused by: java.io.IOException: Connection from ubuntu/172.16.0.27:56727 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     ... 1 more 

enter image description here

Где, как я не получают каких-либо исключений в то время как мы не кэшируем данные. Более того, каждый узел содержит отображение для ubuntu и ubuntu1 их соответствующим IP-адресам в своем файле-хосте.

Также, как упоминалось на скриншоте, он разделил все данные 8 парити. Разъем SparkCassandra должен был распределить задания интеллектуально, но почему Locality_Level показывает это ANY Что означает, что он не смог найти данные на одном узле, почему?

ответ

0

Эта проблема была решена в новой версии:

ошибка, которая возникает из-за неудачных выборок из NON-перетасовки блоков (например, радиопередач или кэшированных RDD блоков).

https://issues.apache.org/jira/browse/SPARK-14209 
https://issues.apache.org/jira/browse/SPARK-17484 
Смежные вопросы