Я хочу выполнять 2 операции на одном RDD одновременно. Я написал код, подобный этомуИспользование Будущее внутри искрового задания
val conf = new SparkConf().setAppName("Foo")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
val inputPath = path
val rdd = sc.textFile(inputPath).cache()
val f1 = Future {
val schama1 = StructType(List(StructField("a", StringType, true), StructField("b", StringType, true), StructField("c", LongType, true)))
val rdd1 = rdd.map(func1).filter(_.isDefined).flatMap(x => x)
val df1 = sqlSc.createDataFrame(rdd, schema)
formSubmissionDataFrame.save("/foo/", "com.databricks.spark.avro")
0
}
val f2 = Future {
val schema2 = StructType(List(StructField("d", StringType, true), StructField("e", StringType, true)))
val rdd2 = rdd.map(func2).filter(_.isDefined).flatMap(x => x)
val df2 = sqlSc.createDataFrame(rdd2, schema2)
pageViewDataFrame.save("/bar/", "com.databricks.spark.avro")
0
}
val result = for {
r1 <- f1
r2 <- f2
} yield(r1 + r2)
result onSuccess{
case r => println("done")
}
Await.result(result, Duration.Inf)
Когда я запускаю этот код, я не вижу желаемого эффекта. в строке каталога много временных файлов и т. д., но foo ничего не имеет ... поэтому кажется, что два набора данных не создаются параллельно.
Это хорошая идея использовать будущее внутри искрового драйвера? я делаю это правильно? я должен делать что-то по-другому.
Шансы, что вы использовать все ресурсы на кластере, поэтому второе будущее не может закончить до первый делает. Возможно, попробуйте установить секционирование для RDD явно. –
Можете ли вы показать мне, как? Моя работа работала в течение последних 24 часов ... и много активности в одном из каталогов назначения ... но ничего на другом. –