toPandas() works from a Jupyter IPython notebook but not with spark-submit (AWS EMR)
I have a program that:
1. reads some data
2. performs some operations
3. saves a CSV file
4. transfers this file to FTP
I use an Amazon EMR cluster and PySpark for this task.
For step 4 I need to save the CSV to local storage, not to HDFS. To do this, I convert the Spark DataFrame to a Pandas DataFrame.
The snippet looks like this:
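Step 4 is only described in words here. A minimal sketch of it with the standard-library `ftplib`, assuming an FTP server that is reachable from whichever node runs the driver; the host, credentials, and the helper names `upload_file` and `upload_via_ftp` are hypothetical:

```python
from ftplib import FTP


def upload_file(ftp, local_path, remote_name):
    """Upload a local file over an already-connected FTP session.

    `ftp` is expected to be an ftplib.FTP instance (or any object with a
    compatible `storbinary` method). Returns the server's response line.
    """
    with open(local_path, "rb") as fh:
        # STOR stores the file as `remote_name` in the current remote directory
        return ftp.storbinary("STOR " + remote_name, fh)


def upload_via_ftp(host, user, password, local_path, remote_name):
    # Hypothetical connection parameters; substitute the real server details.
    with FTP(host) as ftp:
        ftp.login(user, password)
        return upload_file(ftp, local_path, remote_name)
```

Splitting the upload from the connection keeps `upload_file` testable without a live server.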
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.mllib.evaluation import *
from pyspark.sql.functions import *
from pyspark.sql import Row
from time import time
import timeit
from datetime import datetime, timedelta
import numpy as np
import random as rand
import pandas as pd
from itertools import combinations, permutations
from collections import defaultdict
from ftplib import FTP
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("Recommendation").set('spark.driver.memory', '8G').set('spark.executor.memory', '4G')
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
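# `path` is defined earlier in the full script (not shown); despite its name, readRdd is a DataFrame
readRdd = sqlContext.read.format('com.databricks.spark.csv').load('s3n://my-bucket/myfile' + path)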
df = readRdd.toPandas() # <---------- PROBLEM
print('toPandas() completed')
df.to_csv('./myFile')
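With `--deploy-mode cluster`, Spark on YARN runs the driver inside the YARN application master on one of the cluster nodes, not on the machine where `spark-submit` was invoked, so a relative path such as `./myFile` resolves inside that container's working directory. A small stdlib-only diagnostic (the helper `describe_output_location` is hypothetical) that could be printed from the driver to confirm where the CSV actually lands:

```python
import os
import socket


def describe_output_location(relative_path):
    """Return (hostname, absolute path) for a relative output path.

    Printing this from the driver shows which machine and directory
    `df.to_csv(relative_path)` would write to.
    """
    return socket.gethostname(), os.path.abspath(relative_path)


if __name__ == "__main__":
    host, path = describe_output_location("./myFile")
    print("driver host:", host)
    print("CSV would be written to:", path)
```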
The problem is this: when I run this code from a Jupyter IPython notebook on the same cluster, it works like a charm. But when I run it with spark-submit, or add it as a step to EMR, the code fails at the following line:
df = readRdd.toPandas()
"toPandas() completed" is never printed.
In the Spark job monitor I can see that the toPandas() method starts, but right after that I get an error:
16/10/10 13:17:47 INFO YarnAllocator: Driver requested a total number of 1 executor(s).
16/10/10 13:17:47 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:47 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:47 INFO TaskSetManager: Finished task 1462.0 in stage 17.0 (TID 10624) in 2089 ms on ip-172-31-38-70.eu-west-1.compute.internal (1515/1516)
16/10/10 13:17:47 INFO TaskSetManager: Finished task 1485.0 in stage 17.0 (TID 10647) in 2059 ms on ip-172-31-38-70.eu-west-1.compute.internal (1516/1516)
16/10/10 13:17:47 INFO YarnClusterScheduler: Removed TaskSet 17.0, whose tasks have all completed, from pool
16/10/10 13:17:47 INFO DAGScheduler: ResultStage 17 (toPandas at 20161007_RecPipeline.py:182) finished in 12.609 s
16/10/10 13:17:47 INFO DAGScheduler: Job 4 finished: toPandas at 20161007_RecPipeline.py:182, took 14.646644 s
16/10/10 13:17:47 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
16/10/10 13:17:47 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:47 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:50 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:50 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:53 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:53 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:56 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:56 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:17:59 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:17:59 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:02 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:02 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:05 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:05 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:08 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:08 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:11 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:11 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:14 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:14 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:17 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:17 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:20 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:20 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:23 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:23 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:26 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:26 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:29 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:29 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:32 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:32 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:35 INFO YarnAllocator: Canceling requests for 0 executor containers
16/10/10 13:18:35 WARN YarnAllocator: Expected to find pending requests, but found none.
16/10/10 13:18:36 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/10/10 13:18:36 INFO SparkContext: Invoking stop() from shutdown hook
16/10/10 13:18:36 INFO SparkUI: Stopped Spark web UI at http://172.31.37.28:45777
16/10/10 13:18:36 INFO YarnClusterSchedulerBackend: Shutting down all executors
16/10/10 13:18:36 INFO YarnClusterSchedulerBackend: Asking each executor to shut down
16/10/10 13:18:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/10/10 13:18:36 ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118)
at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:648)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649)
16/10/10 13:18:36 ERROR ApplicationMaster: User application exited with status 143
16/10/10 13:18:36 INFO ApplicationMaster: Final app status: FAILED, exitCode: 143, (reason: User application exited with status 143)
16/10/10 13:18:36 INFO MemoryStore: MemoryStore cleared
16/10/10 13:18:36 INFO BlockManager: BlockManager stopped
16/10/10 13:18:36 INFO BlockManagerMaster: BlockManagerMaster stopped
16/10/10 13:18:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/10/10 13:18:36 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/10/10 13:18:36 INFO SparkContext: Successfully stopped SparkContext
16/10/10 13:18:36 INFO ShutdownHookManager: Shutdown hook called
16/10/10 13:18:36 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt3/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-eab43d4e-7201-4bcb-8ee7-0e7b546e8fd8
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1d88398f-ecd5-4d94-a42a-a406b3d566af/pyspark-34bec23c-a686-475d-85c9-9e9228b23239
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1d88398f-ecd5-4d94-a42a-a406b3d566af
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt3/yarn/usercache/hadoop/appcache/application_1476100925559_0002/container_1476100925559_0002_01_000001/tmp/spark-96cdee47-e3f3-45f4-8bc7-0df5928ef53c
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt2/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-f6821ea1-6f37-4cc6-8bba-049ac0215786
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt1/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1827cae8-8a60-4b29-a4e5-368a8e1856fd
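The log contains `RECEIVED SIGNAL 15: SIGTERM` followed by exit status 143. That pairing matches the common convention that a process terminated by signal N reports status 128 + N, which suggests the application was stopped by a signal rather than by an exception raised in the Python code. A quick check of that arithmetic with Python's `signal` module (the helper name is hypothetical):

```python
import signal


def describe_exit_status(status):
    """Interpret an exit status using the shell convention 128 + N,
    where N is the number of the signal that terminated the process."""
    if status > 128:
        try:
            return signal.Signals(status - 128).name
        except ValueError:
            return "unknown signal %d" % (status - 128)
    return "exited with code %d" % status


print(describe_exit_status(143))  # SIGTERM
```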
My cluster configuration looks like this:
spark-defaults spark.driver.maxResultSize 8G
spark-defaults spark.driver.memory 8G
spark-defaults spark.executor.memory 4G
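These settings are listed as classification/key/value triples. EMR also accepts cluster configuration as JSON objects with `Classification` and `Properties` fields; a sketch that converts the three lines into that shape (the helper `to_emr_configuration` is hypothetical):

```python
import json


def to_emr_configuration(lines):
    """Group 'classification key value' lines into EMR-style configuration
    objects of the form {"Classification": ..., "Properties": {...}}."""
    grouped = {}
    for line in lines:
        classification, key, value = line.split()
        grouped.setdefault(classification, {})[key] = value
    return [
        {"Classification": c, "Properties": props}
        for c, props in grouped.items()
    ]


settings = [
    "spark-defaults spark.driver.maxResultSize 8G",
    "spark-defaults spark.driver.memory 8G",
    "spark-defaults spark.executor.memory 4G",
]
print(json.dumps(to_emr_configuration(settings), indent=2))
```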
The spark-submit command looks like this:
spark-submit --deploy-mode cluster s3://my-bucket/myPython.py
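The script also sets `spark.driver.memory` through `SparkConf`. The Spark configuration docs advise against that in client mode, because the driver JVM is already running by then, and recommend `--driver-memory` or the defaults file instead. A sketch that assembles an equivalent spark-submit invocation with the memory values passed explicitly on the command line (the helper `build_spark_submit` is hypothetical; the values are copied from the configuration above):

```python
import shlex


def build_spark_submit(script, driver_memory="8G", executor_memory="4G",
                       max_result_size="8G"):
    """Build a spark-submit argument list that passes memory settings as
    command-line options instead of setting them inside the script."""
    return [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--driver-memory", driver_memory,
        "--executor-memory", executor_memory,
        "--conf", "spark.driver.maxResultSize=" + max_result_size,
        script,
    ]


cmd = build_spark_submit("s3://my-bucket/myPython.py")
print(" ".join(shlex.quote(part) for part in cmd))
```

The list form can be passed directly to `subprocess.run` without shell quoting issues.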
This is killing me! Could someone please give me some pointers on which direction to look in?