1

Учитывая искру DataFrame, который выглядит примерно так:Консерванты Спарк DataFrame перегородки колонки по преобразованию RDD

================================== 
| Name | Col1 | Col2 | .. | ColN | 
---------------------------------- 
| A | 1 | 11 | .. | 21 | 
| A | 31 | 41 | .. | 51 | 
| B | 2 | 12 | .. | 22 | 
| B | 32 | 42 | .. | 52 | 
================================== 

Я хотел бы запустить логику, которая выполняет агрегации/вычисления для разбиения таблицы, которая соответствует значение Name. Указанная логика требует, чтобы полное содержимое раздела - и - только, что раздел - будет реализован в памяти на узле, выполняющем логику; это будет выглядеть как processSegment функции ниже:

def processDataMatrix(dataMatrix): 
    # do some number crunching on a 2-D matrix 

def processSegment(dataIter): 
    # "running" value of the Name column in the iterator 
    dataName = None 
    # as the iterator is processed, put the data in a matrix 
    dataMatrix = [] 

    for dataTuple in dataIter: 
     # separate the name column from the other columns 
     (name, *values) = dataTuple 
     # SANITY CHECK: ensure that all rows have same name 
     if (dataName is None): 
      dataName = name 
     else: 
      assert (dataName == name), 'row name ' + str(name) + ' does not match expected ' + str(dataName) 

     # put the row in the matrix 
     dataMatrix.append(values) 

    # if any rows were processed, number-crunch the matrix 
    if (dataName is not None): 
     return processDataMatrix(dataMatrix) 
    else: 
     return [] 

Я попытался сделать эту работу переразметкой на основе Name колонке, а затем работает processSegment каждого раздела с помощью mapPartitions на подстилающей РДУ:

result = \ 
    stacksDF \ 
     .repartition('Name') \ 
     .rdd \ 
     .mapPartitions(processSegment) \ 
     .collect() 

Однако этот процесс обычно не проходит SANITY CHECK утверждения в processSegment:

AssertionError: row name Q7 does not match expected A9 

Почему разбиение, которое якобы выполняется на DataFrame, не сохраняется при попытке запустить mapPartitions на базовом RDD? Если вышеприведенный подход недействителен, существует ли какой-либо подход (с использованием API-интерфейса DataFrame или API RDD), который позволит мне выполнить логику агрегации при передаче данных в разделе DataFrame в памяти?

(Как я использую PySpark, и определенное количество хруст логики я хочу, чтобы выполнить это Python, определяемые пользователем агрегатные функции (UDAFs) would not appear to be an option.)

ответ

1

Я считаю, что вы непонятых, как секционирования работы. В общем partioner является сюръективной функцией, а не биективной. Хотя все записи для определенного значения будут перемещены в один раздел, раздел может содержать записи с несколькими разными значениями.

DataFrame API не дает вам никакого контроля над секционированием, но при использовании RDD API можно определить пользовательский partitionFunc. Это означает, что вы можете использовать тот, который биективен, например:

mapping = (df 
    .select("Name") 
    .distinct() 
    .rdd.flatMap(lambda x: x) 
    .zipWithIndex() 
    .collectAsMap()) 

def partitioner(x): 
    return mapping[x] 

и использовать его следующим образом:

df.rdd.map(lambda row: (row.Name, row)).partitionBy(len(mapping), partitioner) 

Хотя возможно, вы должны помнить, что перегородки не являются свободными, и если числа уникальные значения велики, это может стать серьезной проблемой производительности.

Смежные вопросы