1

У меня есть DStream[String, Int] с парами подсчетов слов, например. ("hello" -> 10). Я хочу написать эти подсчеты в cassandra с указателем шага. Индекс инициализируется как var step = 1 и увеличивается с каждой обработкой микропакетов.слить искру dStream с переменной для сохраненияToCassandra()

В таблице Cassandra создан как:

CREATE TABLE wordcounts (
    step int, 
    word text, 
    count int, 
primary key (step, word) 
); 

При попытке записать поток в таблицу ...

stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count")) 

... Я получаю java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step.

Как я могу добавить индекс step к потоку, чтобы написать три столбца вместе?

Я использую искру 2.0.0, scala 2.11.8, cassandra 3.4.0 и spark-cassandra-connector 2.0.0-M3.

+0

Так как вы пытаетесь сохранить RDD к существующей таблице, вам необходимо включить все столбцы первичного ключа. – Shankar

+0

Как включить все столбцы первичного ключа в один и тот же оператор? 'a ++ b' работает для конкатенации списков, но' step ++ stream' терпит неудачу с несоответствием типа. – p3zo

+0

Поскольку C * - timeseries db, почему бы не создать временную метку вместо указателя шага? – Knight71

ответ

1

Как уже отмечалось, в то время как таблица Cassandra ожидает чего-то вида (Int, String, Int), то WordCount DStream имеет тип DStream[(String, Int)], поэтому для вызова saveToCassandra(...) работать, нам нужно DStream типа DStream[(Int, String, Int)].

Сложная часть этого вопроса заключается в том, как принести локальный счетчик, который по определению известен только в драйвере, до уровня DStream.

Для этого нам нужно сделать две вещи: «поднять» счетчик на распределенный уровень (в Spark, мы имеем в виду «RDD» или «DataFrame») и соединить это значение с существующими данными DStream.

Уходя от классического Streaming количество слов пример:

// Split each line into words 
val words = lines.flatMap(_.split(" ")) 

// Count each word in each batch 
val pairs = words.map(word => (word, 1)) 
val wordCounts = pairs.reduceByKey(_ + _) 

Мы добавить локальный вар провести подсчет microbatches:

@transient var batchCount = 0 

Он объявлен переходным, так что искры не попытайтесь закрыть его значение, когда мы объявляем преобразования, которые его используют.

Теперь хитрая бит: В контексте DStream transform Ц И А Ц, мы делаем RDD из этого одного var iable и присоединиться к нему с подстилающим РДОМ в DStream с использованием декартова продукта:

val batchWordCounts = wordCounts.transform{ rdd => 
    batchCount = batchCount + 1 

    val localCount = sparkContext.parallelize(Seq(batchCount)) 
    rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)} 
} 

(Примечание что простая функция map не будет работать, так как только начальное значение var iable будет захвачено и сериализовано. Поэтому было бы похоже, что счетчик никогда не увеличивался при просмотре данных DStream.

Наконец, теперь, что данные в нужной форме, сохраните его в Кассандре:

batchWordCounts.saveToCassandra("keyspace", "wordcounts") 
+0

Это кажется очень близким, но бросает 'java.io.NotSerializableException: объект org.apache.spark.streaming.dstream.MappedDStream сериализуется, возможно, как часть закрытия операции RDD. Это происходит из-за того, что объект DStream ссылается в пределах замыкания. Пожалуйста, перепишите операцию RDD внутри этого DStream, чтобы этого избежать. Это было соблюдено, чтобы избежать раздувания задач Spark с ненужными объектами. « – p3zo

+0

@ p3zo Я протестировал это. Может быть что-то в том, как вы адаптировали свой код? Это исключение, по-видимому, указывает на то, что в преобразовании «RDD» имеется ссылка «DStream»: * «Это связано с тем, что объект DStream ссылается из замыкания». * Это не так в коде. – maasg

+0

Справа - индекс моего шага по-прежнему был 'DStream' из заявления, которое я забыл удалить. Решение, которое вы дали, работает, как описано. – p3zo

-1

Поскольку вы пытаетесь сохранить RDD в существующую таблицу Cassandra, вам необходимо включить все значения столбца первичного ключа в RDD.

Что вы можете сделать, вы можете использовать приведенные ниже методы, чтобы сохранить RDD в новую таблицу.

saveAsCassandraTable or saveAsCassandraTableEx 

Для получения дополнительной информации см. this.

0

updateStateByKey функция обеспечивается искрой для глобальной обработки состояния. В этом случае он может выглядеть как следующее

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { 
    val newCount: Int = runningCount.getOrElse(0) + 1 
    Some(newCount) 
} 
val step = stream.updateStateByKey(updateFunction _) 

stream.join(step).map{case (key,(count, step)) => (step,key,count)}) 
    .saveToCassandra("keyspace", "wordcounts") 
Смежные вопросы