2017-01-06 2 views
1

У меня есть 2 файла с разными данными. Я пытаюсь прочитать их в 2 diff RDD &, а затем преобразовать их в Dataframe & вставить в улей. Я смог сделать этот нормальный код. Но искра обработала одно вычисление RDD за другим. Таким образом, 2-й ожидал, что 1-й пройдет, хотя у меня достаточно ресурсов в кластере. Я узнал, что вычисление RDD может быть распараллелировано с использованием методов Async. Поэтому я пытаюсь foreachPartitionAsync. Но это порождает ошибку, которую я не могу отлаживать дальше. Код примера:foreachPartitionAsync throws не может вызвать методы при остановленном SparkContext

object asynccode { 
    def main(args: Array[String]) = { 
    val conf = new SparkConf() 
     .setAppName("Parser") 
    val sc = new SparkContext(conf) 
    val hiveContext = new HiveContext(sc) 
    import hiveContext.implicits._ 

    val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt") 
    val test = ercs.map { k => 
     var rc = method1(k._2, k._1).toSeq 
     rc 
    } 
     .flatMap(identity) 
     .foreachPartitionAsync { f => 
     f.toSeq.toDF() 
      .write.insertInto("dbname.tablename1") 
     } 

    val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt") 
    val test2 = ercs2.map { k => 
     var rs = method2(k._2, k._1) 
     rs 
    } 
     .flatMap(identity) 
     .foreachPartitionAsync(f => f.toSeq.toDF() 
     .write.insertInto("dbname.tablename2") 

    ) 
    sc.stop() 
    } 

    def method1 = ??? 
    def method2 = ??? 
} 

Но оно выбрасывается ниже сообщения об ошибке. Если я удалю foreachPartitionAsync из кода, он отлично работает. Не уверен, что я делаю неправильно в отношении foreachPartitionAsync.

Не удалось выполнить сериализацию задачи: java.lang.IllegalStateException: Невозможно вызвать методы на остановленном SparkContext.

ОБНОВЛЕНИЕ: Спасибо за ваше предложение. Я обновил его, как показано ниже. Но теперь он ничего не делает. Искры веб-интерфейса, я не вижу, что ни одна сцена не запускается (ее пустая). Ни одна из моих таблиц не обновляется. Но работа завершена без ошибок.

val ercs = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file1.txt") 
    val test = ercs.map { k => 
     var rc = method1(k._2, k._1).toSeq 
     rc 
    } 
     .flatMap(identity) 
    toDF() 
    val f1 = Future(test.write.insertInto("dbname.tablename1")) 
     } 

    val ercs2 = sc.wholeTextFiles("hdfs://x.x.x.x:8020/file2.txt") 
    val test2 = ercs2.map { k => 
     var rs = method2(k._2, k._1) 
     rs 
    } 
     .flatMap(identity) 
     toSeq.toDF() 

val f2 = Future(test2.write.insertInto("dbname.tablename2")) 

    ) 
     Future.sequence(Seq(f1,f2)).onComplete(_ => sc.stop) 

Является ли что-нибудь, что я пропустил?

ответ

1

Вы останавливаете SparkContext, не дожидаясь окончания FutureActions. Вы должны ждать действий, чтобы закончить и остановить контекст в ответ:

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 
import scala.util.{Success, Failure} 

val f1: Future[Unit] = sc.range(1, 200).foreachAsync(_ => Thread.sleep(10)) 
val f2: Future[Unit] = sc.range(1, 200).foreachAsync(_ => Thread.sleep(10)) 

Future.sequence(Seq(f1, f2)).onComplete { 
    case Success(_) => sc.stop 
    case Failure(e) => 
    e.printStackTrace // or some other appropriate actions 
    sc.stop 
} 

Это, как говорится, что ваш код недействителен, даже если мы будем игнорировать асинхронное действия. Вы не используют распределенные структуры данных внутри действия или трансформации:

.foreachPartitionAsync(
    f => f.toSeq.toDF().write.insertInto("dbname.tablename2") 
) 

Если вы хотите асинхронные действия записи использовать Futures непосредственно:

val df1: Dataframe = ??? 
val df2: Dataframe = ??? 

val f1: Future[Unit] = Future(df1.write.insertInto("dbname.tablename1")) 
val f2: Future[Unit] = Future(df2.write.insertInto("dbname.tablename2")) 

и дождитесь завершения действий как s сверху.

Смежные вопросы