2016-10-26 2 views
2

Я пытаюсь использовать подключенные компоненты, но проблема с масштабированием. Мой Вот что у меня есть -Spark - GraphX ​​- масштабирование связанных компонентов

// get vertices 
val vertices = stage_2.flatMap(x => GraphUtil.getVertices(x)).cache 

// get edges 
val edges = stage_2.map(x => GraphUtil.getEdges(x)).filter(_ != null).flatMap(x => x).cache 

// create graph 
val identityGraph = Graph(vertices, edges) 

// get connected components 
val cc = identityGraph.connectedComponents.vertices 

Где, GraphUtil имеет вспомогательные функции для возврата вершин и ребер. На этом этапе мой график имеет ~ 1 миллион узлов и ~ 2 миллиона ребер (кстати, ожидается, что он вырастет до 100 миллионов узлов). Мой график довольно редко связан, поэтому я ожидаю множество небольших графиков.

Когда я запускаю вышеуказанное, я продолжаю получать java.lang.OutOfMemoryError: Java heap space. Я пробовал с executor-memory 32g и запускал кластер из 15 узлов с 45 г в качестве размера контейнера пряжи.

Вот деталь исключение:

16/10/26 10:32:26 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext 
java.lang.OutOfMemoryError: Java heap space 
    at java.util.Arrays.copyOfRange(Arrays.java:2694) 
    at java.lang.String.<init>(String.java:203) 
    at java.lang.StringBuilder.toString(StringBuilder.java:405) 
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:360) 
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:98) 
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2216) 
    at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32) 
    at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44) 
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146) 
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146) 
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:173) 
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34) 
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55) 
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64) 
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) 

Кроме того, я получаю много из следующих журналов:

16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 320 is 263 bytes 
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 321 is 268 bytes 
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 322 is 264 bytes 

Мой вопрос кто-нибудь пробовал ConnectedComponents в таком масштабе? Если да, что я делаю неправильно?

ответ

1

Алгоритм подключенного компонента не очень хорошо масштабируется, и его производительность сильно зависит от топологии вашего графика. Сокращение краев не означает, что у вас небольшие компоненты. Длинная строка ребер очень разрежена (количество ребер = число вершин - 1), но алгоритм грубой силы, реализованный в GraphX, не будет очень эффективным (см. Источник cc и pregel).

Вот что вы можете попробовать (сортируют, только код):

  1. Checkpoint ваши вершины и ребра паркетом (на диске), а затем загрузить их снова, чтобы построить график вы. Кэширование иногда просто не сокращает его, когда план выполнения слишком велик.
  2. Преобразуйте свой график таким образом, чтобы результат алгоритма оставался неизменным. Например, вы можете видеть в code, что алго распространяет информацию в обоих направлениях (как и должно быть, по умолчанию). Поэтому, если у вас несколько ребер, соединяющих одни и те же две вершины, отфильтруйте их из своего графика, на котором вы применяете альго.
  3. Оптимизировать Graphx код самостоятельно (это действительно довольно просто), используя либо общую память сохранения оптимизации (т.е. контрольной точки на диске при каждой итерации, чтобы избежать OOM), или домен конкретных оптимизации (подобно точке 2)

Если вы согласны оставить GraphX ​​(который становится несколько устаревшим) позади, вы можете рассмотреть GraphFrames (package, blog ). Я никогда не пробовал, поэтому я не знаю, есть ли у него CC.

Я уверен, вы можете найти другие возможности среди искровых пакетов, но, возможно, вы даже захотите использовать что-то вне Spark. Но это выходит за рамки вопроса.

Удачи!

+1

GraphFrames использует DataFrames и GraphX ​​под капотом, поэтому я не понимаю, как это поможет OP. – eliasah

+0

@eliasah Надеюсь, GraphFrames будет более оптимизирован, чем GraphX. Тот факт, что они работают на DataFrames, является хорошим признаком, потому что тогда они могут использовать оптимизатор катализатора и вольфрам. Как я уже сказал, я не пытался, я просто надежен. – Wilmerton

+0

GraphX ​​- это проект, над которым никто не хочет работать, потому что теория базового графика трудно масштабировать. К сожалению, это «почти» мертвый проект. Я не думаю, что GraphFrames будет намного дальше, чем GraphX. – eliasah

0

Как я писал выше в комментариях, я реализовал подключенный компонент, используя map/reduce на Spark. Вы можете найти более подробную информацию здесь - https://www.linkedin.com/pulse/connected-component-using-map-reduce-apache-spark-shirish-kumar и исходный код лицензии MIT здесь - https://github.com/kwartile/connected-component.