2016-05-04 2 views
3

У меня есть DataFrame с двумя колонками:pyspark: объединить ряды DataFrame в DenseVector

df = sqlContext.createDataFrame([ 
    (1, 'a'), (2, 'a'), 
    (3, 'b'), (4, 'b'), 
    (5, 'c'), (6, 'c'), 
    (7, 'd'), (8, 'd'), 
], schema=['value', 'name']) 

Редактировать 2017/01/13: Я получаю эту dataframe из таблицы SQL на основе Entity-Attribute-Value модель. Таким образом, для каждой строки будет доступен дополнительный третий столбец «id».

Я хочу превратить его в «функции» DataFrame, как того требуют классификаторы пакета ml. Для отдельных столбцов это может быть достигнуто с помощью VectorAssembler:

from pyspark.ml.feature import VectorAssembler 

assembler = VectorAssembler(inputCols=['value'], outputCol="features") 
selected_features = assembler.transform(df).select('features') 
selected_features.collect() 

[Row(features=DenseVector([1.0])), 
Row(features=DenseVector([2.0])), 
Row(features=DenseVector([3.0])), 
Row(features=DenseVector([4.0])), 
Row(features=DenseVector([5.0])), 
Row(features=DenseVector([6.0])), 
Row(features=DenseVector([7.0])), 
Row(features=DenseVector([8.0]))] 

То, что я хочу это:

[Row(features=DenseVector([1.0, 2.0])), 
Row(features=DenseVector([3.0, 4.0])), 
Row(features=DenseVector([5.0, 6.0])), 
Row(features=DenseVector([7.0, 8.0]))] 

Что бы наиболее эффективный способ объединения значений столбца value в DenseVector на основе значения столбца name?

Я думал, например, из пользовательской функции агрегации для GroupedData, который будет работать с groupby:

df.groupby('name').vector_agg().collect() 

аналогична функции PostgreSQL array_agg:

SELECT array_agg(df.value) FROM table as df 
GROUP BY df.name; 

ответ

1

Я думаю, что ваш вопрос плохо определен, так как при фиксированном name нет никакого способа узнать, какой value принадлежит в каком столбце. Классификаторы в пакете ml потребуют, чтобы каждый столбец использовался последовательно между образцами обучения. В вашем примере столбцы оказываются в нужном порядке, но на практике вы не можете полагаться на это.

Ваша проблема может быть решена, если вы можете дать вашим характеристикам показатели и начать что-то вроде этого:

df = sc.sql.createDataFrame([ 
    ('a', ('f1', 1)), ('a', ('f2', 2)), 
    ('b', ('f1', 3)), ('b', ('f2', 4)), 
    ('c', ('f1', 5)), ('c', ('f2', 6)), 
    ('d', ('f1', 7)), ('d', ('f2', 8)), 
], schema=['name', 'feature']) 

первой, группа name и агрегировать свои функции в виде списка:

import pyspark.sql.functions as F 

df.groupBy('name')\ 
    .agg(F.collect_list('feature'))\ 
    .show() 

Выход:

+----+---------------------+ 
|name|collect_list(feature)| 
+----+---------------------+ 
| d|  [[f1,7], [f2,8]]| 
| c|  [[f1,5], [f2,6]]| 
| b|  [[f1,3], [f2,4]]| 
| a|  [[f1,1], [f2,2]]| 
+----+---------------------+ 

Далее следует использовать udf в withColumn для преобразования этого массива в DenseVector. Собираем все вместе:

from pyspark.ml.linalg import Vectors, VectorUDT 
import pyspark.sql.functions as F 

list_to_dense = F.udf(lambda l: Vectors.dense([v for (k,v) in sorted(l)]), VectorUDT()) 

df.groupBy('name')\ 
    .agg(F.collect_list('features'))\ 
    .withColumn('features', list_to_dense('collect_list(features)'))\ 
    .select('features')\ 
    .collect() 

Выход:

[Row(features=DenseVector([7.0, 8.0])), 
Row(features=DenseVector([5.0, 6.0])), 
Row(features=DenseVector([3.0, 4.0])), 
Row(features=DenseVector([1.0, 2.0]))] 
+0

Вы правы!Проблема была связана с обработкой таблицы SQL на основе модели Entity-Attribute-Value. Таким образом, будет третий столбец объекта. Я уточню вопрос соответствующим образом. – mdh

+0

'collect_list' - это то, что я искал – mdh

1

Из вашей структуры данных нужно просто сделать join с той же таблицей и filter те строки, где values такие же (или инвертированные).

df = sqlContext.createDataFrame([ 
    (1, 'a'), (2, 'a'), 
    (3, 'b'), (4, 'b'), 
    (5, 'c'), (6, 'c'), 
    (7, 'd'), (8, 'd'), 
], schema=['value', 'name']) 

xf = df.select(df["name"].alias("nam"), df["value"].alias("val")) 
pf = df.join(xf, df["name"] == xf["nam"], "inner").where(xf["val"] < df["value"]).select(df["value"], xf["val"], df["name"]) 

from pyspark.ml.feature import VectorAssembler 


assembler = VectorAssembler(inputCols=['value', "val"], outputCol="features") 
selected_features = assembler.transform(pf).select('features') 
selected_features.collect() 


#[Row(features=DenseVector([2.0, 1.0])), 
# Row(features=DenseVector([4.0, 3.0])), 
# Row(features=DenseVector([6.0, 5.0])), 
# Row(features=DenseVector([8.0, 7.0]))] 
+0

Это прекрасно работает для очень простого примера я представил, но я ищу для более общего подхода. Я уточнил свой вопрос. – mdh

Смежные вопросы