Учитывая искру DataFrame, который выглядит примерно так:Консерванты Спарк DataFrame перегородки колонки по преобразованию RDD
==================================
| Name | Col1 | Col2 | .. | ColN |
----------------------------------
| A | 1 | 11 | .. | 21 |
| A | 31 | 41 | .. | 51 |
| B | 2 | 12 | .. | 22 |
| B | 32 | 42 | .. | 52 |
==================================
Я хотел бы запустить логику, которая выполняет агрегации/вычисления для разбиения таблицы, которая соответствует значение Name
. Указанная логика требует, чтобы полное содержимое раздела - и - только, что раздел - будет реализован в памяти на узле, выполняющем логику; это будет выглядеть как processSegment
функции ниже:
def processDataMatrix(dataMatrix):
# do some number crunching on a 2-D matrix
def processSegment(dataIter):
# "running" value of the Name column in the iterator
dataName = None
# as the iterator is processed, put the data in a matrix
dataMatrix = []
for dataTuple in dataIter:
# separate the name column from the other columns
(name, *values) = dataTuple
# SANITY CHECK: ensure that all rows have same name
if (dataName is None):
dataName = name
else:
assert (dataName == name), 'row name ' + str(name) + ' does not match expected ' + str(dataName)
# put the row in the matrix
dataMatrix.append(values)
# if any rows were processed, number-crunch the matrix
if (dataName is not None):
return processDataMatrix(dataMatrix)
else:
return []
Я попытался сделать эту работу переразметкой на основе Name
колонке, а затем работает processSegment
каждого раздела с помощью mapPartitions
на подстилающей РДУ:
result = \
stacksDF \
.repartition('Name') \
.rdd \
.mapPartitions(processSegment) \
.collect()
Однако этот процесс обычно не проходит SANITY CHECK
утверждения в processSegment
:
AssertionError: row name Q7 does not match expected A9
Почему разбиение, которое якобы выполняется на DataFrame, не сохраняется при попытке запустить mapPartitions
на базовом RDD? Если вышеприведенный подход недействителен, существует ли какой-либо подход (с использованием API-интерфейса DataFrame или API RDD), который позволит мне выполнить логику агрегации при передаче данных в разделе DataFrame в памяти?
(Как я использую PySpark, и определенное количество хруст логики я хочу, чтобы выполнить это Python, определяемые пользователем агрегатные функции (UDAFs) would not appear to be an option.)