2016-05-07 2 views
0

Я очень новичок в Spark и не знаю основ, я просто прыгнул в него, чтобы решить проблему. Решение проблемы включает в себя создание графика (с использованием GraphX), где ребра имеют строковый атрибут. Пользователь может запросить этот график, и я обрабатываю запросы, отфильтровывая только те ребра, у которых есть строковый атрибут, который равен запросу пользователя.Как работает фильтр Spark на грани GraphX?

Теперь мой график имеет более 16 миллионов ребер; для создания графика требуется более 10 минут, когда я использую все 8 ядер моего компьютера. Однако, когда я запрашиваю этот график (как я уже упоминал выше), я получаю результаты мгновенно (к моему приятному удивлению).

Итак, мой вопрос в том, как точно работает поиск фильтра для моих запрошенных ребер? Рассматривает ли они их итеративно? Происходит ли поиск краев на нескольких ядрах, и это кажется очень быстрым? Или есть какое-то хеширование?

Вот пример того, как я использую фильтр: Mygraph.edges.filter (_. Attr (0) .equals ("cat")), что означает, что я хочу получить ребра, у которых есть атрибут cat " в них. Как просматриваются ребра?

ответ

2

Как результаты фильтра могут быть мгновенными?

Выполнение вашего заявления возвращается так быстро, потому что оно фактически не выполняет фильтрацию. Spark использует ленивую оценку: она фактически не выполняет преобразования, пока вы не выполните действие, которое фактически собирает результаты. Вызов метода преобразования, например filter, просто создает новый RDD, который представляет это преобразование и его результат. Вы должны выполнить действие как collect или count на самом деле он будет выполнен:

def myGraph: Graph = ??? 

// No filtering actually happens yet here, the results aren't needed yet so Spark is lazy and doesn't do anything 
val filteredEdges = myGraph.edges.filter() 

// Counting how many edges are left requires the results to actually be instantiated, so this fires off the actual filtering 
println(filteredEdges.count) 

// Actually gathering all results also requires the filtering to be done 
val collectedFilteredEdges = filteredEdges.collect 

Обратите внимание, что в этих примерах результаты фильтра не сохраняется между ними: из-за лени фильтрации повторяется для обоих действия. Чтобы предотвратить это дублирование, вы должны изучить функциональность кэширования Spark, прочитав подробности о преобразованиях и действиях и о том, что Spark действительно делает за сценой: https://spark.apache.org/docs/latest/programming-guide.html#rdd-operations.

Как именно работает поиск фильтра для моих запрошенных ребер (когда я выполняю действие)?

в Spark GraphX ​​ребра хранятся в RDD типа EdgeRDD[ED], где ED - тип атрибута вашего края, в вашем случае String. Этот специальный RDD выполняет некоторые специальные оптимизации в фоновом режиме, но для ваших целей он ведет себя как его суперкласс RDD[Edge[ED]], и фильтрация происходит как фильтрация любого RDD: он будет перебирать все элементы, применяя данный предикат к каждому. Однако RDD разбивается на несколько разделов, и Spark будет фильтровать несколько разделов параллельно; в вашем случае, когда вы, кажется, запускаете Spark локально, он будет делать столько же, сколько количество ядер, которые вы имеете, или сколько вы указали явно с помощью --master local[4], например.

RDD с ребрами разбит на основе PartitionStrategy, который задан, например, если вы создали свой график с помощью Graph.fromEdgeTuples или по телефону partitionBy на вашем графике. Однако все стратегии основаны на вершинах края, поэтому не имеют никаких знаний о вашем атрибуте и поэтому не влияют на вашу операцию фильтрации, за исключением, возможно, некоторой несбалансированной сетевой нагрузки, если вы запустили ее в кластере, cat 'заканчиваются тем же самым разделом/исполнителем, и вы делаете collect или какую-то операцию в случайном порядке. См. GraphX docs on Vertex and Edge RDDs для получения дополнительной информации о том, как графические представления представлены и разделены.

+0

Я не думал, что нужно показать все, что я сделал. Я просто хочу знать, как просматриваются ребра, когда я использую фильтр. Я имею в виду, когда я вызываю Mygraph.edges.filter (_. Attr (0) .equals («cat»)), как ребра, которые ищут атрибут «cat»? – CMWasiq

+1

Да, говоря, что Spark делает ленивую оценку, ничего не говорит о том, как значения распределяются и распределяются при фактическом применении фильтра. –

+0

Достаточно справедливо, я сосредоточился на первом запуске вопроса: он увидел мгновенный возврат, который теперь с добавленным вызовом показывает, что в первую очередь не было фильтрации. Попробуйте 'Mygraph.edges.filter (_. Attr (0) .equals (" cat ")). Count()', чтобы увидеть, как долго проходит фильтрация. Теперь я сделал быстрое редактирование с базовым ответом для этого случая, я могу немного глубже понять его позже. – sgvd