У меня есть 3 файла журнала в моих папках. Каккак вызвать отдельную логику для имени файла diff в искровом
foldera = emplog,deptlog,companylog
folderb = emplog,deptlog,companylog
folderc = emplog,deptlog,companylog
У меня есть 3 дифф файл программы Scala для извлечения данных из каждого из них.
employee.scala
department.scala
companylog.scala
Каждый из них, как показано ниже.
Я хочу объединить все эти файлы и выполнить их параллельно.
package com.sample
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}
object logparser {
def main(args: Array[String]) = {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
//Start the Spark context
val conf = new SparkConf()
.setAppName("Parser")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext= new SQLContext(sc)
val test = sc.wholeTextFiles("C:\\mkdir\\*\\*")
.map{l =>
if(l._1.endsWith("emplog.txt")){
empparser(l._2,sc,sqlContext)
}
l
}
.foreach{println}
}
def empparser(record:String,sc:SparkContext,sqlContext:SQLContext) = {
val emppattern="""[(](\d+)[)]\s([\w\s._]{30})\s+""".r
import sqlContext.implicits._
val indrecs = emppattern.findAllIn(record)
.map{ line =>
val emppattern(eid,ename) = line
(eid,ename)
}
.toSeq
.toDF("eid","ename")
.show()
}
}
Я пробовал использовать свой код в каждом методе внутри одного и того же объекта.
Теперь возникает 2 вопроса Q1. Когда я скомпилирую, я получаю
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: [email protected])
- field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class com.sample.logparser$$anonfun$1, <function1>)
Насколько я знаю (только новичок) Контекст Spark не может быть сериализован. Если я не передаю параметр sc как параметр, я получаю Nullpointer Exception. Как я могу это решить?
Q2: После преобразования в DF я буду вставлять код таблицы hive в метод empparser. Как только это будет сделано, я не хочу ничего делать в своей основной. Но мой код карты не будет выполняться, если после этого у меня не будет действий. вот почему я имею foreacch println после этого. Есть ли способ преодолеть эту проблему?
Если у вас только три, прочитайте каждый в RDD по очереди, сопоставьте это RDD с ключом имени файла, а затем «union» в одном. –
Вы должны рассматривать логику Scala как классы/функции, а не «файлы scala». Тогда вам понадобится некоторый дискриминатор для сопоставления файла данных с некоторой логикой. Может быть, имя файла, может быть, содержимое файла?Я бы не знал, поскольку вопрос не достаточно конкретный. Попробуйте добавить логику обработки и образец данных к вопросу. Кроме того, я предлагаю вам сделать больше исследований перед публикацией. Есть много ресурсов, которые покрывают эту проблему. – maasg
обновил вопрос с помощью логики. – user7264473