У меня есть 2 файла с разными данными. Я пытаюсь прочитать их в 2 diff RDD &, а затем преобразовать их в Dataframe & вставить в улей. Я смог сделать этот нормальный код. Но искра обработала одно вычисление RDD за другим. Таким образом, 2-й ожидал, что 1-й пройдет, хотя у меня достаточно ресурсов в кластере. Я узнал, что вычисление RDD может быть распараллелировано с использованием методов Async. Поэтому я пытаюсь foreachPartitionAsync. Но это порождает ошибку, которую я не могу отлаживать дальше. Код примера:foreachPartitionAsync throws не может вызвать методы при остановленном SparkContext
object asynccode {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("Parser")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt")
val test = ercs.map { k =>
var rc = method1(k._2, k._1).toSeq
rc
}
.flatMap(identity)
.foreachPartitionAsync { f =>
f.toSeq.toDF()
.write.insertInto("dbname.tablename1")
}
val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt")
val test2 = ercs2.map { k =>
var rs = method2(k._2, k._1)
rs
}
.flatMap(identity)
.foreachPartitionAsync(f => f.toSeq.toDF()
.write.insertInto("dbname.tablename2")
)
sc.stop()
}
def method1 = ???
def method2 = ???
}
Но оно выбрасывается ниже сообщения об ошибке. Если я удалю foreachPartitionAsync из кода, он отлично работает. Не уверен, что я делаю неправильно в отношении foreachPartitionAsync.
Не удалось выполнить сериализацию задачи: java.lang.IllegalStateException: Невозможно вызвать методы на остановленном SparkContext.
ОБНОВЛЕНИЕ: Спасибо за ваше предложение. Я обновил его, как показано ниже. Но теперь он ничего не делает. Искры веб-интерфейса, я не вижу, что ни одна сцена не запускается (ее пустая). Ни одна из моих таблиц не обновляется. Но работа завершена без ошибок.
val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt")
val test = ercs.map { k =>
var rc = method1(k._2, k._1).toSeq
rc
}
.flatMap(identity)
toDF()
val f1 = Future(test.write.insertInto("dbname.tablename1"))
}
val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt")
val test2 = ercs2.map { k =>
var rs = method2(k._2, k._1)
rs
}
.flatMap(identity)
toSeq.toDF()
val f2 = Future(test2.write.insertInto("dbname.tablename2"))
)
Future.sequence(Seq(f1,f2)).onComplete(_ => sc.stop)
Является ли что-нибудь, что я пропустил?