2016-09-14 3 views
2

Следующий код Spark правильно показывает, что я хочу сделать, и генерирует правильный вывод с помощью крошечного набора демо-данных.Spark merge/объединить массивы в groupBy/aggregate

Когда я запускаю такой же общий тип кода на большом объеме производственных данных, у меня возникают проблемы времени выполнения. Работа Spark работает на моем кластере в течение ~ 12 часов и выходит из строя.

Просто взглянув на код ниже, кажется, неэффективно взорвать каждую строку, просто чтобы слить его обратно. В данном тестовом наборе данных четвертая строка с тремя значениями в array_value_1 и тремя значениями в array_value_2, которая взорвется до 3 * 3 или девяти взорванных строк.

Итак, в большем наборе данных строка с пятью такими столбцами массива и десятью значениями в каждом столбце будет взорваться до 10^5 взорванных строк?

Глядя на предоставленные функции Spark, нет функций, которые бы выполняли то, что я хочу. Я мог бы предоставить пользовательскую функцию. Есть ли какие-то недостатки в скорости?

val sparkSession = SparkSession.builder. 
    master("local") 
    .appName("merge list test") 
    .getOrCreate() 

val schema = StructType(
    StructField("category", IntegerType) :: 
    StructField("array_value_1", ArrayType(StringType)) :: 
    StructField("array_value_2", ArrayType(StringType)) :: 
    Nil) 

val rows = List(
    Row(1, List("a", "b"), List("u", "v")), 
    Row(1, List("b", "c"), List("v", "w")), 
    Row(2, List("c", "d"), List("w")), 
    Row(2, List("c", "d", "e"), List("x", "y", "z")) 
) 

val df = sparkSession.createDataFrame(rows.asJava, schema) 

val dfExploded = df. 
    withColumn("scalar_1", explode(col("array_value_1"))). 
    withColumn("scalar_2", explode(col("array_value_2"))) 

// This will output 19. 2*2 + 2*2 + 2*1 + 3*3 = 19 
logger.info(s"dfExploded.count()=${dfExploded.count()}") 

val dfOutput = dfExploded.groupBy("category").agg(
    collect_set("scalar_1").alias("combined_values_2"), 
    collect_set("scalar_2").alias("combined_values_2")) 

dfOutput.show() 

ответ

6

Это может быть неэффективным explode но принципиально операции вы пытаетесь осуществить это просто дорого. Эффективно это просто еще один groupByKey, и здесь вы не можете сделать это, чтобы сделать его лучше. Так как вы используете Спарк> 2.0 можно collect_list непосредственно и придавить:

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten.distinct) 

df 
    .groupBy("category") 
    .agg(
    flatten(collect_list("array_value_1")), 
    flatten(collect_list("array_value_2")) 
) 

Также можно использовать custom Aggregator, но я сомневаюсь, что любой из них будет иметь огромное значение.

Если множества относительно велики, и вы ожидаете значительное количество дублей, вы можете попробовать использовать aggregateByKey с изменяемыми наборами:

import scala.collection.mutable.{Set => MSet} 

val rdd = df 
    .select($"category", struct($"array_value_1", $"array_value_2")) 
    .as[(Int, (Seq[String], Seq[String]))] 
    .rdd 

val agg = rdd 
    .aggregateByKey((MSet[String](), MSet[String]()))( 
    {case ((accX, accY), (xs, ys)) => (accX ++= xs, accY ++ ys)}, 
    {case ((accX1, accY1), (accX2, accY2)) => (accX1 ++= accX2, accY1 ++ accY2)} 
) 
    .mapValues { case (xs, ys) => (xs.toArray, ys.toArray) } 
    .toDF 
+1

Первое решение простой Flatten UDF полностью исправили проблему. Спарк пошел от принятия ~ 12 часов, прежде чем не смог завершить всю работу за 30 минут. Наблюдая за графическим интерфейсом монитора Spark, каждая из внутренних задач запускается и завершается через минуту или меньше. Спасибо за помощь в этом. – clay

+0

Я рад слышать, что, хотя я должен признать, что я удивлен. Я ожидал небольшого улучшения, но ничего такого впечатляющего. Насколько велики индивидуальные списки? – zero323

Смежные вопросы