0

Я пытаюсь фильтровать потоковые данные, и на основе значения столбца идентификаторов я хочу, чтобы сохранить данные в разных таблицахСпарк Streaming Фильтрация данных Streaming

У меня есть две таблицы

  1. testTable_odd (идентификатор, data1, data2)
  2. testTable_even (идентификатор, data1)

если значение ID нечетно, то я хочу, чтобы сохранить запись в таблице testTable_odd и если значение даже тогда Я хочу сохранить запись в testTable_even.

сложная часть здесь - мои две таблицы имеют разные столбцы. попробовал несколько способов, считал функции Scala с типом возврата. Либо [obj1, obj2], но я не смог добиться успеха, любые указатели были бы очень благодарны.

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SaveMode 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka.KafkaUtils 
import com.datastax.spark.connector._ 

import kafka.serializer.StringDecoder 
import org.apache.spark.rdd.RDD 
import com.datastax.spark.connector.SomeColumns 
import java.util.Formatter.DateTime 

object StreamProcessor extends Serializable { 
    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor") 
     .set("spark.cassandra.connection.host", "127.0.0.1") 

    val sc = new SparkContext(sparkConf) 

    val ssc = new StreamingContext(sc, Seconds(2)) 

    val sqlContext = new SQLContext(sc) 

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 

    val topics = args.toSet 

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics) 


     stream 
    .map { 
    case (_, msg) => 
     val result = msgParseMaster(msg) 
     (result.id, result.data) 
    }.foreachRDD(rdd => if (!rdd.isEmpty)  rdd.saveToCassandra("testKS","testTable",SomeColumns("id","data"))) 

     } 
    } 

    ssc.start() 
    ssc.awaitTermination() 

    } 

    import org.json4s._ 
    import org.json4s.native.JsonMethods._ 
    case class wordCount(id: Long, data1: String, data2: String) extends serializable 
    implicit val formats = DefaultFormats 
    def msgParseMaster(msg: String): wordCount = { 
    val m = parse(msg).extract[wordCount] 
    return m 

    } 

} 

ответ

0

Я выполнил следующие шаги. 1) извлек детали из необработанной строки JSON и класса case 2) создал супер JSON (который имеет детали, необходимые для обоих критериев фильтра) 3) преобразованный, что JSON в DataFrame 4) выполнил предложение select и where on что JSON 5) Сохранить на Cassandra

1

Я думаю, что вы просто хотите использовать функцию фильтра дважды. Вы можете сделать что-то вроде

val evenstream = stream.map { case (_, msg) => 
    val result = msgParseMaster(msg) 
    (result.id, result.data) 
}.filter{ k => 
    k._1 % 2 == 0 
} 

evenstream.foreachRDD{rdd=> 
    //Do something with even stream 
} 

val oddstream = stream.map { case (_, msg) => 
    val result = msgParseMaster(msg) 
    (result.id, result.data) 
}.filter{ k => 
    k._1 % 2 == 1 
} 

oddstream.foreachRDD{rdd=> 
    //Do something with odd stream 
} 

Когда я сделал что-то похожее на это в проекте here я использовал функцию фильтра в два раза, если вы посмотрите вниз вблизи линии 191. В том, что я классификации и сохранения кортежи на основе их значения между 0 и 1, поэтому не стесняйтесь проверить это.

+0

Спасибо. Я решил это по-разному с однократной фильтрацией. – Suresh

+0

Эй, не возражаете ли вы опубликовать свое решение, чтобы вы могли отметить его как правильный ответ? И я тоже хочу это увидеть: P –

+0

Конечно, это будет сделано сегодня. просто перечисляя процесс. Я выполнил следующие шаги. 1) извлек детали из необработанной строки JSON и с классом case 2) создал супер JSON (который имеет данные, необходимые для обоих критериев фильтра) 3) преобразовал JSON в DataFrame 4) выполнил предложение select и where на этом JSON 5) сохранить в cassandra – Suresh

Смежные вопросы