2015-04-29 4 views
12

Я использую python на Spark и хотел бы получить csv в dataframe.Получить CSV для Spark dataframe

documentation для Spark SQL странно не дает объяснений CSV в качестве источника.

Я нашел Spark-CSV, однако у меня есть проблемы с двумя частями документации:

  • "This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3" ли мне действительно нужно добавить этот аргумент каждый раз я запускаю pyspark или искровым представить? Это кажется очень неэлегантным. Разве нет способа импортировать его в python, а не перегружать его каждый раз?

  • df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv") Даже если я сделаю это, это не сработает. Что означает аргумент «source» в этой строке кода? Как просто загрузить локальный файл в linux, скажем, «/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv»?

ответ

11

Прочтите файл csv в RDD, а затем сгенерируйте RowRDD из исходного RDD.

Создать схему, представленную в StructType, соответствующей структуре строк в RDD, созданный на шаге 1.

Применить схему к RDD строк с помощью метода createDataFrame, предоставленной SQLContext.

lines = sc.textFile("examples/src/main/resources/people.txt") 
parts = lines.map(lambda l: l.split(",")) 
# Each line is converted to a tuple. 
people = parts.map(lambda p: (p[0], p[1].strip())) 

# The schema is encoded in a string. 
schemaString = "name age" 

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] 
schema = StructType(fields) 

# Apply the schema to the RDD. 
schemaPeople = spark.createDataFrame(people, schema) 

Источник: SPARK PROGRAMMING GUIDE

+0

этого ответ старый, новые версии искры есть более простые способы для достижения этой цели. Обратитесь к ответам https://stackoverflow.com/a/41638342/187355 и https://stackoverflow.com/a/46539901/187355 –

20
from pyspark.sql.types import StringType 
from pyspark import SQLContext 
sqlContext = SQLContext(sc) 

Employee_rdd = sc.textFile("\..\Employee.csv") 
       .map(lambda line: line.split(",")) 

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name']) 

Employee_df.show() 
+0

В этом ответе есть несколько оборотов, но мне не совсем понятно, что происходит - вы делаете SQLContext (sc) и вызывать этот sqlContext, тогда вы ничего не делаете с этим .. это просто посторонний код? Когда я пытаюсь тем же код с простым файлом CSV в дирижабле ноутбуке я получаю ошибку: '' 'Traceback (самый последний вызов последнего): Файла«/tmp/zeppelin_pyspark-7664300769638364279.py», строка 252 в Eval (compiledCode) Файл "", строка 1, в AttributeError: 'INT' объект не имеет атрибута «не map'''' – tamale

+0

Пожалуйста, поделитесь своим кодом, чтобы получить помощь. Я использовал код в нескольких экземплярах, у меня не было проблем –

0

я столкнулся с подобной проблемой. Решение состоит в том, чтобы добавить переменную среды с именем «PYSPARK_SUBMIT_ARGS» и установить ее значение в «--packages com.databricks: spark-csv_2.10: 1.4.0 pyspark-shell». Это работает с интерактивной оболочкой Spark's Python.

Убедитесь, что вы соответствуете версии spark-csv с установленной версией Scala. С Scala 2.11 это искра-csv_2.11, а Scala 2.10 или 2.10.5 - искра-csv_2.10.

Надеюсь, что он работает.

8

Если вы не против дополнительной зависимости пакета, вы можете использовать Pandas для анализа файла CSV. Он обрабатывает внутренние запятые просто отлично.

Зависимости:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 

Читать весь файл сразу в Спарк DataFrame:

sc = SparkContext('local','example') # if using locally 
sql_sc = SQLContext(sc) 

pandas_df = pd.read_csv('file.csv') # assuming the file contains a header 
# If no header: 
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) 
s_df = sql_sc.createDataFrame(pandas_df) 

Или даже больше данных сознательно, вы можете ломоть данные в искры RDD затем DF :

chunk_100k = pd.read_csv('file.csv', chunksize=100000) 

for chunky in chunk_100k: 
    Spark_temp_rdd = sc.parallelize(chunky.values.tolist()) 
    try: 
     Spark_full_rdd += Spark_temp_rdd 
    except NameError: 
     Spark_full_rdd = Spark_temp_rdd 
    del Spark_temp_rdd 

Spark_DF = Spark_full_rdd.toDF(['column 1','column 2']) 
+0

createDataFrame часто дает и ошибки, подобные этому: IllegalArgumentException: «Ошибка при создании экземпляра» org.apache.spark.sql.hive.HiveSessionState: «... любой случайный удар это? – mathtick

6

После Спарк 2.0, рекомендуется использовать Спарк Session:

from pyspark.sql import SparkSession 
from pyspark.sql import Row 

# Create a SparkSession 
spark = SparkSession \ 
    .builder \ 
    .appName("basic example") \ 
    .config("spark.some.config.option", "some-value") \ 
    .getOrCreate() 

def mapper(line): 
    fields = line.split(',') 
    return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3])) 

lines = spark.sparkContext.textFile("file.csv") 
df = lines.map(mapper) 

# Infer the schema, and register the DataFrame as a table. 
schemaDf = spark.createDataFrame(df).cache() 
schemaDf.createOrReplaceTempView("tablename") 
6

С более свежими версиями Spark (как я полагаю, 1.4) это стало намного проще.Выражение sqlContext.read дает вам DataFrameReader экземпляр, с помощью метода .csv():

df = sqlContext.read.csv("/path/to/your.csv") 

Обратите внимание, что вы можете также указать, что файл CSV имеет заголовок, добавив ключевое слово аргумент header=True к .csv() вызова. Доступно несколько других опций и описано в ссылке выше.

0

Основываясь на ответе Араванда, но гораздо короче, например. :

lines = sc.textFile("/path/to/file").map(lambda x: x.split(",")) 
df = lines.toDF(["year", "month", "day", "count"]) 
2

для Pyspark, при условии, что первая строка файла CSV содержит заголовок

spark = SparkSession.builder.appName('chosenName').getOrCreate() 
df=spark.read.csv('fileNameWithPath', mode="DROPMALFORMED",inferSchema=True, header = True) 
Смежные вопросы