2016-08-05 3 views
1

У меня есть данные csv и создан фрейм данных pnadas с использованием read_csv и форсирование всех столбцов в виде строки. Затем, когда я пытаюсь создать световой блок данных из фреймворка pandas, я получаю сообщение об ошибке ниже.pandas dataframe для искробезопасного кадра данных «Невозможно объединить ошибку типа»

from pyspark import SparkContext 
    from pyspark.sql import SQLContext 
    from pyspark.sql.types import * 
    z=pd.read_csv("mydata.csv", dtype=str) 
    z.info() 
<class 'pandas.core.frame.DataFrame'> 
Int64Index: 74044003 entries, 0 to 74044002 
Data columns (total 12 columns): 
primaryid  object 
event_dt  object 
age    object 
age_cod   object 
age_grp   object 
sex    object 
occr_country object 
drug_seq  object 
drugname  object 
route   object 
outc_cod  object 
pt    object 

q= sqlContext.createDataFrame(z) 

File "<stdin>", line 1, in <module> 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 425, in createDataFrame 
rdd, schema = self._createFromLocal(data, schema) 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 341, in _createFromLocal 
struct = self._inferSchemaFromList(data) 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 241, in _inferSchemaFromList 
schema = reduce(_merge_type, map(_infer_schema, data)) 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 862, in _merge_type 
for f in a.fields] 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 856, in _merge_type 
raise TypeError("Can not merge type %s and %s" % (type(a), type(b))) 
TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'> 

Вот пример. Я загружаю общедоступные данные и создаю базу данных pandas, но искра не создает искровой фреймворк из фреймворка pandas.

 import pandas as pd 
     from pyspark import SparkContext 
     from pyspark.sql import SQLContext 
     from pyspark.sql.types import * 

     url ="http://www.nber.org/fda/faers/2016/demo2016q1.csv.zip" 

     import requests, zipfile, StringIO 
     r = requests.get(url, stream=True) 
     z = zipfile.ZipFile(StringIO.StringIO(r.content)) 
     z.extractall() 


     z=pd.read_csv("demo2016q1.csv") # creates pandas dataframe 

    Data_Frame = sqlContext.createDataFrame(z) 
+0

а) почему вы читаете данные локально только для распараллеливания. Это анти-шаблон. б) все столбцы, помеченные как «объект», предлагают некоторые гетерогенные данные, которые не поддерживаются Spark DataFrames. – zero323

+0

Вы правы, это не правильный способ читать локально, но поскольку другие параметры не удались, я надеялся, что фрейм данных из панд будет легко исправить. Как вы сказали, столбцы являются гетерогенными. Есть ли способ обхода, который я могу попробовать? –

+0

Можете ли вы предоставить [mcve]? Некоторые образцы игрушек, которые иллюстрируют, что там происходит ... – zero323

ответ

3

Короткий рассказ не зависит от вывода схемы. Это дорого и сложно в общем. В частности, некоторые столбцы (например, event_dt_num) в ваших данных имеют отсутствующие значения, которые заставляют Pandas представлять их как смешанные типы (строка для отсутствия отсутствует, NaN для отсутствующих значений).

Если у вас есть сомнения, то лучше читать все данные в виде строк и потом отбрасывать. Если у вас есть доступ к кодовой книге, вы всегда должны предоставлять схему, чтобы избежать проблем и снизить общую стоимость.

И, наконец, передача данных из драйвера является анти-шаблоном. Вы должны быть в состоянии прочитать эти данные напрямую, используя csv формат (Спарк 2.0.0+) или spark-csv библиотеку (Спарк 1.6 и ниже):

df = (spark.read.format("csv").options(header="true") 
    .load("/path/tp/demo2016q1.csv")) 

## root 
## |-- primaryid: string (nullable = true) 
## |-- caseid: string (nullable = true) 
## |-- caseversion: string (nullable = true) 
## |-- i_f_code: string (nullable = true) 
## |-- i_f_code_num: string (nullable = true) 
## ... 
## |-- to_mfr: string (nullable = true) 
## |-- occp_cod: string (nullable = true) 
## |-- reporter_country: string (nullable = true) 
## |-- occr_country: string (nullable = true) 
## |-- occp_cod_num: string (nullable = true) 

В данном случае добавления inferSchema="true" варианта должен работать как хорошо, но это еще лучше избежать этого. Вы также можете предоставить схему следующим образом:

from pyspark.sql.types import StructType 

schema = StructType.fromJson({'fields': [{'metadata': {}, 
    'name': 'primaryid', 
    'nullable': True, 
    'type': 'integer'}, 
    {'metadata': {}, 'name': 'caseid', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'caseversion', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'i_f_code', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'i_f_code_num', 
    'nullable': True, 
    'type': 'integer'}, 
    {'metadata': {}, 'name': 'event_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'event_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'mfr_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'mfr_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'init_fda_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 
    'name': 'init_fda_dt_num', 
    'nullable': True, 
    'type': 'string'}, 
    {'metadata': {}, 'name': 'fda_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'fda_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'rept_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'rept_cod_num', 
    'nullable': True, 
    'type': 'integer'}, 
    {'metadata': {}, 'name': 'auth_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'mfr_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'mfr_sndr', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'lit_ref', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'age', 'nullable': True, 'type': 'double'}, 
    {'metadata': {}, 'name': 'age_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'age_grp', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'age_grp_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'sex', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'e_sub', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'wt', 'nullable': True, 'type': 'double'}, 
    {'metadata': {}, 'name': 'wt_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'rept_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'rept_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'to_mfr', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'occp_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'reporter_country', 
    'nullable': True, 
    'type': 'string'}, 
    {'metadata': {}, 'name': 'occr_country', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'occp_cod_num', 
    'nullable': True, 
    'type': 'integer'}], 
'type': 'struct'}) 

непосредственно к читателю:

(spark.read.schema(schema).format("csv").options(header="true") 
    .load("/path/to/demo2016q1.csv")) 
+0

Благодарим вас за отличное объяснение. Фактически, я переключился на Pandas, потому что я не смог добавить библиотеку spark-csv в Jupyter. Я использую HDP 2.4 (Spark 1.6), и я установил Jupyter. Я загрузил spark-csv и commons-csv и указал путь к этим банкам в стартере ноутбука Jupyter, но когда я пытаюсь читать данные csv, он не говорит, что он не может получить библиотеки. Теперь я попробовал искровую раковину, и все в порядке. Вы когда-нибудь использовали библиотеку spark-csv в ноутбуке Jupyter (ipython)? –

+0

Конечно, эти методы должны работать просто отлично http://stackoverflow.com/a/35762809/1560062 – zero323

+0

Это работает как шарм! Спасибо, миллион. Я попробовал много других вариантов и потратил много времени. Ссылка, которую вы предоставили, помогла мне завершить ее за несколько минут. –

Смежные вопросы