Следующий код Spark правильно показывает, что я хочу сделать, и генерирует правильный вывод с помощью крошечного набора демо-данных.Spark merge/объединить массивы в groupBy/aggregate
Когда я запускаю такой же общий тип кода на большом объеме производственных данных, у меня возникают проблемы времени выполнения. Работа Spark работает на моем кластере в течение ~ 12 часов и выходит из строя.
Просто взглянув на код ниже, кажется, неэффективно взорвать каждую строку, просто чтобы слить его обратно. В данном тестовом наборе данных четвертая строка с тремя значениями в array_value_1 и тремя значениями в array_value_2, которая взорвется до 3 * 3 или девяти взорванных строк.
Итак, в большем наборе данных строка с пятью такими столбцами массива и десятью значениями в каждом столбце будет взорваться до 10^5 взорванных строк?
Глядя на предоставленные функции Spark, нет функций, которые бы выполняли то, что я хочу. Я мог бы предоставить пользовательскую функцию. Есть ли какие-то недостатки в скорости?
val sparkSession = SparkSession.builder.
master("local")
.appName("merge list test")
.getOrCreate()
val schema = StructType(
StructField("category", IntegerType) ::
StructField("array_value_1", ArrayType(StringType)) ::
StructField("array_value_2", ArrayType(StringType)) ::
Nil)
val rows = List(
Row(1, List("a", "b"), List("u", "v")),
Row(1, List("b", "c"), List("v", "w")),
Row(2, List("c", "d"), List("w")),
Row(2, List("c", "d", "e"), List("x", "y", "z"))
)
val df = sparkSession.createDataFrame(rows.asJava, schema)
val dfExploded = df.
withColumn("scalar_1", explode(col("array_value_1"))).
withColumn("scalar_2", explode(col("array_value_2")))
// This will output 19. 2*2 + 2*2 + 2*1 + 3*3 = 19
logger.info(s"dfExploded.count()=${dfExploded.count()}")
val dfOutput = dfExploded.groupBy("category").agg(
collect_set("scalar_1").alias("combined_values_2"),
collect_set("scalar_2").alias("combined_values_2"))
dfOutput.show()
Первое решение простой Flatten UDF полностью исправили проблему. Спарк пошел от принятия ~ 12 часов, прежде чем не смог завершить всю работу за 30 минут. Наблюдая за графическим интерфейсом монитора Spark, каждая из внутренних задач запускается и завершается через минуту или меньше. Спасибо за помощь в этом. – clay
Я рад слышать, что, хотя я должен признать, что я удивлен. Я ожидал небольшого улучшения, но ничего такого впечатляющего. Насколько велики индивидуальные списки? – zero323