При использовании rdd.pipe(command)
ошибка подпроцесса не возвращается к мастеру. Например, если один делает:Получение ошибки подпроцесса ведущему при использовании rdd.pipe
sc.parallelize(Range(0, 10)).pipe("ls fileThatDontExist").collect
StackTrace тогда следующее:
java.lang.Exception: Subprocess exited with status 1
at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.rdd.PipedRDD$$anon$1.foreach(PipedRDD.scala:153)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at org.apache.spark.rdd.PipedRDD$$anon$1.to(PipedRDD.scala:153)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at org.apache.spark.rdd.PipedRDD$$anon$1.toBuffer(PipedRDD.scala:153)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at org.apache.spark.rdd.PipedRDD$$anon$1.toArray(PipedRDD.scala:153)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Нет упоминания здесь ошибки, которая произошла в команде, нужно искать в журналах ИСПОЛНИТЕЛЬ найти :
ls: fileThatDontExist: No such file or directory
Проверка кода PipedRDD, кажется, что можно было бы добавить больше информации при метании исключения (например, добавления содержимого proc.getErrorStream в сообщении):
val exitStatus = proc.waitFor()
if (exitStatus != 0) {
throw new Exception("Subprocess exited with status " + exitStatus)
}
У меня есть два вопроса относительно этого. Есть ли причина не делать этого? Также кто-нибудь знает короткое время?
На данный момент я инкапсулировал выполнение процесса, чтобы при возникновении ошибки в процессе я возвращал 0 и выводил stderr процесса плюс маркер. Затем RDD отображается, а строки, содержащие маркер, генерируют исключение из stderr.
Спасибо за ваш ответ, должен ли я просто зарегистрировать JIRA для этого с возможным исправлением? – geoalgo
@geoalgo Да, вы можете создать JIRA в качестве улучшения, однако вы можете подождать, прежде чем делать любой запрос на растяжение, пока опытный разработчик Spark не столкнется с вашей проблемой, чтобы узнать, является ли это желаемым поведением или нет. Затем вы можете предложить исправление, если мы точно знаем. Я рассмотрю вашу проблему на JIRA! –