ОБНОВЛЕНИЕ таблицы с counter column возможно через разъем искры-кассандра. Вам нужно будет использовать DataFrames и DataFrameWriter метод save с режимом "append" (или SaveMode.Append, если хотите). Проверьте код DataFrameWriter.scala.
Например, если таблица:
cqlsh:test> SELECT * FROM name_counter ;
name | surname | count
---------+---------+-------
John | Smith | 100
Zhang | Wei | 1000
Angelos | Papas | 10
Код должен выглядеть следующим образом:
val updateRdd = sc.parallelize(Seq(Row("John", "Smith", 1L),
Row("Zhang", "Wei", 2L),
Row("Angelos", "Papas", 3L)))
val tblStruct = new StructType(
Array(StructField("name", StringType, nullable = false),
StructField("surname", StringType, nullable = false),
StructField("count", LongType, nullable = false)))
val updateDf = sqlContext.createDataFrame(updateRdd, tblStruct)
updateDf.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "test", "table" -> "name_counter"))
.mode("append")
.save()
После UPDATE:
name | surname | count
---------+---------+-------
John | Smith | 101
Zhang | Wei | 1002
Angelos | Papas | 13
Преобразование DataFrame может быть проще, неявно конвертировать RDD to a DataFrame: import sqlContext.implicits._
и используя .toDF()
.
Проверьте полный код для этого игрушечного применения: https://github.com/kyrsideris/SparkUpdateCassandra/tree/master
Поскольку версии очень важно здесь, выше применяется к Scala 2.11.7, Спарк 1.5.1, искровой Кассандра разъем 1.5.0-RC1 -s_2.11, Cassandra 3.0.5. DataFrameWriter обозначен как @Experimental
с @since 1.4.0
.
Каков результат, если вы запускаете инструкцию SQL, которая генерируется вашим кодом непосредственно на Cassandra? – kerkero
@kerkero: Если я запустил его на cassandra, он либо обновит строку, если ключ уже присутствует, либо создаст новую строку для этого ключа, если ключ отсутствует. –
Определили ли вы столбец, соответствующий «a "в вашем примере как тип счетчика? – kerkero