2016-03-13 2 views
3

Я хочу выполнять 2 операции на одном RDD одновременно. Я написал код, подобный этомуИспользование Будущее внутри искрового задания

val conf = new SparkConf().setAppName("Foo") 
val sc = new SparkContext(conf) 
val sqlSc = new SQLContext(sc) 
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true") 
val inputPath = path 
val rdd = sc.textFile(inputPath).cache() 

val f1 = Future { 
    val schama1 = StructType(List(StructField("a", StringType, true), StructField("b", StringType, true), StructField("c", LongType, true))) 
    val rdd1 = rdd.map(func1).filter(_.isDefined).flatMap(x => x) 
    val df1 = sqlSc.createDataFrame(rdd, schema) 
    formSubmissionDataFrame.save("/foo/", "com.databricks.spark.avro") 
    0 
} 

val f2 = Future { 
    val schema2 = StructType(List(StructField("d", StringType, true), StructField("e", StringType, true))) 
    val rdd2 = rdd.map(func2).filter(_.isDefined).flatMap(x => x) 
    val df2 = sqlSc.createDataFrame(rdd2, schema2) 
    pageViewDataFrame.save("/bar/", "com.databricks.spark.avro") 
    0 
} 

val result = for { 
    r1 <- f1 
    r2 <- f2 
} yield(r1 + r2) 

result onSuccess{ 
    case r => println("done") 
} 

Await.result(result, Duration.Inf) 

Когда я запускаю этот код, я не вижу желаемого эффекта. в строке каталога много временных файлов и т. д., но foo ничего не имеет ... поэтому кажется, что два набора данных не создаются параллельно.

Это хорошая идея использовать будущее внутри искрового драйвера? я делаю это правильно? я должен делать что-то по-другому.

+0

Шансы, что вы использовать все ресурсы на кластере, поэтому второе будущее не может закончить до первый делает. Возможно, попробуйте установить секционирование для RDD явно. –

+0

Можете ли вы показать мне, как? Моя работа работала в течение последних 24 часов ... и много активности в одном из каталогов назначения ... но ничего на другом. –

ответ

0

Для одновременного выполнения двух или более Spark JOBS (действий), Spark Context должен работать в режиме планировщика FAIR.

В программе драйвера для всех преобразований создается только граф зависимостей, однако фактическое выполнение происходит только при вызове действия. Обычно драйвер ждет, когда выполнение происходит через узлы, управляемые ведомыми устройствами Spark. В вашем случае Spark Master не начинает выполнять второе задание до окончания первого, потому что по умолчанию Spark Scheduling является FIFO.

Вы можете установить конф следующим образом, чтобы включить параллельное выполнение

val conf = new SparkConf().setMaster(...).setAppName(...) 
conf.set("spark.scheduler.mode", "FAIR") 
val sc = new SparkContext(conf) 

Для подробной информации посетите Spark Scheduling within an application

Смежные вопросы