2016-03-10 2 views
2

У меня есть вкладка разделенных файл, содержащий строки, какПравильно читать типы из файла в PySpark

id1 name1 ['a', 'b'] 3.0 2.0 0.0 1.0 

, то есть идентификатор, имя, список с некоторыми строками, и ряд из 4 атрибутов с плавающей точкой , Я читаю этот файл как

rdd = sc.textFile('myfile.tsv') \ 
    .map(lambda row: row.split('\t')) 
df = sqlc.createDataFrame(rdd, schema) 

где я даю схему, как

schema = StructType([ 
    StructField('id', StringType(), True), 
    StructField('name', StringType(), True), 
    StructField('list', ArrayType(StringType()), True), 
    StructField('att1', FloatType(), True), 
    StructField('att2', FloatType(), True), 
    StructField('att3', FloatType(), True), 
    StructField('att4', FloatType(), True) 
]) 

Проблема, как список и атрибуты не получают правильно читать, судя из collect на DataFrame. На самом деле, я получаю None для всех из них:

Row(id=u'id1', brand_name=u'name1', list=None, att1=None, att2=None, att3=None, att4=None) 

Что я делаю неправильно?

+0

Абсолютно уверен, что все столбцы ограничены табуляцией? (может показаться глупым вопросом, но вы никогда не знаете). Если у вас есть сомнения, сделайте hexdump; пробел имеет шестнадцатеричный код 20, вкладка - 09 – jDo

+0

@jDo Абсолютно уверен и проверен –

ответ

3

Это правильно прочитано, оно просто не работает, как вы ожидаете. Аргумент схемы объявляет то, что являются типами, чтобы избежать дорогостоящего вывода схемы, а не как отбрасывать данные. Предоставление ввода, которое соответствует объявленной схеме, является вашей ответственностью.

Это может быть также обработано либо источником данных (см. spark-csv и inferSchema). Однако он не будет обрабатывать сложные типы, такие как массив.

Поскольку ваша схема в основном равнинная, и вы знаете типы, вы можете попробовать что-то вроде этого:

df = rdd.toDF([f.name for f in schema.fields]) 

exprs = [ 
    # You should excluding casting 
    # on other complex types as well 
    col(f.name).cast(f.dataType) if f.dataType.typeName() != "array" 
    else col(f.name) 
    for f in schema.fields 
] 

df.select(*exprs) 

и обрабатывать сложные типы отдельно с использованием функций обработки строк или пользовательскими. Кроме того, поскольку вы все равно читаете данные на Python, просто создавайте нужные типы перед созданием DF.

Смежные вопросы