3

я столкнулся вопрос с разъемом искрой Cassandra на Скале при обновлении таблицы в моем ключевом пространствеUPDATE Cassandra таблица с помощью разъема искры Cassandra

Вот мой кусок кода

val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE + 
         " SET a= a + " + b + " WHERE x=" + 
         x + " AND y=" + y + 
         " AND z=" + x 

println(query) 

val KeySpace = new CassandraSQLContext(sparkContext) 
KeySpace.setKeyspace(KEYSPACE) 

hourUniqueKeySpace.sql(query) 

Когда я выполняю это код, я получаю такую ​​ошибку

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found 

Любая идея, почему это происходит? Как я могу это исправить?

+0

Каков результат, если вы запускаете инструкцию SQL, которая генерируется вашим кодом непосредственно на Cassandra? – kerkero

+0

@kerkero: Если я запустил его на cassandra, он либо обновит строку, если ключ уже присутствует, либо создаст новую строку для этого ключа, если ключ отсутствует. –

+0

Определили ли вы столбец, соответствующий «a "в вашем примере как тип счетчика? – kerkero

ответ

2

Я считаю, что вы не можете обновить изначально через разъем SPARK. Смотрите documention:

«Поведение по умолчанию Спарка Cassandra Connector является перезапись коллекции при вставке в таблицу Cassandra Чтобы изменить это поведение, вы можете задать пользовательский картограф с инструкциями о том, как вы хотели бы коллекцию быть. обрабатывали."

Таким образом, вы захотите на самом деле ЗАПИСАТЬ новую запись с существующим ключом.

3

ОБНОВЛЕНИЕ таблицы с counter column возможно через разъем искры-кассандра. Вам нужно будет использовать DataFrames и DataFrameWriter метод save с режимом "append" (или SaveMode.Append, если хотите). Проверьте код DataFrameWriter.scala.

Например, если таблица:

cqlsh:test> SELECT * FROM name_counter ; 

name | surname | count 
---------+---------+------- 
    John | Smith | 100 
    Zhang |  Wei | 1000 
Angelos | Papas | 10 

Код должен выглядеть следующим образом:

val updateRdd = sc.parallelize(Seq(Row("John", "Smith", 1L), 
            Row("Zhang", "Wei", 2L), 
            Row("Angelos", "Papas", 3L))) 

val tblStruct = new StructType(
    Array(StructField("name", StringType, nullable = false), 
      StructField("surname", StringType, nullable = false), 
      StructField("count", LongType, nullable = false))) 

val updateDf = sqlContext.createDataFrame(updateRdd, tblStruct) 

updateDf.write.format("org.apache.spark.sql.cassandra") 
.options(Map("keyspace" -> "test", "table" -> "name_counter")) 
.mode("append") 
.save() 

После UPDATE:

name | surname | count 
---------+---------+------- 
    John | Smith | 101 
    Zhang |  Wei | 1002 
Angelos | Papas | 13 

Преобразование DataFrame может быть проще, неявно конвертировать RDD to a DataFrame: import sqlContext.implicits._ и используя .toDF().

Проверьте полный код для этого игрушечного применения: https://github.com/kyrsideris/SparkUpdateCassandra/tree/master

Поскольку версии очень важно здесь, выше применяется к Scala 2.11.7, Спарк 1.5.1, искровой Кассандра разъем 1.5.0-RC1 -s_2.11, Cassandra 3.0.5. DataFrameWriter обозначен как @Experimental с @since 1.4.0.

+0

Как я могу вставить новую запись или удалить с помощью dataframe? –

Смежные вопросы