2016-03-24 2 views
0

У меня есть DataFrame как:как сопоставить DataFrame к EdgeRDD

val data = sc.parallelize(Array((1,10,10,7,7),(2,7,7,7,8),(3, 5,5,6,8))).toDF("id","col1","col2","col3","col4") 

То, что я хочу сделать, это создать EdgeRDD, где два Идентификаторы поделиться ссылкой, если они имеют то же значение, по меньшей мере, один колонн

id col1 col2 col3 col4 
1 10 10 7 7 
2 7 7 7 8 
3 5 5 6 8 

затем узел 1 и 2 имеют неориентированную ссылку 1--2, потому что они имеют общую ценность в col3.

По той же причине, узел 2 и 3 доли неориентированной связи, потому что они имеют общее значение в COL4

Я знаю, как решить эту проблему в уродливом виде (но у меня есть слишком много столбцов для принятия этого стратегия в моем реальном случае)

val data2 = data.withColumnRenamed("id", "idd").withColumnRenamed("col1", "col1d").withColumnRenamed("col2", "col2d").withColumnRenamed("col3", "col3d").withColumnRenamed("col4", "col4d") 
val res = data.join(data2, data("id") < data2("idd") 
        && (data("col1") === data2("col1d") 
        || data("col2") === data2("col2d") 
        || data("col3") === data2("col3d") 
        || data("col4") === data2("col4d"))) 
               //> res : org.apache.spark.sql.DataFrame = [id: int, col1: int, col2: int, col 
               //| 3: int, col4: int, idd: int, col1d: int, col2d: int, col3d: int, col4d: int 
               //| ] 
res.show          //> +---+----+----+----+----+---+-----+-----+-----+-----+ 
               //| | id|col1|col2|col3|col4|idd|col1d|col2d|col3d|col4d| 
               //| +---+----+----+----+----+---+-----+-----+-----+-----+ 
               //| | 1| 10| 10| 7| 7| 2| 7| 7| 7| 8| 
               //| | 2| 7| 7| 7| 8| 3| 5| 5| 6| 8| 
               //| +---+----+----+----+----+---+-----+-----+-----+-----+ 
               //| 
val links = EdgeRDD.fromEdges(res.map(row => Edge(row.getAs[Int]("id").toLong, row.getAs[Int]("idd").toLong, "indirect"))) 
               //> links : org.apache.spark.graphx.impl.EdgeRDDImpl[String,Nothing] = EdgeRDD 
               //| Impl[27] at RDD at EdgeRDD.scala:42 
links.foreach(println)      //> Edge(1,2,indirect) 
               //| Edge(2,3,indirect) 

как решить это для гораздо большего количества столбцов?

+0

Не могли бы вы объяснить, какую часть вы найдете уродливой? условие 'join'? И 'data2' просто копия' data'? – zero323

+0

сказать, что у меня есть 100 столбцов, мне не нравится указывать === для каждого столбца ... должен быть способ автоматизировать эту часть? или, может быть, нет ... :( – user299791

+0

yep, извините, забыл добавить эту часть ... только что отредактировал – user299791

ответ

2

Вы имеете в виду что-то вроде этого?

val expr = data.columns.diff(Seq("id")) 
    .map(c => data(c) === data2(s"${c}d")) 
    .reduce(_ || _) 

data.join(data2, data("id") < data2("idd") && expr) 

Вы также можете использовать псевдонимы

import org.apache.spark.sql.functions.col 

val expr = data.columns.diff(Seq("id")) 
    .map(c => col(s"d1.$c") === col(s"d2.$c")) 
    .reduce(_ || _) 

data.alias("d1").join(data.alias("d2"), col("d1.id") < col("d2.id") && expr) 

Вы можете легко следить за каждым из этого, а просто select ($ эквивалентно col но требует импорта sqlContext.implicits.StringToColumn)

.select($"id".cast("long"), $"idd".cast("long")) 

или

.select($"d1.id".cast("long"), $"d2.id".cast("long")) 

и сопоставление с образцом:

.rdd.map { case Row(src: Long, dst: Long) => Edge(src, dst, "indirect") } 

Сразу отметим, что логические дизъюнкции, как эта, не могут быть оптимизированы и расширяются до декартово произведение с последующим filter. Если вы хотите избежать, вы можете попытаться подойти к этой проблеме по-разному.

Давайте начнем с изменения формы данных от широкоугольного до тех пор:

val expr = explode(array(data.columns.tail.map(
    c => struct(lit(c).alias("column"), col(c).alias("value")) 
): _*)) 

val long = data.withColumn("tmp", expr) 
    .select($"id", $"tmp.column", $"tmp.value") 

Это даст нам DataFrame со следующей схемой:

long.printSchema 

// root 
// |-- id: integer (nullable = false) 
// |-- column: string (nullable = false) 
// |-- value: integer (nullable = false) 

С данными, как это у вас есть несколько вариантов, включая оптимизированные join :

val pairs = long.as("long1") 
    .join(long.as("long2"), 
    $"long1.column" === $"long2.column" && // Optimized 
    $"long1.value" === $"long2.value" && // Optimized 
    $"long1.id" < $"long2.id" // Not optimized - filtered after sort-merge join 
) 
    // Select only ids 
    .select($"long1.id".alias("src"), $"long2.id".alias("dst")) 
    // And keep distict 
    .distinct 

pairs.show 
// +---+---+ 
// |src|dst| 
// +---+---+ 
// | 1| 2| 
// | 2| 3| 
// +---+---+ 

Th может быть дополнительно улучшена с использованием различных методов хэширования, чтобы избежать большого количества записей, генерируемых в результате взрыва.

Вы также можете подумать об этой проблеме как о двудольном графе, где наблюдения относятся к категории узлов и парам свойств к другому.

sealed trait CustomNode 
case class Record(id: Long) extends CustomNode 
case class Property(name: String, value: Int) extends CustomNode 

При этом в качестве отправной точки вы можете использовать long для создания края следующего типа:

Record -> Property 

и решить эту проблему с помощью Graphx непосредственно путем поиска путей, как

Record -> Property <- Record 

Подсказка: собрать соседей по каждому свойству и размножаться обратно.

Как и прежде, вам следует рассмотреть возможность использования хэширования или ведер, чтобы уменьшить ограничение числа сгенерированных узлов Property.

+0

Это решение работает в теории, но на самом деле я не могу запустить его, потому что он занимает слишком много дискового пространства даже для относительно небольших сеть из 297 узлов и 300 ссылок ... есть ли какой-либо другой подход, который я могу попробовать, который можно было бы оптимизировать? – user299791

+0

Он не должен терпеть неудачу с 300 узлами, но, как я уже сказал, до того, как логические альтернативы не могут быть оптимизированы. Таким образом, этот подход примерно равен 0 (N^2 * M) _ с N узлами и функциями M. Есть ли другой подход? Вероятно, более одного. Например, вы можете рассмотреть хеширование для поиска возможных кандидатов или представлять ваши данные в виде двудольного графика с самого начала. скорее всего, какой-либо из них будет соответствовать SO-формату. – zero323

+0

, представляющий данные как двудольный граф, не является чем-то, что приходит мне на ум, потому что узлы принадлежат к одному и тому же набору ... о «хешировании» для поиска (я не компьютерный ученый) ... но спасибо за предложения, я попробую это сделать и посмотрю ... – user299791

Смежные вопросы