2016-04-30 3 views
6

У меня есть некоторый код, как это:Спарк Scala Получить данные от rdd.foreachPartition

 println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass) 
     val lastRevs = distinctFileGidsRDD. 
     foreachPartition(iter => { 
      SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) 
      while(iter.hasNext) { 
      val item = iter.next() 
      //println(item(0)) 
      println("String: "+item(0).toString()) 
      val jsonStr = DB.readOnly { implicit session => 
       sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar". 
       map { resultSet => resultSet.string(1) }.single.apply() 
      } 
      println("\nJSON: "+jsonStr) 
      } 
     }) 
     println("\nEND Last Revs Class: "+ lastRevs.getClass) 

кода выводит (с тяжелыми правок) что-то вроде:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD 
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM 
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...) 
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw 
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...) 
... 
JSON: None() 
END Last Revs Class: void 

ВОПРОС 1: Как можно Я получаю значение lastRevs в полезном формате, таком как строка/нуль JSON или параметр Some/None?

ВОПРОС 2: Мое предпочтение: есть ли другой способ получить данные разделов, подобные RDD-формату (а не формат итератора)?

dstream.foreachRDD { (rdd, time) => 
    rdd.foreachPartition { partitionIterator => 
    val partitionId = TaskContext.get.partitionId() 
    val uniqueId = generateUniqueId(time.milliseconds, partitionId) 
    // use this uniqueId to transactionally commit the data in partitionIterator 
    } 
} 

из http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

ВОПРОС 3: Является ли метод получения данных, которые я использую в здравом уме метод (с учетом я следую по ссылке выше)? (Отложите в сторону тот факт, что это сейчас система JDBC scalikejdbc. Это будет ключ, хранилище значений другого типа, кроме этого прототипа.)

+0

Я не понял вопрос. 'lastRevs' должен быть' Unit', потому что '.forEachPartition' используется только для его побочного эффекта (функция T => Unit). Я думаю, вы хотите преобразовать данные, например, вместо «mapPartitions». Я хотел бы понять, какова главная цель здесь, потому что отдельные вопросы не имеют большого смысла (для меня) – maasg

+0

@maasg: Да. Это ответ, который я ищу - mapPartitions. Я нашел еще один пример в http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine. – codeaperature

ответ

4

Чтобы создать преобразование, использующее ресурсы, локальные для исполнителя (например, DB или сетевое соединение), вы должны использовать rdd.mapPartitions. Он позволяет инициализировать код локально исполнителю и использовать эти локальные ресурсы для обработки данных в разделе.

Код должен выглядеть следующим образом:

val lastRevs = distinctFileGidsRDD. 
     mapPartitions{iter => 
      SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) 
      iter.map{ element => 
      DB.readOnly { implicit session => 
       sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar" 
       .map { resultSet => resultSet.string(1) }.single.apply() 
      } 
      } 
     } 
+0

Вы имеете в виду, что он отличается от 'foreachPartition' тем, что использует ресурсы Executor вместо ресурсов Драйвера? То есть. код 'foreachPartition' выполняется на Driver, тогда как' mapPartitions' на Executor ... правильно? – lisak

+2

@lisak Нет. Оба 'foreachPartition' и' mapPartitions' позволят вам запускать код для исполнителей. Разница заключается в том, что 'foreachPartition' выполняет только побочные эффекты (например, записывать в db), а' mapPartitions' возвращает значение. Ключом этого вопроса является «как получить данные назад», поэтому «mapPartitions» - это путь. – maasg

Смежные вопросы