2016-04-19 6 views
-1

Я использую dataframes sparksql.map array и сохранить исходный формат

df = sql.read.parquet("toy_data") 
df.show() 
+-----------+----------+ 
|   x|   y| 
+-----------+----------+ 
| -4.5707927| -5.282721| 
| -5.762503| -4.832158| 
| 7.907721| 6.793022| 
| 7.4408655| -6.601918| 
| -4.2428184| -4.162871| 

У меня есть список кортежей следующую структуру:

(Row (х = -8.45811653137207, Y = -5.179722309112549), ((-1819,748514533043, +47,745243303477764), 333))

где первый элемент является точкой, второй элемент является (sum_of_points, number_of_points) кортежем.

Когда я разделить sum_of_points по num_of_points, как это:

new_centers = center_sum_num.map(lambda tup: np.asarray(tup[1][0])/tup[1][1]).collect() 

Я получаю следующее, который является массивом Numpy массивов.

[array([-0.10006594, -6.7719144 ]), array([-0.25844196, 5.28381418]), array([-5.12591623, -4.5685448 ]), array([ 5.40192709, -4.35950824])] 

Однако, я хочу, чтобы держать их точки в оригинальном формате, например:

[Row(x=-5.659833908081055, y=7.705344200134277), Row(x=3.17942214012146, y=-9.446121215820312), Row(x=9.128270149230957, y=4.5666022300720215), Row(x=-6.432034969329834, y=-4.432190895080566)] 

Значение Я не хочу, массива numpy_arrays - Я хочу, чтобы массив Его (х = ..., y = ...) thingys.

Как это сделать?

Мой полный код прилагается для справки:

new_centers = [Row(x=-5.659833908081055, y=7.705344200134277), Row(x=3.17942214012146, y=-9.446121215820312), Row(x=9.128270149230957, y=4.5666022300720215), Row(x=-6.432034969329834, y=-4.432190895080566)] 




while old_centers is None or not has_converged(old_centers, new_centers, epsilon) and iteration < max_iterations: 
    # update centers 
    old_centers = new_centers 


    center_pt_1 = points.rdd.map(lambda point: (old_centers[nearest_center(old_centers, point)[0]], (point, 1))) 
    note that nearest_center()[0] is the index 

    center_sum_num =center_pt_1.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1]) ,a[1] + b[1])) 



    new_centers = center_sum_num.map(lambda tup: np.asarray(tup[1][0])/tup[1][1]).collect() 





    iteration += 1 

return new_centers 

ответ

0

Определить структуру

from pyspark.sql import Row 

row = Row("x", "y") 

и распаковывать результаты:

x = (
    Row(x=-8.45811653137207, y=-5.179722309112549), 
    ((-1819.748514533043, 47.745243303477764), 333) 
) 
f = lambda tup: row(*np.asarray(tup[1][0])/tup[1][1]) 
f(x) 
## Row(x=-5.4647102538529815, y=0.14337910901945275) 
Смежные вопросы