Ниже код работает нормально, но требуется время, чтобы написать в cassandra, когда у нас огромный приток транзакций.Spark Streaming - использование foreachPartition и saveToCassandra для лучшего распараллеливания
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val parsedStream = stream.map(_._2).map(EmpParser.parse(_)).cache()
Ниже код записывается в cassandra в последовательном порядке и выполняется в одном исполнителе.
parsedStream.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept"))
Но я хотел распараллелить запись в cassandra, выполнив foreachPartition. Но я не вижу вариант saveToCassandra в foreachPartition.
parsedStream.foreachRDD{rdd =>
rdd.foreachPartition { partition =>
partition.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept"))
}
}
Любой способ добиться этого?
'saveToCassandra' определен на уровне' RDD'/'DStream', а' partition' - простой scala 'Iterator', поэтому вы не видите его определенного. –
ОК. Любая идея, как мы могли добиться письменности к кассандре в параллельных исполнениях всеми моими исполнителями? – JKPEAK
вы можете сделать 'parseStream.repartition (num) .saveToCassandra' – Knight71