2016-11-11 2 views
0

из нескольких файлов CSV с 150 атрибута в каждой строке в среднем, мне нужно, чтобы удовлетворить этот запрос SQL в pyspark:pyspark reduceByKey только 1 значение за ключ

SELECT object_id, COUNT(*), MAX(source_id), MIN(ra), MAX(ra), MIN(decl), MAX(decl) 

я использовал одну функцию карты, которая принимает каждую строку, фильтр и выводит необходимые поля:

Map Output : < object_id , Array(objectid , 1,sourceid,ra,decl]) > 

Я использовал один уменьшить функцию, которая вычисляет все необходимые агрегированные функции сразу (а и в являются массивами, как описано в выходных данных карты):

def generalReduce(A,B): 
    myarrayRet = [0,0,0,0,0,0,0] 
    myarrayRet[0] = A[0] 
    #count 
    myarrayRet[1] = A[1] + B[1] 
    #maxSrcId 
    myarrayRet[2] = A[2] if A[2] > B[2] else B[2] 
    #minRa 
    myarrayRet[3] = A[3] if A[3] < B[3] else B[3] 
    #maxRa 
    myarrayRet[4] = A[3] if A[3] > B[3] else B[3] 
    #minDecl 
    myarrayRet[5] = A[4] if A[4] > B[4] else B[4] 
    #maxDecl 
    myarrayRet[6] = A[4] if A[4] > B[4] else B[4] 
    return myarrayRet 

Проблема в том, что есть некоторые клавиши, которые имеют только 1 значение, поэтому фаза уменьшения выводит массив из 4 позиций. Это позволяет мне думать, что функция reduce вызывается только в том случае, если ключ имеет более одного значения, я ошибаюсь?; если нет, как я могу сделать, чтобы вывести пользовательское значение, если для одного ключа есть только одно значение?

спасибо.

+0

testttttttttttttt – user1038445

+0

Извините, это было плохо идентифицировано, и я впервые использовал платформу. – user1038445

ответ

1

Я не уверен, правильно ли я понимаю вашу проблему. Если у вас есть ключ-значение RDD как data, как показано ниже:

Map Output : < object_id , Array(objectid , 1,sourceid,ra,decl]) > 

И использовать data.reduceByKey(generalReduce), ваша функция generalReduce должна быть коммутативной и ассоциативной. То есть, если у вас есть три элемента, generalReduce(generalReduce(elem1,elem2),elem3) должен быть равен generalReduce(elem1,generalReduce(elem2,elem3). В вашем коде возвращаемое значение generalReduce(elem1,elem2) не совпадает с elem3, поэтому вы должны учитывать это. На самом деле, я не думаю, что ваш код делает то, что вы намереваетесь сделать.

Для вашего второго вопроса, в случае, если это то, что вы хотите сделать, вы можете использовать карту для преобразования значений в правильном формате.

Смежные вопросы