Я создаю график из gz
сжатого json
файла edge
и vertices
типа.Кратчайший путь в Graphx с Spark
Я поместил файлы в папку Dropbox here
загружаю и сопоставить эти json
записи для создания vertices
и edge
типы требуемых graphx
так:
val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index")))
val verticesRDD: RDD[(VertexId, Long)] = vertices
val edges_raw = sqlContext.read.json("path/edges.json.gz")
val edgesRDD = edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length"))))
val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)
Затем я использую этот dijkstra
я нашел для вычисления кратчайшего пути между двумя вершинами:
def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
var g2 = g.mapVertices(
(vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
)
for (i <- 1L to g.vertices.count - 1) {
val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
.fold((0L, (false, Double.MaxValue, List[VertexId]())))(
(a, b) => if (a._2._2 < b._2._2) a else b)
._1
val newDistances: VertexRDD[(Double, List[VertexId])] =
g2.aggregateMessages[(Double, List[VertexId])](
ctx => if (ctx.srcId == currentVertexId) {
ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
},
(a, b) => if (a._1 < b._1) a else b
)
g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
(
vd._1 || vid == currentVertexId,
math.min(vd._2, newSumVal._1),
if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
)
})
}
g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
(vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
.productIterator.toList.tail
))
}
Я беру две случайные вершины идентификаторов:
val v1 = 4000000028222916L
val v2 = 4000000031019012L
и вычислить путь между ними:
val results = dijkstra(my_graph, v1).vertices.map(_._2).collect
Я не могу вычислить это локально на моем ноутбуке, не получая ошибки StackOverflow. Я вижу, что он использует 3 из 4 ядер. Я могу загрузить этот график и вычислить кратчайшие 10 путей в секунду с помощью библиотеки igraph
в Python на точно таком же графике. Является ли это неэффективным средством вычисления путей? В масштабе, на нескольких узлах, пути будут вычисляться (нет ошибки stackoverflow), но это все равно 30/40 секунд для каждого вычисления пути.