2016-01-29 5 views
1

Я новичок в Scala и Spark, столкнувшись с некоторыми проблемами, написав код миграции от MR job до Spark. В основном мне нужна помощь для реорганизации/оптимизации кода, который я написал до сих пор, а также определения подхода к дальнейшему выполнению моей задачи.Оптимизация пакетной обработки Apache Spark

Мои вопросы, как показано ниже:

  1. Может заявление if (!usefulData.isEmpty()) для RDD пустой проверки осуществляться каким-либо иным образом, как внутри функции фильтра. Функции отображения или сокращения/агрегации следует вызывать, только если выход фильтра возвращает несколько строк (в основном, используя scala Option class как-то)

  2. Если вы видите код, чтобы сгенерировать каждый KPI, мне нужно снова выполнить операцию карты и снова на usefulData, который является фильтрованным и обработанным файлом данных RDD. Является ли это хорошим подходом, можем ли мы задействовать некоторые картографические операции в одном?

  3. Должен ли я использовать поле Spitted из входного файла в кортеж, имеющий 16 индексов, или есть ли другой лучший/эффективный способ?

  4. Если вы видите KPI, success_cnt, success_uu, failure_cnt and failure_uu, у них почти такая же обработка, я хочу создать для них общий метод. Но не могу это сделать, я не могу передать индекс кортежа в качестве ввода для отображения внутри функции

  5. Последнее, но самое важное, я хочу, чтобы последний подход к клубу всех этих ключевых показателей эффективности, как в случае с HashMap в MR и записывать его в контекст там (в основном сталкиваться с проблемой, потому что в MR карта и функция сокращения работают в одной строке из InputSplit, но здесь весь файл обрабатывается сразу с помощью преобразований, поэтому я не могу создать уровень класса переменные и записать в классе POJO, а затем агрегировать в контексте)

    Мой код, как показано ниже:


import org.apache.log4j.{Level, Logger} 
    import org.apache.spark.storage.StorageLevel 
    import org.apache.spark.{SparkConf, SparkContext} 
    import org.apache.spark.sql.SQLContext 
    import org.apache.spark.rdd._ 

    /** 
     * Sample Data : 1 Line 
     * 11685147 com.my.record.xxxxxx 2015-12-27  200 RECOVER <MacBookAir4,2> <Mac OS X;10.11.2;22222> <com.my.site>  0 com.my.os 1451248910234 1 10 
     * 
     * Parsed Data : 
     * 
     * l._1 : 1685147 
     * l._2 : com.my.record.xxxxxx 
     * l._3 : 2015-12-27 
     * l._4 : 
     * l._5 : 200 
     * l._6 : RECOVER 
     * l._7 : MacBookAir4 
     * l._8 : 22222 
     * l._9 : Mac OS X 
     * l._10 : 10.11.2 
     * l._11 : 
     * l._12 : 0/-6015 
     * l._13 : com.my.os 
     * l._14 : 1451248910234 
     * l._15 : 1 
     * l._16 : 10 
     * 
     **/ 

    object MySparkApp { 

     def main(args: Array[String]) { 

     Logger.getLogger("org").setLevel(Level.OFF) 
     Logger.getLogger("akka").setLevel(Level.OFF) 

     val conf = new SparkConf().setMaster("local").setAppName("My-Spark-App").set("spark.executor.memory", "2G").set("spark.driver.memory", "1G") 
     val sc = new SparkContext(conf) 

     val pattern = "[a-zA-Z]+".r 

     def parseClientInfo(field: String) = { 
      val fields = field.split(">") 

      val devType = fields(0).trim.split(",")(0).substring(1).trim 
      val devVersion = fields(1).trim.split(";")(2).trim 

      val osType = fields(1).trim.split(";")(0).substring(1).trim 
      val osVersion = fields(1).trim.split(";")(1).trim 

      (devType, devVersion, osType, osVersion) 
     } 

     def parseFile = { 
      val fileData = sc.textFile("file:///path_to_my_file") 
      val filteredData = fileData.map(_.split("\\t")).filter(l => l(5).toLowerCase.matches("recover")) 
      filteredData.map(l => 
      flattenTuple(l(0), l(1), l(2), l(3), l(4), l(5), parseClientInfo(l(6)), l(7), l(8), l(9), l(10), l(11), l(12))) 
     } 

     val usefulData = parseFile 
     //usefulData.take(1).foreach(println) 
     usefulData.persist(StorageLevel.DISK_ONLY) 


     if (!usefulData.isEmpty()) { 

      /** 
      * KPI : RECOVER.CNT 
      * AGG_CNT : _._16 
      */ 
      val recover_count = usefulData.map(_._16.toDouble).sum.toInt 
      println("recover_count : " + recover_count) 

      /** 
      * KPI : RECOVER.UniqueUser 
      * PRS_ID : _._1 
      */ 
      val recover_uu = usefulData.map(l => l._1).distinct.count 
      println("recover_uu : " + recover_uu) 

      val (success, failure) = usefulData.map(l => ((l._1, l._5), l._16)).partitionRDDBy(_._1._2 == "200") 

      /** 
      * KPI : RECOVER.SUCCESS.COUNT 
      * AGG_CNTs of SUCCESS : _._2 
      */ 
      val success_cnt = success.map(l => l._2.toDouble).sum.toInt 
      println("success_cnt : " + success_cnt) 

      /** 
      * KPI : RECOVER.SUCCESS.UniqueUser 
      * PRS_IDs of SUCCESS : _._1._1 
      */ 
      val success_uu = success.map(l => l._1._1).distinct.count.toInt 
      println("success_uu : " + success_uu) 

      /** 
      * KPI : RECOVER.FAILURE.COUNT 
      * AGG_CNTs of FAILURE : _._2 
      */ 
      val failure_cnt = failure.map(l => l._2.toDouble).sum.toInt 
      println("failure_cnt : " + failure_cnt) 

      /** 
      * KPI : RECOVER.FAILURE.UniqueUser 
      * PRS_IDs of FAILURE : _._1._1 
      */ 
      val failure_uu = failure.map(l => l._1._1).distinct.count.toInt 
      println("failure_uu : " + failure_uu) 

      /** 
      * KPI : RECOVER.FAILSUCCESS.UniqueUser 
      * PRS_IDs of SUCCESS and FAILURE : _._1 
      */ 
      val fail_success_uu = success.map(l => l._1).join(failure.map(l => l._1)).count 
      println(fail_success_uu) 

      /** 
      * KPI : RECOVER.PCFAILURE.COUNT 
      * RESPONSEs : l._5 
      * ERROR_CODEs : l._12 
      */ 
      val pc_failure_cnt = usefulData.filter(l => l._5 != "200" && l._12 == "-6015").map(l => l._16.toDouble).sum.toInt 
      println("pc_failure_cnt : " + pc_failure_cnt) 

      /** 
      * KPI : RECOVER.PCFAILURE.UniqueUser 
      * RESPONSEs : l._5 
      * ERROR_CODEs : l._12 
      */ 
      val pc_failure_uu = usefulData.filter(l => l._5 != "200" && l._12 == "-6015").map(l => l._1).distinct.count.toInt 
      println("pc_failure_uu : " + pc_failure_uu) 

      /** 
      * KPI : RECOVER.RECORD.COUNT 
      * LABELs : l._2 
      * PRS_IDs : _._16 
      */ 
      val record_cnt = usefulData.filter(l => l._2.contains("record")).map(l => l._16.toDouble).sum.toInt 
      println("record_cnt : " + record_cnt) 

      /** 
      * KPI : RECOVER.RECORD.SUCCESS.UniqueUser 
      * RESPONSEs : l._5 
      * LABELs : l._2 
      * PRS_IDs : _._1 
      */ 
      val record_success_uu = usefulData.filter(l => l._5 == "200" && l._2.contains("record")).map(l => l._1).distinct.count.toInt 
      println("record_success_uu : " + record_success_uu) 

      /** 
      * KPI : RECOVER.RECORD.FAILURE.UniqueUser 
      * RESPONSEs : l._5 
      * LABELs : l._2 
      * PRS_IDs : _._1 
      */ 
      val record_failure_uu = usefulData.filter(l => l._5 != "200" && l._2.contains("record")).map(l => l._1).distinct.count.toInt 
      println("record_failure_uu : " + record_failure_uu) 

      /** 
      * Group KPI 1 - DVC.VER.UniqueUser 
      * DEVICE_TYPEs : l._7 
      * PRS_IDs : l._1 
      */ 
      val group_ver_uu = usefulData.map(l => (l._7, l._1)).distinct.aggregateByKey(0)((acc, value) => acc + 1, _ + _) 
      println("group_ver_uu : ") 
      group_ver_uu.collect().foreach(println) 

      if (!group_ver_uu.isEmpty) { 

      /** 
       * Group KPI 2 - DVC.UniqueUser 
       * DEVICE_TYPE WITHOUT VERSION : l._1 
       * COUNT OF SIMILAR DEVICES IRRESPECTIVE OF VERSIONs : l._2 
       */ 
      val group_uu = group_ver_uu.map(l => ((pattern findFirstIn l._1).getOrElse(None), l._2)).aggregateByKey(0)((acc, value) => acc + value, _ + _) 
      println("group_uu") 
      group_uu.collect.foreach(println) 

      val (macData, otherDevices) = group_uu.partitionRDDBy(l => l._1.toString.toLowerCase.contains("mac")) 

      /** 
       * KPI : RECOVER.DVC.MAC.UniqueUser 
       */ 
      val mac_uu = macData.aggregate(0)((acc, value) => acc + value._2, _ + _) 
      println("mac_uu : " + mac_uu) 

      } 

      /** 
      * Group KPI 3 - OS.UniqueUser 
      * OS_TYPEs : l._9 
      * PRS_IDs : l._1 
      */ 
      val group_os_uu = usefulData.map(l => (l._9, l._1)).distinct.aggregateByKey(0)((acc, value) => acc + 1, _ + _) 
      println("group_os_uu") 
      group_os_uu.collect().foreach(println) 

      /** 
      * Group KPI 2 - OS.VER.UniqueUser 
      * OS_TYPEs : l._9 
      * OS_VERSIONS : l._10 
      * PRS_IDS : l._1 
      */ 
      val group_os_ver_uu = usefulData.map(l => ((l._9, l._10), l._1)).distinct.aggregateByKey(0)((acc, value) => acc + 1, _ + _) 
      println("group_os_ver_uu") 
      group_os_ver_uu.collect().foreach(println) 
     } 
     } 

     implicit class RDDOps[T](rdd: RDD[T]) { 
     def partitionRDDBy(f: T => Boolean): (RDD[T], RDD[T]) = { 
      val passes = rdd.filter(f) 
      val fails = rdd.filter(e => !f(e)) 
      (passes, fails) 
     } 
     } 

     implicit def flattenTuple[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P](t: (A, B, C, D, E, F, (G, H, I, J), K, L, M, N, O, P)): (A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P) = { 
     (t._1, t._2, t._3, t._4, t._5, t._6, t._7._1, t._7._2, t._7._3, t._7._4, t._8, t._9, t._10, t._11, t._12, t._13) 
     } 
    } 

ответ

0

Не уверен, что если я точно понял вопросы, но здесь это attemp:

  1. Вы можете использовать предикат: usefulData.count==0. Еще одно решение:

    rdd.mapPartitions(iter => Iterator(! iter.hasNext)).reduce(_&&_)

  2. Это полностью зависит от логики. В основном для каждого KPI вы создаете новую ветвь в DAG. В случае, если вы видите перекрывающуюся логику, вы можете определенно сохранить общие трансакции, чтобы произойти несколько раз.

  3. Не уверен, я тоже использую Tuple для до 9-10 полей. Хотя максимальный размер составляет 22.

  4. У меня была аналогичная проблема несколько дней назад. Так что сейчас самое время задать вопрос: SO Link

  5. Вы можете выходить на улицу. Ваша стратегия должна включать 1) создание нового списка/массива индексов ваших записей. 2) Выполняйте преобразования на вашем RDD, предпочтительно используя mapPartions. Результаты каждого KPI должны иметь такую ​​же мощность, как и входной RDD (используйте опции, чтобы избежать сбоев). 3.Используйте zip/zipWithIndex RDD KPI. Наверное, это то, что вы ищете.

Обновите/ответьте на эту тему, если найдете оставшиеся ответы.

+0

Спасибо за ваш быстрый ответ .. – Ashu

+0

1. Предикат, который вы упомянули, отлично работает. На самом деле, я не проверял ранее, если я использую это условие в функции фильтра, а затем выполняю карту после этого, выкинет ли это исключение, если после фильтра не будут найдены никакие элементы. Теперь я проверил после изменения кода, и агрегация также возвращает '0', если нет фильтрованных строк (а не исключение, как я боялся). Но учитывая, что мы используем 'usefulData' во всех RDD, я думаю, что было бы лучше сделать проверку один раз в условии' if', как я сделал, вместо каждого фильтра RDD. – Ashu

+0

В принципе, я имел в виду, что рассмотрим эти два RDD: 'val success_cnt = success.map (l => l._2.toDouble) .sum.toInt' и' val success_uu = success.map (l => l ._1._1) .distinct.count.toInt' – Ashu

Смежные вопросы