2016-04-13 7 views
13

Я новый, чтобы искриться & pyspark.pyspark EOFError по телефону

Я читаю небольшой файл csv (~ 40k) в dataframe.

from pyspark.sql import functions as F 
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv') 
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0)) 
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF() 

Я получаю некоторые странные ошибки, что не происходит каждый раз, но случается довольно регулярно

>>> df2.show(1) 
+--------------------+---------+ 
|   features| label| 
+--------------------+---------+ 
|[0.0,0.0,0.0,0.0,...|4700734.0| 
+--------------------+---------+ 
only showing top 1 row 

>>> df2.count() 
41999                   
>>> df2.show(1) 
+--------------------+---------+ 
|   features| label| 
+--------------------+---------+ 
|[0.0,0.0,0.0,0.0,...|4700734.0| 
+--------------------+---------+ 
only showing top 1 row 

>>> df2.count() 
41999                   
>>> df2.show(1) 
Traceback (most recent call last): 
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager 
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker  
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main 
    if read_int(infile) == SpecialLengths.END_OF_STREAM: 
    File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int 
    raise EOFError 
EOFError 
+--------------------+---------+ 
|   features| label| 
+--------------------+---------+ 
|[0.0,0.0,0.0,0.0,...|4700734.0| 
+--------------------+---------+ 
only showing top 1 row 

После того, что EOFError был поднят, я не буду видеть его снова, пока я не сделаю то, что требует взаимодействия с искровым сервером

Когда я вызываю df2.count(), он показывает, что приглашение [Stage xxx], что я подразумеваю под этим, идет к искровому серверу. Все, что срабатывает, похоже, в конечном итоге снова дает EOFError, когда я что-то делаю с df2.

Это не похоже на df (vs. df2), поэтому похоже, что это что-то происходит с линией df.map().

+1

Я слышал от списка искроберителей, что это сообщение немного чересчур многословно и может быть проигнорировано. – Pete

+0

Пит, можете ли вы указать нам на архивы? – rjurney

+0

Я искал список искровых пользователей и не могу найти ничего об этом в отношении EOFError :( – rjurney

ответ

0

Не могли бы вы попытаться сделать карту после преобразования dataframe в rdd. Вы подаете функцию карты на dataframe, а затем снова создавая dataframe из that.Syntax будет как

df.rdd.map().toDF() 

Пожалуйста, дайте мне знать, если это работает. Благодарю.

0

Я считаю, что вы используете Spark 2.x и выше. Ниже код должен создать свой dataframe из CSV:

df = spark.read.format("csv").option("header", "true").load("csvfile.csv") 

, то вы можете иметь код ниже:

df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0)) 

, а затем вы можете создать df2 без Роу и toDF()

Позвольте мне знать, если это работает, или если вы используете Spark 1.6 ... спасибо.

Смежные вопросы