2016-03-14 2 views
0

У меня есть работа Spark, которая читается в паркетном файле с примерно 150.000.000 ключами/значениями.Лучший способ итерации/потока a Spark Dataframe

SparkConf conf = new SparkConf(); 
    conf.setAppName("Job"); 
    JavaSparkContext jsc = new JavaSparkContext(conf); 
    SQLContext sql = new SQLContext(jsc); 
    DataFrame df = sql.read().parquet(path); 

Моя цель состоит в том, чтобы написать пар ключ/значение для HBase, но я получаю проблемы кучи памяти, и я подозреваю, что это не самый лучший способ сделать это. Я хотел бы вывести вычисления в кластер, но я не могу понять, как пропустить сборку. Сейчас мой код выглядит следующим образом:

HBaseClient client = HbaseWrapper.initClient(); 
    df.collectAsList().stream().forEach(row -> { 
      try { 
       HbaseWrapper.putRows(client, row); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     }); 
    jsc.stop(); 

И я попытался собрать как список первых без потоковой передачи, а затем записать это, но это также принимает навсегда.

Любое понимание оценено.

ответ

2

Вы получаете ошибку OOM, потому что collectAsList отправляет все данные драйверу.

Чтобы решить проблему, вы можете использовать foreachPartitions, так что вы будете передавать в Hbase параллельно.

df.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() { 

     @Override 
     public void call(Iterator<Row> t) throws Exception { 
      try { 
       HBaseClient client = HbaseWrapper.initClient(); 
       while(t.hasNext()){ 
        Row row = t.next(); 
        HbaseWrapper.putRows(client, row); 
       } 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 

     } 
    }); 
+0

Awesome, спасибо! –

Смежные вопросы