2016-06-29 2 views
4

Я искрой установлен в местном, с питоном, и при выполнении следующего кода:Чтение большого файла в выпуске Spark - питон

data=sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv') 
data.first() 

Я получаю следующее сообщение об ошибке:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-11-fca93c6aedeb> in <module>() 
----> 1 data.first() 

C:\Spark\python\pyspark\rdd.pyc in first(self) 
    1313   ValueError: RDD is empty 
    1314   """ 
-> 1315   rs = self.take(1) 
    1316   if rs: 
    1317    return rs[0] 

C:\Spark\python\pyspark\rdd.pyc in take(self, num) 
    1295 
    1296    p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) 
-> 1297    res = self.context.runJob(self, takeUpToNumLeft, p) 
    1298 
    1299    items += res 

C:\Spark\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal) 
    937   # SparkContext#runJob. 
    938   mappedRDD = rdd.mapPartitions(partitionFunc) 
--> 939   port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 
    940   return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 
    941 

C:\Anaconda2\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args) 
    1024   answer = self.gateway_client.send_command(command) 
    1025   return_value = get_return_value(
-> 1026    answer, self.gateway_client, self.target_id, self.name) 
    1027 
    1028   for temp_arg in temp_args: 

C:\Spark\python\pyspark\sql\utils.pyc in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

C:\Anaconda2\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 
    314     raise Py4JJavaError(
    315      "An error occurred while calling {0}{1}{2}.\n". 
--> 316      format(target_id, ".", name), value) 
    317    else: 
    318     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket write error 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(Unknown Source) 
    at java.net.SocketOutputStream.write(Unknown Source) 
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source) 
    at java.io.BufferedOutputStream.write(Unknown Source) 
    at java.io.DataOutputStream.write(Unknown Source) 
    at java.io.FilterOutputStream.write(Unknown Source) 
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622) 
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393) 
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.net.SocketException: Connection reset by peer: socket write error 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(Unknown Source) 
    at java.net.SocketOutputStream.write(Unknown Source) 
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source) 
    at java.io.BufferedOutputStream.write(Unknown Source) 
    at java.io.DataOutputStream.write(Unknown Source) 
    at java.io.FilterOutputStream.write(Unknown Source) 
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622) 
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

Я нахожусь убедитесь, что путь правильный, поскольку я попытался с другими файлами в той же папке. Я думаю, проблема связана с размером файла, который составляет около 3,4 гигабайта.

Любая помощь, пожалуйста?

+1

Это стоит попробовать использовать Scala, чтобы тест –

+1

Как сказал @RockieYang, вы можете запустить искровую скорлупу и попробуйте свой код в нем, чтобы понять, почему он терпит неудачу. –

+2

Вы уверены, что путь прав? Я прочитал еще большие файлы с искру без каких-либо проблем. – Himaprasoon

ответ

3

Если вы используете Spark в автономном режиме или режиме кластера, то spark.driver.memory и spark.executor.memory по умолчанию используется для 1 ГБ памяти. Вы можете добавить больше памяти как для driver, так и для executors, изменив эту конфигурацию при запуске своего ноутбука Jupyter или в файле Spark Conf. С этим вы должны будете прочитать CSV-файл 3.4GB, если у вас есть необходимая оперативная память на вашем компьютере.

+0

@ KartikKannaput поблагодарить за ответ, но можете ли вы показать мне, как это сделать? – Escachator

+0

Вы можете запустить '$ pyspark -executor-memory 6G -driver-memory 6G' из вашей оболочки. Это выделило бы «6 ГБ» ОЗУ для «spark.driver.memory» и «6 ГБ» ОЗУ для «spark.executor.memory». Вы можете выделить разные значения памяти в зависимости от конфигурации системы и требований приложения. – KartikKannapur

2

Как видно из spark documentation, максимальные значения по умолчанию для использования памяти установлены в 1 ГБ.

Вы можете увидеть по умолчанию в файле конфигурации свече, если в Linux может находиться в:

/etc/spark/conf/spark-defaults.conf 

под линиями

spark.driver.memory 
spark.executor.memory 

Но эта конфигурация не должна быть установлена ​​через SparkConf прямо в вашем приложении, потому что JVM драйвера уже начался в этот момент.

Когда lauching искрового Master/Slave вы Шоули добавить память набора arguments:

ex: ./sbin/start-master.sh --memory 2G 
Смежные вопросы