2016-07-28 4 views
1

Я читаю файл csv через Spark, используя следующее.Прочтите CSV с Spark

rdd=sc.textFile("emails.csv").map(lambda line: line.split(",")) 

Мне нужно создать Spark DataFrame.

Я преобразовал этот RDD, чтобы зажечь ДФ с помощью следующих действий:

dataframe=rdd.toDF() 

Но мне нужно указать схему ФРА при преобразовании ДРРА в ФР. Я попытался сделать это: (я просто 2 колонки-файл и сообщение)

from pyspark import Row 

email_schema=Row('file','message') 

email_rdd=rdd.map(lambda r: email_schema(*r)) 

dataframe=sqlContext.createDataFrame(email_rdd) 

Однако я получаю ошибку: java.lang.IllegalStateException: Входной строки не ожидал число значений, требуемых схема. 2 поля требуются, если указаны 1 значение.

Я также попытался чтением моего файла CSV с помощью этого:

rdd=sc.textFile("emails.csv").map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1])) 

Я получаю ошибку: TypeError: «списка» объект не вызываемая

Я попытался с помощью панд, чтобы прочитать мой файл CSV в кадр данных pandas, а затем преобразовал его в искривление DataFrame, но мой файл слишком большой для этого.

Я также добавил:

bin/pyspark --packages com.databricks:spark-csv_2.10:1.0.3 

И прочитал мой файл, используя следующие:

df=sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('emails.csv') 

Я получаю сообщение об ошибке: java.io.IOException: (StartLine 1) EOF достигнуты до инкапсулированный токен завершен

Я прошел через несколько других связанных потоков и попытался, как указано выше. Может ли кто-нибудь объяснить, где я ошибаюсь?

[Использование Python 2.7, Спарк 1.6.2 на MacOSX]

Отредактировано:

1-3 строки, как показано ниже. Мне нужно извлечь только содержимое письма. Как мне это сделать?

allen-p/_sent_mail/1. «Message-ID: < [email protected]> Дата: Пн, 14 мая 2001 года 16:39:00 -0700 (PDT) От: [email protected] Кому: [email protected] .com Тема: Mime-Version: 1,0 Content-Type: текст/равнину; Charset = US-ASCII Content-Transfer-Encoding: 7bit X-From: Phillip K Allen X-To: Tim Belden X -cc: X-Bcc: X-Folder: \ Phillip_Allen_Jan2002_1 \ Allen, Phillip K. Отправленная почта X-Origin: Allen-P X-FileName: pallen (непривилегированный).ПСТ

Вот наш прогноз "

шестигранный-р/_sent_mail/10." Message-ID: < [email protected]> Дата: Пт, 4 мая 2001 13:51 : 00 -0700 (PDT) От: [email protected] Кому: [email protected] Тема: Re: Mime-Version: 1.0 Content-Type: text/plain; Charset = US-ASCII Content-Transfer-Encoding: 7bit X-From: Phillip K Allen X-To: John J Lavorato X-кубовый: X-ОЦК: X-папка: \ Phillip_Allen_Jan2002_1 \ Allen, Phillip K \ 'Отправленные X-Origin:. Allen-P X-FileName: Pallen (Непривилегированный) .pst

Путешествия на деловую встречу принимает удовольствие от поездки. Особенно, если вам нужно подготовить презентацию. Я бы предложил провести здесь заседания бизнес-плана, а затем совершить поездку без каких-либо официальных деловых встреч. Я даже попытался бы получить некоторые честные мнения о том, является ли поездка даже желаемой или необходимой.

Что касается деловых встреч, я думаю, что было бы более продуктивно пытаться стимулировать дискуссии между различными группами о том, что работает, а что нет. Слишком часто ведущий говорит, а остальные молчат, ожидая своей очереди. Заседания могут быть лучше, если они будут проведены в формате круглого стола.

Мое предложение о том, куда идти, - Остин. Играйте в гольф и арендуйте лыжную лодку и водные лыжи. Полет где-то занимает слишком много времени "

шестигранный-р/_sent_mail/100." Message-ID:. < [email protected]> Дата: Ср, 18 октября 2000 3:00:00 -0700 (PDT) От: [email protected] Кому: [email protected] Тема: Re: test Mime-Version: 1.0 Content-Type: text/plain; Charset = US-ASCII Content-Transfer-Encoding: 7bit X-From: Phillip K Allen X-To: Лия ​​Ван Arsdall X-кубовый: X-ОЦК: X-папка: \ Phillip_Allen_Dec2000 \ Notes папки \ 'отправлено письмо X-Origin: Allen-P X-FileName: pallen.nsf

тест успешный. путь!"

+0

можно напечатать образец из первых пяти строк от' emails.csv' (anonymyzing данные по мере необходимости)? – Alexander

+0

'line (line [0], line [1])' .. Это внешнее использование 'line()' означает, что вы пытаетесь вызвать объект списка, таким образом, ошибка –

ответ

0

Если РДД будет вписываться в памяти, а затем:

rdd.toPandas().to_csv('emails.csv') 

Если нет, используйте spark-csv для вашей версии Spark:

rdd.write.format('com.databricks.spark.csv').save('emails.csv') 

В вашем примере выше:

rdd=....map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1])) 

не хотите:

rdd=....map(lambda line: line.split(",")).map(lambda line: (line[0], line[1])) 
+0

Я пробовал это, но это довольно огромный набор данных. – tg89

+0

Как подключить библиотеку com.databricks.spark.csv к искробезопасности? Я использовал формат, указанный в моем сообщении. Вы знаете, где я ошибаюсь? И я пытаюсь прочитать файл. – tg89

+0

Спасибо @Alexander. Я починил это. Но теперь, читая файл, я получаю java.lang.IllegalStateException: строка ввода не имеет ожидаемого количества значений, требуемых схемой. 2 поля требуются, если указаны 1 значение. Я редактировал свой пост, чтобы показать свои первые 3 строки. 1-й столбец - это файл размером всего 1, 2, 3 жирным шрифтом. И второй столбец - это сообщение, содержащее все содержимое сообщения. Не могли бы вы рассказать мне, как я могу исправить это, чтобы включить 2 столбца, как есть? – tg89

0

Если у вас есть огромный файл, почему бы не использовать панд dataframe в куски, а не загружаются все это сразу, что-то вроде:

import pandas as pd 
df_pd = pd.read_csv('myfilename.csv',chunksize = 10000) 

for i,chunk in enumerate(df1): 
    if i==0: 
     df_spark = sqlContext.createDataFrame(chunk) 
    else: 
     df_spark = df_spark.unionAll(sqlContext.createDataFrame(chunk)) 

df_spark будет ваш требуется искра dataframe. Это неэффективно, но это сработает. Для некоторых других способов их реализации вы можете найти ответы на это question

Еще один возможный метод - использовать метод inferSchema для rdd, но для этого вам нужно иметь имена столбцов в вашем файле csv, см. this. Таким образом, вы можете сделать что-то вроде:

srdd = inferSchema(rdd) 
email_rdd=rdd.map(lambda r: srdd(*r)) 

dataframe=sqlContext.createDataFrame(email_rdd)