Я новичок в Scala и Spark, столкнувшись с некоторыми проблемами, написав код миграции от MR job до Spark. В основном мне нужна помощь для реорганизации/оптимизации кода, который я написал до сих пор, а также определения подхода к дальнейшему выполнению моей задачи.Оптимизация пакетной обработки Apache Spark
Мои вопросы, как показано ниже:
Может заявление
if (!usefulData.isEmpty())
для RDD пустой проверки осуществляться каким-либо иным образом, как внутри функции фильтра. Функции отображения или сокращения/агрегации следует вызывать, только если выход фильтра возвращает несколько строк (в основном, используя scala Option class как-то)Если вы видите код, чтобы сгенерировать каждый KPI, мне нужно снова выполнить операцию карты и снова на
usefulData
, который является фильтрованным и обработанным файлом данных RDD. Является ли это хорошим подходом, можем ли мы задействовать некоторые картографические операции в одном?Должен ли я использовать поле Spitted из входного файла в кортеж, имеющий 16 индексов, или есть ли другой лучший/эффективный способ?
Если вы видите KPI,
success_cnt, success_uu, failure_cnt and failure_uu
, у них почти такая же обработка, я хочу создать для них общий метод. Но не могу это сделать, я не могу передать индекс кортежа в качестве ввода для отображения внутри функцииПоследнее, но самое важное, я хочу, чтобы последний подход к клубу всех этих ключевых показателей эффективности, как в случае с HashMap в MR и записывать его в контекст там (в основном сталкиваться с проблемой, потому что в MR карта и функция сокращения работают в одной строке из InputSplit, но здесь весь файл обрабатывается сразу с помощью преобразований, поэтому я не могу создать уровень класса переменные и записать в классе POJO, а затем агрегировать в контексте)
Мой код, как показано ниже:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd._
/**
* Sample Data : 1 Line
* 11685147 com.my.record.xxxxxx 2015-12-27 200 RECOVER <MacBookAir4,2> <Mac OS X;10.11.2;22222> <com.my.site> 0 com.my.os 1451248910234 1 10
*
* Parsed Data :
*
* l._1 : 1685147
* l._2 : com.my.record.xxxxxx
* l._3 : 2015-12-27
* l._4 :
* l._5 : 200
* l._6 : RECOVER
* l._7 : MacBookAir4
* l._8 : 22222
* l._9 : Mac OS X
* l._10 : 10.11.2
* l._11 :
* l._12 : 0/-6015
* l._13 : com.my.os
* l._14 : 1451248910234
* l._15 : 1
* l._16 : 10
*
**/
object MySparkApp {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf().setMaster("local").setAppName("My-Spark-App").set("spark.executor.memory", "2G").set("spark.driver.memory", "1G")
val sc = new SparkContext(conf)
val pattern = "[a-zA-Z]+".r
def parseClientInfo(field: String) = {
val fields = field.split(">")
val devType = fields(0).trim.split(",")(0).substring(1).trim
val devVersion = fields(1).trim.split(";")(2).trim
val osType = fields(1).trim.split(";")(0).substring(1).trim
val osVersion = fields(1).trim.split(";")(1).trim
(devType, devVersion, osType, osVersion)
}
def parseFile = {
val fileData = sc.textFile("file:///path_to_my_file")
val filteredData = fileData.map(_.split("\\t")).filter(l => l(5).toLowerCase.matches("recover"))
filteredData.map(l =>
flattenTuple(l(0), l(1), l(2), l(3), l(4), l(5), parseClientInfo(l(6)), l(7), l(8), l(9), l(10), l(11), l(12)))
}
val usefulData = parseFile
//usefulData.take(1).foreach(println)
usefulData.persist(StorageLevel.DISK_ONLY)
if (!usefulData.isEmpty()) {
/**
* KPI : RECOVER.CNT
* AGG_CNT : _._16
*/
val recover_count = usefulData.map(_._16.toDouble).sum.toInt
println("recover_count : " + recover_count)
/**
* KPI : RECOVER.UniqueUser
* PRS_ID : _._1
*/
val recover_uu = usefulData.map(l => l._1).distinct.count
println("recover_uu : " + recover_uu)
val (success, failure) = usefulData.map(l => ((l._1, l._5), l._16)).partitionRDDBy(_._1._2 == "200")
/**
* KPI : RECOVER.SUCCESS.COUNT
* AGG_CNTs of SUCCESS : _._2
*/
val success_cnt = success.map(l => l._2.toDouble).sum.toInt
println("success_cnt : " + success_cnt)
/**
* KPI : RECOVER.SUCCESS.UniqueUser
* PRS_IDs of SUCCESS : _._1._1
*/
val success_uu = success.map(l => l._1._1).distinct.count.toInt
println("success_uu : " + success_uu)
/**
* KPI : RECOVER.FAILURE.COUNT
* AGG_CNTs of FAILURE : _._2
*/
val failure_cnt = failure.map(l => l._2.toDouble).sum.toInt
println("failure_cnt : " + failure_cnt)
/**
* KPI : RECOVER.FAILURE.UniqueUser
* PRS_IDs of FAILURE : _._1._1
*/
val failure_uu = failure.map(l => l._1._1).distinct.count.toInt
println("failure_uu : " + failure_uu)
/**
* KPI : RECOVER.FAILSUCCESS.UniqueUser
* PRS_IDs of SUCCESS and FAILURE : _._1
*/
val fail_success_uu = success.map(l => l._1).join(failure.map(l => l._1)).count
println(fail_success_uu)
/**
* KPI : RECOVER.PCFAILURE.COUNT
* RESPONSEs : l._5
* ERROR_CODEs : l._12
*/
val pc_failure_cnt = usefulData.filter(l => l._5 != "200" && l._12 == "-6015").map(l => l._16.toDouble).sum.toInt
println("pc_failure_cnt : " + pc_failure_cnt)
/**
* KPI : RECOVER.PCFAILURE.UniqueUser
* RESPONSEs : l._5
* ERROR_CODEs : l._12
*/
val pc_failure_uu = usefulData.filter(l => l._5 != "200" && l._12 == "-6015").map(l => l._1).distinct.count.toInt
println("pc_failure_uu : " + pc_failure_uu)
/**
* KPI : RECOVER.RECORD.COUNT
* LABELs : l._2
* PRS_IDs : _._16
*/
val record_cnt = usefulData.filter(l => l._2.contains("record")).map(l => l._16.toDouble).sum.toInt
println("record_cnt : " + record_cnt)
/**
* KPI : RECOVER.RECORD.SUCCESS.UniqueUser
* RESPONSEs : l._5
* LABELs : l._2
* PRS_IDs : _._1
*/
val record_success_uu = usefulData.filter(l => l._5 == "200" && l._2.contains("record")).map(l => l._1).distinct.count.toInt
println("record_success_uu : " + record_success_uu)
/**
* KPI : RECOVER.RECORD.FAILURE.UniqueUser
* RESPONSEs : l._5
* LABELs : l._2
* PRS_IDs : _._1
*/
val record_failure_uu = usefulData.filter(l => l._5 != "200" && l._2.contains("record")).map(l => l._1).distinct.count.toInt
println("record_failure_uu : " + record_failure_uu)
/**
* Group KPI 1 - DVC.VER.UniqueUser
* DEVICE_TYPEs : l._7
* PRS_IDs : l._1
*/
val group_ver_uu = usefulData.map(l => (l._7, l._1)).distinct.aggregateByKey(0)((acc, value) => acc + 1, _ + _)
println("group_ver_uu : ")
group_ver_uu.collect().foreach(println)
if (!group_ver_uu.isEmpty) {
/**
* Group KPI 2 - DVC.UniqueUser
* DEVICE_TYPE WITHOUT VERSION : l._1
* COUNT OF SIMILAR DEVICES IRRESPECTIVE OF VERSIONs : l._2
*/
val group_uu = group_ver_uu.map(l => ((pattern findFirstIn l._1).getOrElse(None), l._2)).aggregateByKey(0)((acc, value) => acc + value, _ + _)
println("group_uu")
group_uu.collect.foreach(println)
val (macData, otherDevices) = group_uu.partitionRDDBy(l => l._1.toString.toLowerCase.contains("mac"))
/**
* KPI : RECOVER.DVC.MAC.UniqueUser
*/
val mac_uu = macData.aggregate(0)((acc, value) => acc + value._2, _ + _)
println("mac_uu : " + mac_uu)
}
/**
* Group KPI 3 - OS.UniqueUser
* OS_TYPEs : l._9
* PRS_IDs : l._1
*/
val group_os_uu = usefulData.map(l => (l._9, l._1)).distinct.aggregateByKey(0)((acc, value) => acc + 1, _ + _)
println("group_os_uu")
group_os_uu.collect().foreach(println)
/**
* Group KPI 2 - OS.VER.UniqueUser
* OS_TYPEs : l._9
* OS_VERSIONS : l._10
* PRS_IDS : l._1
*/
val group_os_ver_uu = usefulData.map(l => ((l._9, l._10), l._1)).distinct.aggregateByKey(0)((acc, value) => acc + 1, _ + _)
println("group_os_ver_uu")
group_os_ver_uu.collect().foreach(println)
}
}
implicit class RDDOps[T](rdd: RDD[T]) {
def partitionRDDBy(f: T => Boolean): (RDD[T], RDD[T]) = {
val passes = rdd.filter(f)
val fails = rdd.filter(e => !f(e))
(passes, fails)
}
}
implicit def flattenTuple[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P](t: (A, B, C, D, E, F, (G, H, I, J), K, L, M, N, O, P)): (A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P) = {
(t._1, t._2, t._3, t._4, t._5, t._6, t._7._1, t._7._2, t._7._3, t._7._4, t._8, t._9, t._10, t._11, t._12, t._13)
}
}
Спасибо за ваш быстрый ответ .. – Ashu
1. Предикат, который вы упомянули, отлично работает. На самом деле, я не проверял ранее, если я использую это условие в функции фильтра, а затем выполняю карту после этого, выкинет ли это исключение, если после фильтра не будут найдены никакие элементы. Теперь я проверил после изменения кода, и агрегация также возвращает '0', если нет фильтрованных строк (а не исключение, как я боялся). Но учитывая, что мы используем 'usefulData' во всех RDD, я думаю, что было бы лучше сделать проверку один раз в условии' if', как я сделал, вместо каждого фильтра RDD. – Ashu
В принципе, я имел в виду, что рассмотрим эти два RDD: 'val success_cnt = success.map (l => l._2.toDouble) .sum.toInt' и' val success_uu = success.map (l => l ._1._1) .distinct.count.toInt' – Ashu