2017-01-16 5 views
0

Я начал использовать искру совсем недавно. В настоящее время я тестирую двудольный граф, который имеет разные типы вершин и ребер.spark graphx multiple edge types

Из исследования, которое я сделал в графике, чтобы иметь разные ребра, а некоторые со свойствами, мне нужно подклассировать края.

Вот фрагмент кода:

scala> trait VertexProperty 
defined trait VertexProperty 

scala> case class paperProperty(val paperid: Long, val papername: String, val doi: String, val keywords: String) extends VertexProperty 
defined class paperProperty 

scala> case class authorProperty(val authorid: Long, val authorname: String) extends VertexProperty 
defined class authorProperty 

scala> val docsVertces: RDD[(VertexId, VertexProperty)] = docs.rdd.map(x => (x(0).asInstanceOf[VertexId],paperProperty(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[String],x(2).asInstanceOf[String],x(3).asInstanceOf[String]))) 
docsVertces: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, VertexProperty)] = MapPartitionsRDD[23] at map at <console>:47 

scala> val authorVertces: RDD[(VertexId, VertexProperty)] = authors.rdd.map(x => (x(0).asInstanceOf[VertexId],authorProperty(x(0).asInstanceOf[Long],x(1).asInstanceOf[String]))) 
authorVertces: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, VertexProperty)] = MapPartitionsRDD[24] at map at <console>:41 

scala> val vertices = VertexRDD(docsVertces ++ authorVertces) 
vertices: org.apache.spark.graphx.VertexRDD[VertexProperty] = VertexRDDImpl[28] at RDD at VertexRDD.scala:57 

scala> 

Однако я неудачу с краями.

scala> class EdgeProperty() 
defined class EdgeProperty 

scala> case class authorEdgeProperty(val doccount: Long) extends EdgeProperty() 
defined class authorEdgeProperty 

scala> case class citeEdgeProperty() extends EdgeProperty() 
defined class citeEdgeProperty 

scala> // edge using subclass will not work we need to have one consistent superclass 

scala> val docauthoredges = docauthor.map(x => Edge(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[VertexId],  authorEdgeProperty(x(1).asInstanceOf[Long]))) 
docauthoredges: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[authorEdgeProperty]] = [srcId: bigint, dstId: bigint ... 1 more field] 

scala> val docciteedges = doccites.map(x => Edge(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[VertexId], citeEdgeProperty())) 
docciteedges: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[citeEdgeProperty]] = [srcId: bigint, dstId: bigint ... 1 more field] 

scala> docauthoredges.unionAll(docciteedges) 
<console>:52: error: type mismatch; 
found : org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[citeEdgeProperty]] 
required: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[authorEdgeProperty]] 
     docauthoredges.unionAll(docciteedges) 
          ^

scala> 

Я попытался бросить край их суперкласса и я ПОЛУЧАТЬ следующее сообщение:

scala> val docauthoredges = docauthor.map(x => Edge(x(0).asInstanceOf[VertexId],x(1).asInstanceOf[VertexId],   authorEdgeProperty(x(1).asInstanceOf[Long]).asInstanceOf[EdgeProperty])) 
java.lang.UnsupportedOperationException: No Encoder found for EdgeProperty 
- field (class: "EdgeProperty", name: "attr") 
- root class: "org.apache.spark.graphx.Edge" 
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598) 
... 

Любая помощь будет принята с благодарностью

ответ

0

Ваш вопрос немного бесполезным, потому что Graphx Безразлично» t Datasets, и оба края и вершины должны быть переданы как RDDs, но ради аргумента:

  • Вы получаете первое исключение, потому что распределенные структуры данных в Spark являются инвариантными. Не используйте asInstanceOf. Просто будьте ясными с аннотациями типов.
  • Вы получаете второе исключение, так как Datasets также ограничено использованием Encoders. Весь объект в Dataset должен использовать тот же Encoder, что в этом случае возможно только с двоичным кодировщиком, который не будет неявно доступен для определенного пользователем класса.

С помощью этих двух частей в сочетании:

import org.apache.spark.sql.{Dataset, Encoders} 

sealed trait EdgeProperty 

case class AuthorEdgeProperty(val doccount: Long) extends EdgeProperty 
case class CiteEdgeProperty() extends EdgeProperty 

val docauthoredges: Dataset[EdgeProperty] = spark.range(10) 
    .map(AuthorEdgeProperty(_): EdgeProperty)(Encoders.kryo[EdgeProperty]) 

val docciteedges: Dataset[EdgeProperty] = spark.range(5) 
    .map(_ => CiteEdgeProperty(): EdgeProperty)(Encoders.kryo[EdgeProperty]) 

val edges: Dataset[EdgeProperty] = docauthoredges.union(docciteedges) 

Преобразовать в RDD, чтобы сделать его пригодным для использования в Graphx:

edges.rdd 
Смежные вопросы