2016-06-13 2 views
3

Я пытаюсь в полной мере использовать сериализацию крио для искры. УстановкаКак зарегистрировать byte [] [] с использованием сериализации kryo для искры

.set("spark.kryo.registrationRequired", "true") 

Это даст мне знать, какие классы необходимо зарегистрировать. Я зарегистрировал около 40 классов, некоторые из моих классов и некоторые классы искры. Я последовал за сообщением Require kryo serialization in Spark (Scala), чтобы зарегистрировать/настроить все.

Теперь я столкнулся с следующим и не могу понять, как зарегистрировать его в scala. Кто-нибудь решил эту проблему?

Я перепробовал кучу различных комбинаций, включая:

kryo.register(classOf[Array[Array[Byte]]]) 
conf.set("classesToRegister", "classOf[Array[Array[Byte]]]") 
conf.registerKryoClasses(Array(classOf[Array[Array[Byte]]])) 

Я нашел безответный пост https://mail-archives.apache.org/mod_mbox/spark-user/201603.mbox/%[email protected].com%3E о том, с той же проблемой.

java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: byte[][] 
Note: To register this class use: kryo.register(byte[][].class); 
Serialization trace: 
buffers (org.apache.spark.sql.columnar.CachedBatch) 
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) 
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158) 
at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) 
at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190) 
at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199) 
at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:191) 
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:480) 
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302) 
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) 
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) 
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114) 
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87) 
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101) 
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) 
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244) 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) 
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) 
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
at java.lang.Thread.run(Thread.java:745) 

ответ

7
conf.registerKryoClasses(Array(Class.forName("[[B"))) 

должен работать

+0

Отлично спасибо! –

+0

Это полезно. У меня была такая же проблема, и это помогло мне это исправить. Однако я не понимаю, как [[B соответствует байту [] []. Я знаю, что это старый пост, но если кто-то может дать более глубокое объяснение, это будет полезно. –

+0

@TimRyan См. Https://docs.oracle.com/javase/6/docs/api/java/lang/Class.html#getName%28%29 –

Смежные вопросы