2016-06-10 1 views
7

ЭкспериментSpark SQL: Почему два задания для одного запроса?

Я попробовал следующий фрагмент кода на Spark 1.6.1.

val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") # This has 45 files 
soDF.registerTempTable("so") 
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/") 

Physical Plan является:

== Physical Plan == 
Sort [cnt#59L ASC], true, 0 
+- ConvertToUnsafe 
    +- Exchange rangepartitioning(cnt#59L ASC,200), None 
     +- ConvertToSafe 
     +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L]) 
      +- TungstenExchange hashpartitioning(dpHour#38,200), None 
       +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L]) 
        +- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder 

Для этого запроса я получил два задания: Job 9 и Job 10 enter image description here

Для Job 9, то DAG является:

enter image description here

Для Job 10, то DAG является:

enter image description here

Наблюдения

  1. По-видимому, есть два jobs для одного запроса.
  2. Stage-16 (обозначен как Stage-14 в Job 9) пропущен в Job 10.
  3. Stage-15 последние RDD[48], такие же как Stage-17 последние RDD[49]. Как? Я видел в журналах, что после Stage-15 исполнения, RDD[48] зарегистрирован в качестве RDD[49]
  4. Stage-17 показан в driver-logs, но никогда не был исполненных в Executors. На этапе driver-logs показано выполнение задачи, но когда я просмотрел журналы контейнеров Yarn, не было никаких доказательств получения task от Stage-17.

журналы, поддерживающие эти наблюдения (только driver-logs, я потерял executor журналы из-за поздней аварии). Видно, что перед началом Stage-17, RDD[49] зарегистрирован:

16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200) 
16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200) 
16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool 
16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s 
16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s 
16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter 
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 
16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 
16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26 
16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26) 
16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions 
16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26) 
16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17) 
16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17) 
16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents 
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB) 
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB) 
16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB) 
16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006 
16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26) 
16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes) 

Вопросы

  1. Почему два Jobs? Каково намерение здесь, разбив DAG на два jobs?
  2. Job 10DAG выглядит полностью для выполнения запроса. Есть ли что-то конкретное Job 9?
  3. Почему Stage-17 не пропущен? Похоже, что создаются пустые tasks, есть ли у них какие-либо цели.
  4. Позже я попробовал еще один более простой запрос.Неожиданно было создано 3 Jobs.

    sqlContext.sql ("выберите dpHour из так приказа dphour"). Write.parquet ("/ out2 /")

ответ

4

При использовании высокого уровня dataframe/набора данных API-интерфейсов, вам оставьте его до Spark, чтобы определить план выполнения, включая работу/этап. Они зависят от многих факторов, таких как параллелизм выполнения, кэшированные/сохраняемые структуры данных и т. Д. В будущих версиях Spark, по мере увеличения сложности оптимизатора, вы можете видеть еще больше заданий на запрос, например, например, некоторые источники данных отбираются для параметризации оптимизация исполнения на основе затрат.

Например, я часто, но не всегда, видел, как создаются отдельные задания от обработки, которая включает в себя тасования.

В нижней строке, если вы используете API высокого уровня, если вам не нужно делать чрезвычайно подробную оптимизацию с огромными объемами данных, она редко платит, чтобы вникать в конкретный фрагмент. Затраты на запуск работы чрезвычайно низки по сравнению с обработкой/выходом.

Если, с другой стороны, вам интересны внутренние детали Spark, прочитайте код оптимизатора и включите в список рассылки разработчиков Spark.

+1

Любопытно, почему вторая стадия работы не может быть в первой работе? –

+1

Хороший вопрос. Возможно, это связано с промежуточным результатом. Важный вопрос: почему имеет значение, как DAG сопоставляется с этапами и рабочими местами? – Sim

+1

Да, сложно понять, как Spark делает это, сочетание доступных ресурсов, данных ... –

Смежные вопросы