2015-04-20 2 views
7

Я пишу искрение, используя python. Однако мне нужно прочитать целую кучу файлов avro.Как читать файл Avro в PySpark

This - это самое близкое решение, которое я нашел в папке примеров Spark. Тем не менее, вам нужно отправить этот скрипт python с помощью spark-submit. В командной строке spark-submit вы можете указать класс-драйвер, в этом случае будет находиться весь ваш avrokey, класс avrovalue.

avro_rdd = sc.newAPIHadoopFile(
     path, 
     "org.apache.avro.mapreduce.AvroKeyInputFormat", 
     "org.apache.avro.mapred.AvroKey", 
     "org.apache.hadoop.io.NullWritable", 
     keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter", 
     conf=conf) 

В моем случае, мне нужно, чтобы запустить все, что в сценарии Python, я попытался создать переменную среды для включения в файл фляги, кросс палец Python добавит банку в путь, но очевидно, что оно не , это дает мне неожиданную ошибку класса.

os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar" 

Может ли кто-нибудь помочь мне, как читать файл avro в одном скрипте python?

ответ

3

Вы можете использовать библиотеку spark-avro. Сначала давайте создадим пример набора данных:

import avro.schema 
from avro.datafile import DataFileReader, DataFileWriter 

schema_string ='''{"namespace": "example.avro", 
"type": "record", 
"name": "KeyValue", 
"fields": [ 
    {"name": "key", "type": "string"}, 
    {"name": "value", "type": ["int", "null"]} 
] 
}''' 

schema = avro.schema.parse(schema_string) 

with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt: 
    wrt.append({"key": "foo", "value": -1}) 
    wrt.append({"key": "bar", "value": 1}) 

Читая его, используя spark-csv так просто, как это:

df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro") 
df.show() 

## +---+-----+ 
## |key|value| 
## +---+-----+ 
## |foo| -1| 
## |bar| 1| 
## +---+-----+ 
1

Первое решение требует установки стороннего Java зависимость, которая не является чем-то большинство Python разработчики довольны. Но вам не нужна внешняя библиотека, если все, что вы хотите сделать, - это проанализировать ваши файлы Avro с помощью данной схемы. Вы можете просто прочитать двоичные файлы и проанализировать их с помощью своего любимого пакета Avro python.

Например, это то, как вы можете загрузить Avro файлы с помощью fastavro:

from io import BytesIO 
import fastavro 

schema = { 
    ... 
} 

rdd = sc.binaryFiles("/path/to/dataset/*.avro")\ 
    .flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema)) 

print(rdd.collect())