Как результаты фильтра могут быть мгновенными?
Выполнение вашего заявления возвращается так быстро, потому что оно фактически не выполняет фильтрацию. Spark использует ленивую оценку: она фактически не выполняет преобразования, пока вы не выполните действие, которое фактически собирает результаты. Вызов метода преобразования, например filter
, просто создает новый RDD, который представляет это преобразование и его результат. Вы должны выполнить действие как collect
или count
на самом деле он будет выполнен:
def myGraph: Graph = ???
// No filtering actually happens yet here, the results aren't needed yet so Spark is lazy and doesn't do anything
val filteredEdges = myGraph.edges.filter()
// Counting how many edges are left requires the results to actually be instantiated, so this fires off the actual filtering
println(filteredEdges.count)
// Actually gathering all results also requires the filtering to be done
val collectedFilteredEdges = filteredEdges.collect
Обратите внимание, что в этих примерах результаты фильтра не сохраняется между ними: из-за лени фильтрации повторяется для обоих действия. Чтобы предотвратить это дублирование, вы должны изучить функциональность кэширования Spark, прочитав подробности о преобразованиях и действиях и о том, что Spark действительно делает за сценой: https://spark.apache.org/docs/latest/programming-guide.html#rdd-operations.
Как именно работает поиск фильтра для моих запрошенных ребер (когда я выполняю действие)?
в Spark GraphX ребра хранятся в RDD типа EdgeRDD[ED]
, где ED
- тип атрибута вашего края, в вашем случае String
. Этот специальный RDD выполняет некоторые специальные оптимизации в фоновом режиме, но для ваших целей он ведет себя как его суперкласс RDD[Edge[ED]]
, и фильтрация происходит как фильтрация любого RDD: он будет перебирать все элементы, применяя данный предикат к каждому. Однако RDD разбивается на несколько разделов, и Spark будет фильтровать несколько разделов параллельно; в вашем случае, когда вы, кажется, запускаете Spark локально, он будет делать столько же, сколько количество ядер, которые вы имеете, или сколько вы указали явно с помощью --master local[4]
, например.
RDD с ребрами разбит на основе PartitionStrategy
, который задан, например, если вы создали свой график с помощью Graph.fromEdgeTuples
или по телефону partitionBy
на вашем графике. Однако все стратегии основаны на вершинах края, поэтому не имеют никаких знаний о вашем атрибуте и поэтому не влияют на вашу операцию фильтрации, за исключением, возможно, некоторой несбалансированной сетевой нагрузки, если вы запустили ее в кластере, cat 'заканчиваются тем же самым разделом/исполнителем, и вы делаете collect
или какую-то операцию в случайном порядке. См. GraphX docs on Vertex and Edge RDDs для получения дополнительной информации о том, как графические представления представлены и разделены.
Я не думал, что нужно показать все, что я сделал. Я просто хочу знать, как просматриваются ребра, когда я использую фильтр. Я имею в виду, когда я вызываю Mygraph.edges.filter (_. Attr (0) .equals («cat»)), как ребра, которые ищут атрибут «cat»? – CMWasiq
Да, говоря, что Spark делает ленивую оценку, ничего не говорит о том, как значения распределяются и распределяются при фактическом применении фильтра. –
Достаточно справедливо, я сосредоточился на первом запуске вопроса: он увидел мгновенный возврат, который теперь с добавленным вызовом показывает, что в первую очередь не было фильтрации. Попробуйте 'Mygraph.edges.filter (_. Attr (0) .equals (" cat ")). Count()', чтобы увидеть, как долго проходит фильтрация. Теперь я сделал быстрое редактирование с базовым ответом для этого случая, я могу немного глубже понять его позже. – sgvd