2016-02-01 3 views
0

Я пытаюсь запустить простой искровой потоковой работы, написанные на Python:Спарк Streaming: java.lang.OutOfMemoryError: Java куча пространства

#!/usr/bin/env python 
from pyspark import SparkContext, SparkConf 
from pyspark.streaming import StreamingContext 

conf = SparkConf() 
conf.setMaster("spark://master1:7077,master2:7077") 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc, 1) 

ssc.socketTextStream("master1", 9999).count().pprint() 

ssc.start() 
ssc.awaitTermination() 

Через пару секунд работает, задача не выполнена. Вот исключение, которое я вижу:

java.lang.OutOfMemoryError: Java heap space 
    at java.util.Arrays.copyOf(Arrays.java:3236) 
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) 
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) 
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) 
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) 
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135) 
    at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:420) 
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:326) 
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153) 
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146) 
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158) 
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) 
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190) 
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199) 
    at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132) 
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:169) 
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:143) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791) 
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) 
    at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:77) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:156) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:127) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$3.onPushBlock(ReceiverSupervisorImpl.scala:108) 
    at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:294) 
    at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:266) 
    at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:108) 

После этого начинается новая задача, поэтому работа продолжает работать. Однако я хотел бы знать, чего мне не хватает.

UPDATE

искровым defaults.conf

spark.serializer     org.apache.spark.serializer.KryoSerializer 
spark.driver.memory    4g 
spark.executor.memory   4g 
spark.executor.extraJavaOptions -XX:+PrintGCDetails 
spark.deploy.recoveryMode  ZOOKEEPER 
spark.deploy.zookeeper.url  master1:2181,master2:2181,master3:2181 
+0

Где вы видите это? Водитель или Исполнитель? Похоже, вам нужно увеличить память ваших исполнителей. Также расскажите о своих конфигурациях кластера. – Sumit

+0

Я вижу это исключение у исполнителей. Каждый исполнитель имеет 4 ГБ ОЗУ. Я обновил вопрос, разместив свой параметр spark-defaults.conf – facha

+0

. Каков размер/тип данных, полученных потоками в каждой партии? Если вы захватили GC-журналы, это тоже сообщение. ваша программа проста, но кажется, что скорость полученных данных слишком высока. Вы видите что-нибудь в Spark-UI, как задача Backlog и т. Д. – Sumit

ответ

0

Попробуйте установить ИСПОЛНИТЕЛЬ память on the application itself:

conf = SparkConf() 
conf.setMaster("spark://master1:7077,master2:7077") 
conf.set("spark.executor.memory", "4g") 
Смежные вопросы