Итак, я рассмотрел этот вопрос здесь, но предыдущие решения не сработали для меня. У меня есть DataFrame в этом форматеTypeError при преобразовании Pandas в Spark
mdf.head()
dbn boro bus
0 17K548 Brooklyn B41, B43, B44-SBS, B45, B48, B49, B69
1 09X543 Bronx Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A,...
4 28Q680 Queens Q25, Q46, Q65
6 14K474 Brooklyn B24, B43, B48, B60, Q54, Q59
Есть несколько больше столбцов, но я исключил их (линии метро и тесты). Когда я пытаюсь преобразовать этот DataFrame в Spark DataFrame, я получаю ошибку, которая есть.
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-30-1721be5c2987> in <module>()
----> 1 sparkdf = sqlc.createDataFrame(mdf)
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
423 rdd, schema = self._createFromRDD(data, schema, samplingRatio)
424 else:
--> 425 rdd, schema = self._createFromLocal(data, schema)
426 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
427 jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _createFromLocal(self, data, schema)
339
340 if schema is None or isinstance(schema, (list, tuple)):
--> 341 struct = self._inferSchemaFromList(data)
342 if isinstance(schema, (list, tuple)):
343 for i, name in enumerate(schema):
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _inferSchemaFromList(self, data)
239 warnings.warn("inferring schema from dict is deprecated,"
240 "please use pyspark.sql.Row instead")
--> 241 schema = reduce(_merge_type, map(_infer_schema, data))
242 if _has_nulltype(schema):
243 raise ValueError("Some of types cannot be determined after inferring")
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
860 nfs = dict((f.name, f.dataType) for f in b.fields)
861 fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType())))
--> 862 for f in a.fields]
863 names = set([f.name for f in fields])
864 for n in nfs:
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
854 elif type(a) is not type(b):
855 # TODO: type cast (such as int -> long)
--> 856 raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
857
858 # same type
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
Из-за того, что я прочитал, это может быть проблемой при обработке заголовков как данных. Насколько я понимаю, вы не можете удалить заголовки из DataFrame, так как я могу приступить к решению этой ошибки и преобразованию этого DataFrame в Spark?
Редактировать: Вот код, как я создал Pandas DF и проделал свой путь вокруг проблемы.
sqlc = SQLContext(sc)
df = pd.DataFrame(pd.read_csv('hsdir.csv', encoding = 'utf_8_sig'))
df = df[['dbn', 'boro', 'bus', 'subway', 'total_students']]
df1 = pd.DataFrame(pd.read_csv('sat_r.csv', encoding = 'utf_8_sig'))
df1 = df1.rename(columns = {'Num of SAT Test Takers': 'num_test_takers', 'SAT Critical Reading Avg. Score': 'read_avg', 'SAT Math Avg. Score' : 'math_avg', 'SAT Writing Avg. Score' : 'write_avg'})
mdf = pd.merge(df, df1, left_on = 'dbn', right_on = 'DBN', how = 'left')
mdf = mdf[pd.notnull(mdf['DBN'])]
mdf.to_csv('merged.csv', encoding = 'utf-8')
ndf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("merged.csv")
Последняя строка этого кода, загружая его из моей локальной машины в конечном итоге позволил мне преобразовать CSV правильно кадр данных по-прежнему остается, однако мой вопрос. Почему это не сработало в первую очередь?
Я получаю сообщение об ошибке 'AttributeError: объект«DataFrame»не имеет атрибута«map'' –
Oh , 'mdf' - это панда DataFrame? Я ошибочно предположил, что это Spark RDD. Вам нужно использовать панды? Или вы можете создать Spark RDD, а затем преобразовать его в Spark DataFrame, как указано выше? – user4601931
Итак, это проблема, с которой я сталкиваюсь.Если я загружу его как RDD, используя 'com.databricks.spark.csv', чтобы прочитать его как CSV, он полностью игнорирует столбец dbn и перемещает все один столбец влево. Я не уверен, как избежать этой проблемы, поэтому я загрузил ее через Pandas 'read_csv', который сохранил форматирование исходного CSV. –