2016-10-04 8 views
1

Итак, я рассмотрел этот вопрос здесь, но предыдущие решения не сработали для меня. У меня есть DataFrame в этом форматеTypeError при преобразовании Pandas в Spark

mdf.head() 
    dbn  boro  bus 
0 17K548 Brooklyn B41, B43, B44-SBS, B45, B48, B49, B69 
1 09X543 Bronx  Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A,... 
4 28Q680 Queens  Q25, Q46, Q65 
6 14K474 Brooklyn B24, B43, B48, B60, Q54, Q59 

Есть несколько больше столбцов, но я исключил их (линии метро и тесты). Когда я пытаюсь преобразовать этот DataFrame в Spark DataFrame, я получаю ошибку, которая есть.

--------------------------------------------------------------------------- 
TypeError         Traceback (most recent call last) 
<ipython-input-30-1721be5c2987> in <module>() 
----> 1 sparkdf = sqlc.createDataFrame(mdf) 

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio) 
    423    rdd, schema = self._createFromRDD(data, schema, samplingRatio) 
    424   else: 
--> 425    rdd, schema = self._createFromLocal(data, schema) 
    426   jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd()) 
    427   jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json()) 

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _createFromLocal(self, data, schema) 
    339 
    340   if schema is None or isinstance(schema, (list, tuple)): 
--> 341    struct = self._inferSchemaFromList(data) 
    342    if isinstance(schema, (list, tuple)): 
    343     for i, name in enumerate(schema): 

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _inferSchemaFromList(self, data) 
    239    warnings.warn("inferring schema from dict is deprecated," 
    240       "please use pyspark.sql.Row instead") 
--> 241   schema = reduce(_merge_type, map(_infer_schema, data)) 
    242   if _has_nulltype(schema): 
    243    raise ValueError("Some of types cannot be determined after inferring") 

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b) 
    860   nfs = dict((f.name, f.dataType) for f in b.fields) 
    861   fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()))) 
--> 862     for f in a.fields] 
    863   names = set([f.name for f in fields]) 
    864   for n in nfs: 

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b) 
    854  elif type(a) is not type(b): 
    855   # TODO: type cast (such as int -> long) 
--> 856   raise TypeError("Can not merge type %s and %s" % (type(a), type(b))) 
    857 
    858  # same type 

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'> 

Из-за того, что я прочитал, это может быть проблемой при обработке заголовков как данных. Насколько я понимаю, вы не можете удалить заголовки из DataFrame, так как я могу приступить к решению этой ошибки и преобразованию этого DataFrame в Spark?

Редактировать: Вот код, как я создал Pandas DF и проделал свой путь вокруг проблемы.

sqlc = SQLContext(sc) 
df = pd.DataFrame(pd.read_csv('hsdir.csv', encoding = 'utf_8_sig')) 
df = df[['dbn', 'boro', 'bus', 'subway', 'total_students']] 
df1 = pd.DataFrame(pd.read_csv('sat_r.csv', encoding = 'utf_8_sig')) 
df1 = df1.rename(columns = {'Num of SAT Test Takers': 'num_test_takers', 'SAT Critical Reading Avg. Score': 'read_avg', 'SAT Math Avg. Score' : 'math_avg', 'SAT Writing Avg. Score' : 'write_avg'}) 
mdf = pd.merge(df, df1, left_on = 'dbn', right_on = 'DBN', how = 'left') 
mdf = mdf[pd.notnull(mdf['DBN'])] 
mdf.to_csv('merged.csv', encoding = 'utf-8') 
ndf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("merged.csv") 

Последняя строка этого кода, загружая его из моей локальной машины в конечном итоге позволил мне преобразовать CSV правильно кадр данных по-прежнему остается, однако мой вопрос. Почему это не сработало в первую очередь?

ответ

1

Вы могли бы использовать отражение, чтобы вывести схему из РДА из Row объектов, например,

from pyspark.sql import Row 
mdfRows = mdf.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2])) 
dfOut = sqlContext.createDataFrame(mdfRows) 

ли это достичь желаемого результата?

+0

Я получаю сообщение об ошибке 'AttributeError: объект«DataFrame»не имеет атрибута«map'' –

+0

Oh , 'mdf' - это панда DataFrame? Я ошибочно предположил, что это Spark RDD. Вам нужно использовать панды? Или вы можете создать Spark RDD, а затем преобразовать его в Spark DataFrame, как указано выше? – user4601931

+0

Итак, это проблема, с которой я сталкиваюсь.Если я загружу его как RDD, используя 'com.databricks.spark.csv', чтобы прочитать его как CSV, он полностью игнорирует столбец dbn и перемещает все один столбец влево. Я не уверен, как избежать этой проблемы, поэтому я загрузил ее через Pandas 'read_csv', который сохранил форматирование исходного CSV. –

2

У меня была такая же проблема, и я смог проследить ее до одной записи, которая имела значение длины 0 (или пустое). Команда _inferScheme запускается в каждой строке блока данных и определяет типы. По умолчанию предполагается, что пустым значением является Double, а другое - String. Эти два типа не могут быть объединены командой _merge_type. Вопрос был подан https://issues.apache.org/jira/browse/SPARK-18178, но лучший способ - это, вероятно, поставка схемы в команду createDataFrame.

Код ниже воспроизводит проблему в PySpark 2.0

import pandas as pd 
from io import StringIO 
test_df = pd.read_csv(StringIO(',Scan Options\n15,SAT2\n16,\n')) 
sqlContext.createDataFrame(test_df).registerTempTable('Test') 
o_qry = sqlContext.sql("SELECT * FROM Test LIMIT 1") 
o_qry.first() 
0

Вы можете попробовать это так:

def create_spark_dataframe(file_name): 
    """ 
    will return the spark dataframe input pandas dataframe 
    """ 
    pandas_data_frame = pd.read_csv(file_name, converters= {"PRODUCT": str}) 
    for col in pandas_data_frame.columns: 
    if ((pandas_data_frame[col].dtypes != np.int64) & 
     (pandas_data_frame[col].dtypes != np.float64)): 
    pandas_data_frame[col] = pandas_data_frame[col].fillna('') 

    spark_data_frame = sqlContext.createDataFrame(pandas_data_frame) 
    return spark_data_frame 

Это позволит решить вашу проблему.

0

Проблема: pandas default np.nan (Не число) для пустой строки, что создает путаницу в схеме при преобразовании в spark.df.

Базовый подход конвертировать np.nan в None, что позволит ему работать

К сожалению, панды не позволяет вам fillna с None. Таким образом, вы можете цикл следующие для столбцов с не номером схемы

new_df_1 = new_df_1.fillna('xyzpronaa') 
new_df_1 = new_df_1.apply(lambda x: None if x == 'xyzpronaa' else x) 

Затем display(sqlContext.createDataFrame(new_df_1)) будет работать нормально

+0

Я был бы рад, если бы кто-нибудь мог предложить мне прямой способ конвертировать nan в None –

Смежные вопросы