2016-07-05 6 views
1

При использовании rdd.pipe(command) ошибка подпроцесса не возвращается к мастеру. Например, если один делает:Получение ошибки подпроцесса ведущему при использовании rdd.pipe

sc.parallelize(Range(0, 10)).pipe("ls fileThatDontExist").collect 

StackTrace тогда следующее:

java.lang.Exception: Subprocess exited with status 1 
    at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:161) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at org.apache.spark.rdd.PipedRDD$$anon$1.foreach(PipedRDD.scala:153) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at org.apache.spark.rdd.PipedRDD$$anon$1.to(PipedRDD.scala:153) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at org.apache.spark.rdd.PipedRDD$$anon$1.toBuffer(PipedRDD.scala:153) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at org.apache.spark.rdd.PipedRDD$$anon$1.toArray(PipedRDD.scala:153) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

Нет упоминания здесь ошибки, которая произошла в команде, нужно искать в журналах ИСПОЛНИТЕЛЬ найти :

ls: fileThatDontExist: No such file or directory 

Проверка кода PipedRDD, кажется, что можно было бы добавить больше информации при метании исключения (например, добавления содержимого proc.getErrorStream в сообщении):

val exitStatus = proc.waitFor() 
if (exitStatus != 0) { 
    throw new Exception("Subprocess exited with status " + exitStatus) 
} 

У меня есть два вопроса относительно этого. Есть ли причина не делать этого? Также кто-нибудь знает короткое время?

На данный момент я инкапсулировал выполнение процесса, чтобы при возникновении ошибки в процессе я возвращал 0 и выводил stderr процесса плюс маркер. Затем RDD отображается, а строки, содержащие маркер, генерируют исключение из stderr.

ответ

1

На данный момент (Искра 1.6) текущее поведение print the stderr процесса, порожденного стандартным выводом ошибки исполнителя. Это, по-видимому, очень ранний выбор от самого собственного создателя Spark, Matei Zaharia, как вы можете видеть here, начиная с 2011 года. Я не вижу другого способа собрать stderr в текущей реализации.

Недавно было изменено на Spark 2.0 для распространения любого исключения из дочернего процесса на вызывающий процесс (см. SPARK-13793), а незначительное изменение было добавлено в исключение, возникшее, когда статус выхода отличается от 0 (см. это line).

Это может быть предложено в качестве улучшения, сообщите мне, если вам нужна помощь, чтобы предложить это как усовершенствование Spark.

+0

Спасибо за ваш ответ, должен ли я просто зарегистрировать JIRA для этого с возможным исправлением? – geoalgo

+0

@geoalgo Да, вы можете создать JIRA в качестве улучшения, однако вы можете подождать, прежде чем делать любой запрос на растяжение, пока опытный разработчик Spark не столкнется с вашей проблемой, чтобы узнать, является ли это желаемым поведением или нет. Затем вы можете предложить исправление, если мы точно знаем. Я рассмотрю вашу проблему на JIRA! –

Смежные вопросы