Как уже отмечалось, в то время как таблица Cassandra ожидает чего-то вида (Int, String, Int)
, то WordCount DStream имеет тип DStream[(String, Int)]
, поэтому для вызова saveToCassandra(...)
работать, нам нужно DStream
типа DStream[(Int, String, Int)]
.
Сложная часть этого вопроса заключается в том, как принести локальный счетчик, который по определению известен только в драйвере, до уровня DStream.
Для этого нам нужно сделать две вещи: «поднять» счетчик на распределенный уровень (в Spark, мы имеем в виду «RDD» или «DataFrame») и соединить это значение с существующими данными DStream
.
Уходя от классического Streaming количество слов пример:
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
Мы добавить локальный вар провести подсчет microbatches:
@transient var batchCount = 0
Он объявлен переходным, так что искры не попытайтесь закрыть его значение, когда мы объявляем преобразования, которые его используют.
Теперь хитрая бит: В контексте DStream transform
Ц И А Ц, мы делаем RDD из этого одного var
iable и присоединиться к нему с подстилающим РДОМ в DStream с использованием декартова продукта:
val batchWordCounts = wordCounts.transform{ rdd =>
batchCount = batchCount + 1
val localCount = sparkContext.parallelize(Seq(batchCount))
rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}
(Примечание что простая функция map
не будет работать, так как только начальное значение var
iable будет захвачено и сериализовано. Поэтому было бы похоже, что счетчик никогда не увеличивался при просмотре данных DStream.
Наконец, теперь, что данные в нужной форме, сохраните его в Кассандре:
batchWordCounts.saveToCassandra("keyspace", "wordcounts")
Так как вы пытаетесь сохранить RDD к существующей таблице, вам необходимо включить все столбцы первичного ключа. – Shankar
Как включить все столбцы первичного ключа в один и тот же оператор? 'a ++ b' работает для конкатенации списков, но' step ++ stream' терпит неудачу с несоответствием типа. – p3zo
Поскольку C * - timeseries db, почему бы не создать временную метку вместо указателя шага? – Knight71