2017-01-10 4 views
4

Я создаю график из gz сжатого json файла edge и vertices типа.Кратчайший путь в Graphx с Spark

Я поместил файлы в папку Dropbox here

загружаю и сопоставить эти json записи для создания vertices и edge типы требуемых graphx так:

val vertices_raw = sqlContext.read.json("path/vertices.json.gz") 
val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index"))) 
val verticesRDD: RDD[(VertexId, Long)] = vertices 
val edges_raw = sqlContext.read.json("path/edges.json.gz") 
val edgesRDD = edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length")))) 
val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut) 

Затем я использую этот dijkstra я нашел для вычисления кратчайшего пути между двумя вершинами:

def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = { 
      var g2 = g.mapVertices(
     (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]()) 
     ) 
      for (i <- 1L to g.vertices.count - 1) { 
      val currentVertexId: VertexId = g2.vertices.filter(!_._2._1) 
       .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
       (a, b) => if (a._2._2 < b._2._2) a else b) 
       ._1 

      val newDistances: VertexRDD[(Double, List[VertexId])] = 
       g2.aggregateMessages[(Double, List[VertexId])](
      ctx => if (ctx.srcId == currentVertexId) { 
       ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId)) 
      }, 
      (a, b) => if (a._1 < b._1) a else b 
     ) 
     g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => { 
      val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]())) 
      (
      vd._1 || vid == currentVertexId, 
      math.min(vd._2, newSumVal._1), 
      if (vd._2 < newSumVal._1) vd._3 else newSumVal._2 
      ) 
     }) 
     } 

      g.outerJoinVertices(g2.vertices)((vid, vd, dist) => 
     (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]())) 
      .productIterator.toList.tail 
     )) 
     } 

Я беру две случайные вершины идентификаторов:

val v1 = 4000000028222916L 
val v2 = 4000000031019012L 

и вычислить путь между ними:

val results = dijkstra(my_graph, v1).vertices.map(_._2).collect 

Я не могу вычислить это локально на моем ноутбуке, не получая ошибки StackOverflow. Я вижу, что он использует 3 из 4 ядер. Я могу загрузить этот график и вычислить кратчайшие 10 путей в секунду с помощью библиотеки igraph в Python на точно таком же графике. Является ли это неэффективным средством вычисления путей? В масштабе, на нескольких узлах, пути будут вычисляться (нет ошибки stackoverflow), но это все равно 30/40 секунд для каждого вычисления пути.

ответ

0

Как вы можете прочитать на python-igraph github

«Он предназначен для быть мощным (то есть. Быстро), как это возможно, чтобы позволить анализ больших графов.»

Для того, чтобы объяснить, почему он принимает более 4000x время на апача-искры, чем на местном питона, вы можете посмотреть here (глубокое погружение в узкие места в производительности с Спарк членами PMC Kay Остераут.), Чтобы увидеть, что это, вероятно, из-за узкого места:

... начиная с идеи о том, что сети и дискового ввода/вывода являются основными узкие ... Вам не нужно хранить данные в памяти, потому что работа может не так быстро. Это говорит, что если вы переместили сериализованные сжатые данные на диске в памяти ...

вы также можете увидеть here & here некоторой информации, но лучше последний метод является бенчмарка вашего кода, чтобы знать где узким местом является.

Смежные вопросы