3

У меня есть (довольно большой, думаю 10e7 Ряды) DataFrame, из которого я фильтровать элементы на основе некоторого свойстваPartition Расположение RDD/Dataframe

val res = data.filter(data(FieldNames.myValue) === 2).select(pk.name, FieldName.myValue) 

Мой DataFrame имеет п Перегородки data.rdd.getNumPartitions

Теперь я хочу узнать, из какого раздела произошли мои строки. Я знаю, что я мог бы просто перебрать все разделы с чем-то вроде этого

val temp = res.first() //or foreach, this is just an example 
data.foreachPartition(f => { 
    f.exists(row => row.get(0)==temp.get(0)) 
    //my code here 
}) //compare PKs 

или data.rdd.mapPartitionsWithIndex((idx, f) => ...)

Однако, это кажется чрезмерным, а также не очень производительным, если мои результаты и мой DataFrame становится большим.

Есть ли способ искры, чтобы сделать это после того, как я выполнил фильтр() - операцию?

Или, альтернативно, существует ли способ переписать/альтернативу запросу filter(), чтобы он возвращал начало строки?

Я мог бы также сохранить местоположение разделов в моем DataFrame и обновлять, что на переразметкой, но я предпочел бы сделать это в искровым способом

(Единственный подобный вопрос я нашел был here, и ни один вопрос ни комментарий очень полезен. Я также нашел this, который может быть похож, но не такой)

Заранее благодарим за любую помощь/указатели, и я извиняюсь, если упустил вопрос, похожий на мой, на который уже был дан ответ.

+0

mapPartitionsWithIndex - простая операция с картами. Он не включает перетасовку, просто распределенное отображение. Возможно, будет другой способ, но я не уверен, что это может быть действительно более результативным, чем это. – Marie

ответ

0

Номера разделов/подсчеты не стабильны, так как Spark будет выполнять автоматическое расширение & сокращение перегородки. Это означает, что количество входных разделов может быть не таким, как, например, количество входных файлов.

Общий шаблон в этих ситуациях заключается в создании некоторого типа составного ключа на основе данных в каждом входном файле. Если ключ большой, вы можете хэшировать его, чтобы уменьшить размер. Если вы не заботитесь о столкновениях, используйте Murmur3. Если вас беспокоит столкновение, используйте MD5, который все еще довольно быстрый.

Если единственная уникальная функция, которую у вас есть, это путь к входному файлу, вам нужно будет добавить путь к файлу в качестве отличительного столбца. Вот способ сделать это:

val paths = Seq(...) 
val df = paths 
    .map { path => 
    sqlContext.read.parquet(path) 
     .withColumn("path", lit(path)) 
    } 
    .reduceLeft(_ unionAll _) 

Идея проста: прочитать входные файлы по одному, добавить уникальный столбец, связанный с ними, а затем объединить их вместе с помощью UNION ALL.