2015-05-30 4 views
3

Я вставляю в HBase с помощью Spark, но он медленный. Для 60 000 записей требуется 2-3 минуты. У меня осталось около 10 миллионов записей.Spark insert to HBase slow

object WriteToHbase extends Serializable { 
    def main(args: Array[String]) { 
     val csvRows: RDD[Array[String] = ... 
     val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") 
     val usersRDD = csvRows.map(row => { 
      new UserTable(row(0), row(1), row(2), row(9), row(10), row(11)) 
     }) 
     processUsers(sc: SparkContext, usersRDD, dateFormatter) 
    }) 
} 

def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = { 

    usersRDD.foreachPartition(part => { 
     val conf = HBaseConfiguration.create() 
     val table = new HTable(conf, tablename) 

     part.foreach(userRow => { 
      val id = userRow.id 
      val name = userRow.name 
      val date1 = dateFormatter.parseDateTime(userRow.date1) 
      val hRow = new Put(Bytes.toBytes(id)) 
      hRow.add(cf, q, Bytes.toBytes(date1)) 
      hRow.add(cf, q, Bytes.toBytes(name)) 
      ... 
      table.put(hRow) 
     }) 
     table.flushCommits() 
     table.close() 
    }) 
} 

Я использую это в искру подать:

--num-executors 2 --driver-memory 2G --executor-memory 2G --executor-cores 2 

ответ

3

Это медленно, потому что реализация не использовать близость данных; часть Spark RDD на сервере может быть перенесена на серверный сервер HBase, работающий на другом сервере.

В настоящее время нет операции RRD от Spark для эффективного использования хранилища данных HBase.

+0

Если вы читаете HBase, Spark действительно может использовать данные. –

0

В Htable есть пакетный api, вы можете попробовать отправить запросы на отправку как 100-500 пакетов. Я думаю, что это может немного ускорить вас. Он возвращает индивидуальный результат для каждой операции, поэтому вы можете проверить неудачные попытки, если хотите.

public void batch(List<? extends Row> actions, Object[] results) 

https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/HTable.html#batch%28java.util.List,%20java.lang.Object[]%29

0

Вы должны смотреть на подход, при котором вы можете распространять входящие данные на Спарк Иова. В вашем текущем подходе foreachPartition вместо этого вы должны посмотреть на Трансформации, такие как map, mapToPair. Вам необходимо оценить весь жизненный цикл DAG и где вы можете сэкономить больше времени.

После этого на основе достигнутого параллелизма вы можете позвонить saveAsNewAPIHadoopDataset Действие искры для записи внутри HBase быстрее и параллельно. Как:

JavaPairRDD<ImmutableBytesWritable, Put> yourFinalRDD = yourRDD.<SparkTransformation>{()};  
yourFinalRDD.saveAsNewAPIHadoopDataset(yourHBaseConfiguration); 

Примечание: Где yourHBaseConfiguration будет одноэлементно и будет один объект на узле палачу, чтобы разделить между задачами

Пожалуйста, дайте мне знать, если это псевдо-код не работает для вас или найти какие-либо трудности на одном и том же.