2016-10-17 1 views
0

У меня есть программа, которая: 1. читает некоторые данные 2. выполнять некоторые операции 3. Сохраняет файл CSV 4. Транспортировка этот файл FTPtoPandas() работают с Jupyter IPython ноутбуков, но не на представить - AWS ЭМИ

Я использую кластер EMR Amazon и PySpark для выполнения этой задачи.

Для шага 4 мне нужно сохранить CSV на локальном хранилище, а не на HDFS. Для этого я конвертирую Spark Dataframe в рамку данных Pandas.

фрагмент может быть:

from pyspark import SparkContext 
from pyspark.conf import SparkConf 
from pyspark.sql import HiveContext 
from pyspark.sql.types import StructType, StructField, LongType, StringType 
from pyspark.mllib.evaluation import * 
from pyspark.sql.functions import * 
from pyspark.sql import Row 
from time import time 
import timeit 
from datetime import datetime, timedelta 
import numpy as np 
import random as rand 
import pandas as pd 
from itertools import combinations, permutations 
from collections import defaultdict 
from ftplib import FTP 
from pyspark.sql import SQLContext 

conf = SparkConf().setAppName("Recommendation").set('spark.driver.memory', '8G').set('spark.executor.memory', '4G') 
sc = SparkContext(conf = conf) 
sqlContext = SQLContext(sc) 


readRdd = sqlContext.read.format('com.databricks.spark.csv').load('s3n://my-bucket/myfile' + path) 

df = readRdd.toPandas() # <---------- PROBLEM 
print('toPandas() completed') 

df.to_csv('./myFile') 

Проблема заключается в том:

, когда я запускаю этот код из Jpyter IPython ноутбука на том же кластере, он работает как шарм. Но когда я запускаю этот код, используя Спарк Submit, или добавить его в качестве шага к ОМУ, коду не будет работать на следующей строке:

df = readRdd.toPandas() 

«toPandas() завершено» никогда не печатается

В искре монитор работы, я вижу, что метод toPandas() запускается, но сразу после этого я получаю ошибку.

16/10/10 13:17:47 INFO YarnAllocator: Driver requested a total number of 1 executor(s). 
16/10/10 13:17:47 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:17:47 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:17:47 INFO TaskSetManager: Finished task 1462.0 in stage 17.0 (TID 10624) in 2089 ms on ip-172-31-38-70.eu-west-1.compute.internal (1515/1516) 
16/10/10 13:17:47 INFO TaskSetManager: Finished task 1485.0 in stage 17.0 (TID 10647) in 2059 ms on ip-172-31-38-70.eu-west-1.compute.internal (1516/1516) 
16/10/10 13:17:47 INFO YarnClusterScheduler: Removed TaskSet 17.0, whose tasks have all completed, from pool 
16/10/10 13:17:47 INFO DAGScheduler: ResultStage 17 (toPandas at 20161007_RecPipeline.py:182) finished in 12.609 s 
16/10/10 13:17:47 INFO DAGScheduler: Job 4 finished: toPandas at 20161007_RecPipeline.py:182, took 14.646644 s 
16/10/10 13:17:47 INFO YarnAllocator: Driver requested a total number of 0 executor(s). 
16/10/10 13:17:47 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:17:47 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:17:50 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:17:50 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:17:53 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:17:53 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:17:56 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:17:56 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:17:59 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:17:59 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:02 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:02 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:05 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:05 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:08 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:08 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:11 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:11 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:14 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:14 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:17 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:17 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:20 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:20 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:23 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:23 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:26 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:26 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:29 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:29 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:32 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:32 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:35 INFO YarnAllocator: Canceling requests for 0 executor containers 
16/10/10 13:18:35 WARN YarnAllocator: Expected to find pending requests, but found none. 
16/10/10 13:18:36 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 
16/10/10 13:18:36 INFO SparkContext: Invoking stop() from shutdown hook 
16/10/10 13:18:36 INFO SparkUI: Stopped Spark web UI at http://172.31.37.28:45777 
16/10/10 13:18:36 INFO YarnClusterSchedulerBackend: Shutting down all executors 
16/10/10 13:18:36 INFO YarnClusterSchedulerBackend: Asking each executor to shut down 
16/10/10 13:18:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/10/10 13:18:36 ERROR PythonRDD: Error while sending iterator 
java.net.SocketException: Connection reset 
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118) 
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) 
    at java.io.DataOutputStream.write(DataOutputStream.java:107) 
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:648) 
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648) 
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:648) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250) 
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:649) 
16/10/10 13:18:36 ERROR ApplicationMaster: User application exited with status 143 
16/10/10 13:18:36 INFO ApplicationMaster: Final app status: FAILED, exitCode: 143, (reason: User application exited with status 143) 
16/10/10 13:18:36 INFO MemoryStore: MemoryStore cleared 
16/10/10 13:18:36 INFO BlockManager: BlockManager stopped 
16/10/10 13:18:36 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/10/10 13:18:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/10/10 13:18:36 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 
16/10/10 13:18:36 INFO SparkContext: Successfully stopped SparkContext 
16/10/10 13:18:36 INFO ShutdownHookManager: Shutdown hook called 
16/10/10 13:18:36 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt3/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-eab43d4e-7201-4bcb-8ee7-0e7b546e8fd8 
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1d88398f-ecd5-4d94-a42a-a406b3d566af/pyspark-34bec23c-a686-475d-85c9-9e9228b23239 
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1d88398f-ecd5-4d94-a42a-a406b3d566af 
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt3/yarn/usercache/hadoop/appcache/application_1476100925559_0002/container_1476100925559_0002_01_000001/tmp/spark-96cdee47-e3f3-45f4-8bc7-0df5928ef53c 
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt2/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-f6821ea1-6f37-4cc6-8bba-049ac0215786 
16/10/10 13:18:36 INFO ShutdownHookManager: Deleting directory /mnt1/yarn/usercache/hadoop/appcache/application_1476100925559_0002/spark-1827cae8-8a60-4b29-a4e5-368a8e1856fd 

Моя конфигурация кластера выглядит следующим образом:

spark-defaults spark.driver.maxResultSize 8G 
spark-defaults spark.driver.memory 8G 
spark-defaults spark.executor.memory 4G 

Искра Submit команды выглядит следующим образом:

spark-submit --deploy-mode cluster s3://my-bucket/myPython.py 

Это убивает меня! Кто-нибудь, пожалуйста, дайте мне какие-либо указания на то, в каком направлении я могу смотреть?

ответ

0

Здесь была проблема:

spark-submit --deploy-mode cluster s3://my-bucket/myPython.py 

В приведенной выше команде, режим развертывания устанавливается в кластер, который означает, что узел будет выбран из основных узлов, чтобы запустить программу драйвера. Так как разрешенная память драйверов - 8G, а основные узлы - меньшие физические экземпляры, у них всегда будет нехватка требуемой памяти.

Решение заключалось в развертывании в клиентском режиме, когда драйвер всегда запускался на главном узле (больший физический экземпляр с большим количеством ресурсов в моем случае) не исчерпывал бы требуемую память для всего процесса.

Поскольку это был выделенный кластер, это решение работало в моем случае.

В случае совместного кластера, где режим развертывания должен быть кластером, использование больших экземпляров должно работать.

Смежные вопросы