2016-12-18 1 views
5

У меня есть 3 файла журнала в моих папках. Каккак вызвать отдельную логику для имени файла diff в искровом

foldera = emplog,deptlog,companylog 
folderb = emplog,deptlog,companylog 
folderc = emplog,deptlog,companylog 

У меня есть 3 дифф файл программы Scala для извлечения данных из каждого из них.

employee.scala 
department.scala 
companylog.scala 

Каждый из них, как показано ниже.

Я хочу объединить все эти файлы и выполнить их параллельно.

package com.sample 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions 
import org.apache.spark.sql.SQLContext 
import org.apache.log4j.{Level, Logger} 

object logparser { 
    def main(args: Array[String]) = { 

     Logger.getLogger("org").setLevel(Level.OFF)  
     Logger.getLogger("akka").setLevel(Level.OFF) 
    //Start the Spark context 
    val conf = new SparkConf() 
     .setAppName("Parser") 
     .setMaster("local") 

     val sc = new SparkContext(conf) 
     val sqlContext= new SQLContext(sc) 

     val test = sc.wholeTextFiles("C:\\mkdir\\*\\*") 
     .map{l => 
      if(l._1.endsWith("emplog.txt")){ 
      empparser(l._2,sc,sqlContext) 
       } 

      l 
     } 
     .foreach{println} 
    } 

    def empparser(record:String,sc:SparkContext,sqlContext:SQLContext) = { 
    val emppattern="""[(](\d+)[)]\s([\w\s._]{30})\s+""".r 

     import sqlContext.implicits._ 
    val indrecs = emppattern.findAllIn(record) 
    .map{ line => 
     val emppattern(eid,ename) = line 

    (eid,ename) 
    } 
    .toSeq 
    .toDF("eid","ename") 

    .show() 


    } 
} 

Я пробовал использовать свой код в каждом методе внутри одного и того же объекта.

Теперь возникает 2 вопроса Q1. Когда я скомпилирую, я получаю

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext 
Serialization stack: 
    - object not serializable (class: org.apache.spark.SparkContext, value: [email protected]) 
    - field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext) 
    - object (class com.sample.logparser$$anonfun$1, <function1>) 

Насколько я знаю (только новичок) Контекст Spark не может быть сериализован. Если я не передаю параметр sc как параметр, я получаю Nullpointer Exception. Как я могу это решить?

Q2: После преобразования в DF я буду вставлять код таблицы hive в метод empparser. Как только это будет сделано, я не хочу ничего делать в своей основной. Но мой код карты не будет выполняться, если после этого у меня не будет действий. вот почему я имею foreacch println после этого. Есть ли способ преодолеть эту проблему?

+0

Если у вас только три, прочитайте каждый в RDD по очереди, сопоставьте это RDD с ключом имени файла, а затем «union» в одном. –

+1

Вы должны рассматривать логику Scala как классы/функции, а не «файлы scala». Тогда вам понадобится некоторый дискриминатор для сопоставления файла данных с некоторой логикой. Может быть, имя файла, может быть, содержимое файла?Я бы не знал, поскольку вопрос не достаточно конкретный. Попробуйте добавить логику обработки и образец данных к вопросу. Кроме того, я предлагаю вам сделать больше исследований перед публикацией. Есть много ресурсов, которые покрывают эту проблему. – maasg

+0

обновил вопрос с помощью логики. – user7264473

ответ

1

Чтобы попытаться ответить на вопрос, я собираюсь предположить, что результат обработки сотрудника или отдела приводит к тому же вид записи. Я бы ожидал, что это будет отличаться для каждого типа данных, поэтому я отдельно обрабатываю обработку разных видов записей, чтобы обеспечить эту «настройку с реальностью».

Сначала мы определяем запись case class и парсеры для разных видов или типов записей. (Здесь я копируя тот же осущ для простоты)

case class Record(id:String, name: String) 

val empParser: String => Option[Record] = { record => 
    val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r 
    record match { 
    case pattern(eid,ename) => Some(Record(eid, ename)) 
    case _ => None 
    } 
} 

val deptParser: String => Option[Record] = { record => 
    val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r 
    record match { 
    case pattern(eid,ename) => Some(Record(eid, ename)) 
    case _ => None 
    } 
} 

val companyParser: String => Option[Record] = { record => 
    val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r 
    record match { 
    case pattern(eid,ename) => Some(Record(eid, ename)) 
    case _ => None 
    } 
} 

Мы загружаем данные, используя wholeFiles:

val dataPath = "/.../data/wholefiles/*/*" 
val logFiles = sc.wholeTextFiles(dataPath) 

И потом, мы обрабатываем различные виды записей с помощью фильтрации файлов для получения необходимых файлов и применения парсера, который мы определили выше. Обратите внимание, как мы практически повторяем тот же процесс. Это можно отвлечь.

val empLogs = logFiles.filter{case (filename, content) => filename.endsWith("emplog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> empParser(line))} 
val deptLogs = logFiles.filter{case (filename, content) => filename.endsWith("deptlog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> deptParser(line))} 
val compLogs = logFiles.filter{case (filename, content) => filename.endsWith("companylog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> companyParser(line))} 

теперь преобразовать в DataFrame

val empDF = empLogs.toDF 

И мы могли бы сделать то же самое для других типов записей, а также.

В этом процессе есть много возможностей для уменьшения дублирования кода в зависимости от того, можем ли мы найти общие черты в процессах разных типов данных.

+0

Спасибо maasg за подробное объяснение – user7264473

Смежные вопросы