2015-08-17 2 views
0

Я пытался выполнить программу SCALA, а выход как-то всегда кажется, что-то вроде этого:OutOfMemoryError: пространство кучи Java и памяти переменных в Спарк

15/08/17 14:13:14 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext 
java.lang.OutOfMemoryError: Java heap space 
at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:64) 
at java.lang.StringBuilder.<init>(StringBuilder.java:97) 
at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:339) 
at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83) 
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2344) 
at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32) 
at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44) 
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143) 
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143) 
at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:169) 
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34) 
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56) 
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37) 
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79) 
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215) 
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) 

или как это

15/08/19 11:45:11 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext 
java.lang.OutOfMemoryError: GC overhead limit exceeded 
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:526) 
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:505) 
    at com.fasterxml.jackson.databind.ObjectMapper._serializerProvider(ObjectMapper.java:2846) 
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902) 
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280) 
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7) 
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128) 
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902) 
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7) 
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128) 
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902) 
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280) 
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7) 
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128) 
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902) 
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7) 
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128) 
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902) 
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280) 
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22) 
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7) 
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128) 
    at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:2881) 

Являются ли эти ошибки на стороне водителя или исполнителя?

Я немного смущен переменными памяти, которые использует Spark. Мои текущие настройки

spark-env.sh

export SPARK_WORKER_MEMORY=6G 
export SPARK_DRIVER_MEMORY=6G 
export SPARK_EXECUTOR_MEMORY=4G 

искрового defaults.conf

# spark.driver.memory    6G 
# spark.executor.memory   4G 
# spark.executor.extraJavaOptions ' -Xms5G -Xmx5G ' 
# spark.driver.extraJavaOptions ' -Xms5G -Xmx5G ' 

ли мне нужно раскомментировать любого из переменных, содержащихся в искровом defaults.conf, или они избыточны?

Есть, например, установка SPARK_WORKER_MEMORY, эквивалентная установке spark.executor.memory?

Часть моего кода, где лестницу он останавливается после нескольких итераций:

val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct.collect 
    for (id <- filteredNodesGroups){ 
     val clusterGraph = connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id) 
     val pagerankGraph = clusterGraph.pageRank(0.15) 
     val completeClusterPagerankGraph = clusterGraph.outerJoinVertices(pagerankGraph.vertices) { 
      case (uid, attrList, Some(pr)) => 
       attrList :+ ("inClusterPagerank:" + pr) 
      case (uid, attrList, None) => 
       attrList :+ "" 
     } 
     val sortedClusterNodes = completeClusterPagerankGraph.vertices.toArray.sortBy(_._2(pagerankIndex + 1)) 
     println(sortedClusterNodes(0)._2(1) + " with rank: " + sortedClusterNodes(0)._2(pagerankIndex + 1)) 

    }   

Многие вопросы, замаскированные как единое целое. Заранее спасибо!

+0

Это действительно зависит от того, чего вы пытаетесь достичь, не могли бы вы предоставить какой-то код для иллюстрации? –

+0

Просто добавьте, что мастер часто умирает после неудачного выполнения. – sofia

+0

Я обновил исходное сообщение, чтобы включить код – sofia

ответ

0

Я не эксперт Спарк, но есть линия, которая кажется мне подозрительным:

val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct.collect 

В принципе, используя оплаченный метод, вы получаете обратно все данные из исполнителей (еще до того, обрабатывая его) водителю. У вас есть представление о размере этих данных?

Чтобы исправить это, вы должны действовать более функционально. Для того, чтобы извлечь различные значения, можно, например, использовать GroupBy и карту:

val pairs = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) } 
pairs.groupBy(_./* the property to group on */) 
    .map { case (_, arrays) => /* map function */ } 

Относно сборные, должна быть способом сортировки каждого раздела, а затем вернуть (обрабатывается) приводят к водителю. Я хотел бы помочь вам больше, но мне нужно больше информации о том, что вы пытаетесь сделать.

UPDATE

Порывшись немного, вы можете сортировать ваши данные с помощью перетасовки, как описано here

UPDATE

До сих пор я пытался избежать собирать, и как можно больше получить данные обратно в драйвер, но я не знаю, как это можно решить:

val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct() 
val clusterGraphs = filteredNodesGroups.map { id => connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id) } 
val pageRankGraphs = clusterGraphs.map(_.pageRank(0.15)) 

В принципе, вам нужно присоединиться к двум RDD [Graph [Array [String], String]], но я не знаю, какой ключ использовать, а во-вторых, это обязательно вернет RDD RDD (я не знаю если вы даже можете это сделать). Я попытаюсь найти что-то позже в этот день.

+0

Я действительно собираю группы в начале. 'filtersNodesGroups' содержит целые числа, каждый из которых представляет собой узел, поэтому итерация (которая происходит локально) вычисляет подграф, а затем - индекс на каждом подграфе. – sofia

+0

Идея проста: мне нужно разбить график на меньшие графики (в соответствии с одним из атрибутов, который содержит каждый узел, он переходит к конкретному подграфу). Затем мне нужно вычислить pagerank на каждом из подграфов и напечатать узел с самым большим pagerank. Большинство подграфов имеют длину 2-20 узлов и один подграф, содержащий около 13 000 узлов и несколько тысяч ребер. Однако вычисление останавливается обычно до достижения этого подграфа. Поэтому я не могу оправдать все эти расходы на память. Есть идеи? – sofia

+0

Это полный код, внутри 'Защита clusterElements (connCompGraph: График [Array [String], String], pagerankIndex: Int) = {... приведенный выше код ...}' Действительно, connCompGraph является a График. – sofia