2015-09-21 2 views
2

Новый Спарк, и все примеры, которые я прочитал дело с небольшими наборами данных, таких как:PySpark: Многие функции меченого Точка RDD

RDD = sc.parallelize([ 
LabeledPoint(1, [1.0, 2.0, 3.0]), 
LabeledPoint(2, [3.0, 4.0, 5.0]), 

Однако, у меня есть большой набор данных с 50+ функциями.

Пример строки

u'2596,51,3,258,0,510,221,232,148,6279,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,5' 

Я хочу, чтобы быстро создать Labeledpoint RDD в PySpark. Я пытаюсь проиндексировать последнюю позицию в качестве моей первой точки данных в RDD Labeledpoint, а затем индексировать первые n-1 положения как плотный вектор. Однако я получаю следующую ошибку. Любое руководство ценится! Примечание: если я изменяю [] на() при создании помеченной точки, я получаю сообщение об ошибке «Недопустимый синтаксис».

df = myDataRDD.map(lambda line: line.split(',')) 
data = [ 
    LabeledPoint(df[54], df[0:53]) 
] 
TypeError: 'PipelinedRDD' object does not support indexing 
--------------------------------------------------------------------------- 
TypeError         Traceback (most recent call last) 
<ipython-input-67-fa1b56e8441e> in <module>() 
     2 df = myDataRDD.map(lambda line: line.split(',')) 
     3 data = [ 
----> 4  LabeledPoint(df[54], df[0:53]) 
     5 ] 

TypeError: 'PipelinedRDD' object does not support indexing 
+0

Для уточнения, когда вы упоминаете о последней позиции в качестве вашей первой точки данных, вы имеете в виду это как метку и остальные элементы в качестве функций класса LabaledPoint? –

ответ

4

Как ошибка вы получаете говорится, вы не можете получить доступ к RDD индексами. Вам нужно второй map заявление для преобразования последовательностей в LabeledPoint сек

rows = [u'2596,51,3,258,0,510,221,232,148,6279,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,5', u'2596,51,3,258,0,510,221,232,148,6279,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,5'] 

rows_rdd = sc.parallelize(rows) # create RDD with given rows 

labeled_points_rdd = rows_rdd\ 
        .map(lambda row: row.split(','))\     # split rows into sequences 
        .map(lambda seq: LabeledPoint(seq[-1],seq[:-2])) # create Labeled Points from these sequences with last Item as label 

print labeled_points_rdd.take(2) 
# prints [LabeledPoint(5.0, [2596.0,51.0,3.0,258.0,0.0,510.0,221.0,...]), 
#   LabeledPoint(5.0,[2596.0,51.0,3.0,258.0,0.0,510.0,221.0,...]) 

Обратите внимание, что отрицательные показатели в питоне Пусть вам последовательность доступа в обратном направлении.

С .take(n) вы получаете первые n элементов с вашего RDD.

Надеюсь, это поможет.

2

Вы не можете использовать индексацию, вместо этого вы должны использовать методы, доступные в API Spark. Итак:

data = [ LabeledPoint(myDataRDD.take(RDD.count()), #Last element 
         myDataRDD.top(RDD.count()-1)) #All but last ] 

(непроверенная, тем не менее, это общая идея)

+0

Спасибо за помощь, я считаю, что это работает только по строкам правильно? Как я буду делать это по столбцам? – adlopez15

Смежные вопросы