2016-02-22 2 views
6

Я попытался использовать Spark для работы над простой задачей графа. Я нашел пример программы в исходной папке Spark: transitive_closure.py, которая вычисляет транзитивное замыкание в графе с не более чем 200 ребрами и вершинами. Но в моем собственном ноутбуке он работает более 10 минут и не заканчивается. В командной строке я использую: spark-submit transitive_closure.py.Программа программы Spark работает очень медленно

Интересно, почему искра настолько медленна, даже при вычислении такого малого транзитивного результата закрытия? Это обычный случай? Есть ли какая-то конфигурация, которую я пропускаю?

Программа показана ниже и может быть найдена в папке искрообразования на их веб-сайте.

from __future__ import print_function 

import sys 
from random import Random 

from pyspark import SparkContext 

numEdges = 200 
numVertices = 100 
rand = Random(42) 


def generateGraph(): 
    edges = set() 
    while len(edges) < numEdges: 
     src = rand.randrange(0, numEdges) 
     dst = rand.randrange(0, numEdges) 
     if src != dst: 
      edges.add((src, dst)) 
    return edges 


if __name__ == "__main__": 
    """ 
    Usage: transitive_closure [partitions] 
    """ 
    sc = SparkContext(appName="PythonTransitiveClosure") 
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 
    tc = sc.parallelize(generateGraph(), partitions).cache() 

    # Linear transitive closure: each round grows paths by one edge, 
    # by joining the graph's edges with the already-discovered paths. 
    # e.g. join the path (y, z) from the TC with the edge (x, y) from 
    # the graph to obtain the path (x, z). 

    # Because join() joins on keys, the edges are stored in reversed order. 
    edges = tc.map(lambda x_y: (x_y[1], x_y[0])) 

    oldCount = 0 
    nextCount = tc.count() 
    while True: 
     oldCount = nextCount 
     # Perform the join, obtaining an RDD of (y, (z, x)) pairs, 
     # then project the result to obtain the new (x, z) paths. 
     new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0])) 
     tc = tc.union(new_edges).distinct().cache() 
     nextCount = tc.count() 
     if nextCount == oldCount: 
      break 

    print("TC has %i edges" % tc.count()) 

    sc.stop() 

ответ

4

Там можно много причин, почему этот код делает особенно хорошо выполнять на вашей машине, но, скорее всего, это просто еще один вариант проблемы, описанной в Spark iteration time increasing exponentially when using join. Самый простой способ проверить, если это действительно так, чтобы обеспечить spark.default.parallelism параметр на представить:

bin/spark-submit --conf spark.default.parallelism=2 \ 
    examples/src/main/python/transitive_closure.py 

Если не ограничено иначе, SparkContext.union, RDD.join и RDD.union установить ряд разделов ребенка к общему числу разделов в родителях. Обычно это желаемое поведение, но может быть крайне неэффективным, если оно применяется итеративно.

+1

Спасибо. Действительно полезно. У меня есть еще один вопрос, если вы можете помочь, я буду очень благодарен. Предположим, у меня есть программа, которая использует множество реляционных операций, таких как объединение, выбор, объединение, обновление и т. Д. В цикле до тех пор, пока факты в отношениях к фиксированной точке. Даже с полными кортежами не более 50, я застрял на второй итерации и исключении размера Java-кучи. Я использовал cache() и объединил (1) в каждой операции с файловой системой. Что может быть проблемой, о которой вы думаете? – c21

0

Useage говорит командная строка

transitive_closure [partitions] 

Настройки по умолчанию параллелизм поможет только стыкам в каждом разделе, а не щётки распределения работы.

Я собираюсь утверждать, что необходимо использовать БОЛЬШЕ разделов. Установка параллелизма по умолчанию может по-прежнему помогать, но код, который вы выложили, явно задает номер (переданный аргумент или 2, в зависимости от того, что больше). Абсолютным минимумом должны быть ядра, доступные для Spark, иначе вы всегда будете работать менее чем на 100%.

+0

Здесь нет никакой ценности в увеличении параллелизма. Фактически данный объем данных вы можете получить больше, уменьшив его до 1 :) Не говоря уже о том, чтобы сбросить Spark вообще. – zero323

Смежные вопросы