Я пытаюсь фильтровать потоковые данные, и на основе значения столбца идентификаторов я хочу, чтобы сохранить данные в разных таблицахСпарк Streaming Фильтрация данных Streaming
У меня есть две таблицы
- testTable_odd (идентификатор, data1, data2)
- testTable_even (идентификатор, data1)
если значение ID нечетно, то я хочу, чтобы сохранить запись в таблице testTable_odd и если значение даже тогда Я хочу сохранить запись в testTable_even.
сложная часть здесь - мои две таблицы имеют разные столбцы. попробовал несколько способов, считал функции Scala с типом возврата. Либо [obj1, obj2], но я не смог добиться успеха, любые указатели были бы очень благодарны.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.SomeColumns
import java.util.Formatter.DateTime
object StreamProcessor extends Serializable {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = args.toSet
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
stream
.map {
case (_, msg) =>
val result = msgParseMaster(msg)
(result.id, result.data)
}.foreachRDD(rdd => if (!rdd.isEmpty) rdd.saveToCassandra("testKS","testTable",SomeColumns("id","data")))
}
}
ssc.start()
ssc.awaitTermination()
}
import org.json4s._
import org.json4s.native.JsonMethods._
case class wordCount(id: Long, data1: String, data2: String) extends serializable
implicit val formats = DefaultFormats
def msgParseMaster(msg: String): wordCount = {
val m = parse(msg).extract[wordCount]
return m
}
}
Спасибо. Я решил это по-разному с однократной фильтрацией. – Suresh
Эй, не возражаете ли вы опубликовать свое решение, чтобы вы могли отметить его как правильный ответ? И я тоже хочу это увидеть: P –
Конечно, это будет сделано сегодня. просто перечисляя процесс. Я выполнил следующие шаги. 1) извлек детали из необработанной строки JSON и с классом case 2) создал супер JSON (который имеет данные, необходимые для обоих критериев фильтра) 3) преобразовал JSON в DataFrame 4) выполнил предложение select и where на этом JSON 5) сохранить в cassandra – Suresh