1

Как скопировать columnfamily из одного кластера cassandra в другой?Как скопировать columnfamily из одного кластера cassandra в другой?

Сценарий:

  1. У меня есть IP только хозяин (как для источника и целевых кластеров), порта, имени Key_Space и column_family имени.
  2. Я уже создал метаданные в целевом кластере (только данные должны быть скопированы).
  3. Наиболее предпочтительно, я хочу, чтобы это выполнялось в одиночных/нескольких искровых заданиях (создавая DataFrame, а затем сохраняя его) с использованием соединителя JAVA для искры-кассандры.
  4. Умеренно предпочтительно, используя драйвер cassandra-java от datastax.
  5. Наименее предпочтительно, используя драйвер cassandra-jdbc и разъем spark-cassandra JAVA API.

Любая помощь будет оценена. Спасибо заранее.

ответ

2

Сделайте снимок на вашем существующем кластере и используйте массовый загрузчик в целевом кластере, нет необходимости в Spark (хотя вы можете сделать это именно так).

Здесь the docs о процедуре, но я расскажу вам о том, что вам нужно сделать на высоком уровне.

  1. Сделать снимок на существующем кластере
  2. Отправить (scp) снимок в узел на новом кластере
  3. Создать клон схемы (вы сказали, что вы уже сделали это)
  4. Используйте массовый загрузчик для пересылки sstables из моментального снимка в новый кластер.
+0

Привет .. У меня есть некоторые вопросы 1. Существуют ли все шаги, которые вы говорили, можно выполнить с помощью драйвера Кассандры Java (или Java)? 2. Как вы уже упоминали, как я могу достичь этого, используя искру? Заранее спасибо. –

3

После того, как мы приложили много усилий, мы нашли решение для этого. Это решение очень простое и безумное. Мы можем очень хорошо сделать это, используя искру, посмотрим, что мы сделали.

То, что мы делали (который не работал):

// Reading from first cassandra cluster 

dataframe = cassandraSQLContext.read().format("org.apache.spark.sql.cassandra").options("otherOptionsMap").option("spark.cassandra.connection.host","firstClusterIP").load(); 

// Writing to second cassandra cluster 

dataframe.write.mode("saveMode").options("otherOptionsMap").option("spark.cassandra.connection.host","secondClusterIP").save(); 

То, что работало отлично:

// Reading from first cassandra cluster 

dataframe = cassandraSQLContext.read().format("org.apache.spark.sql.cassandra").options("otherOptionsMap").option("spark_cassandra_connection_host","firstClusterIP").load(); 

// Writing to second cassandra cluster 

dataframe.write.mode("saveMode").options("otherOptionsMap")option("spark_cassandra_connection_host","secondClusterIP").save(); 

Да, вот правильно вы просто должны изменить период (.)для подчеркивания (_) для недвижимости в spark-cassandra host prope RTY. Я не знаю, является ли это ошибкой в ​​разъеме spark-cassandra.

+0

FYI, фактическая строка, которая позволяет это преобразование, https://github.com/datastax/spark-cassandra-connector/blob/9ee7e7acb2befb81e791e7178ea0da7b4ab52133/spark-cassandra-connector/src/main/scala/org/apache/spark/sql /cassandra/DefaultSource.scala#L138 –

+0

@MartinTapp Спасибо за ссылку! –

+0

От Spark 1.6, больше не нужно преобразовывать (.) В (_). – Ravikumar

2

Если вы используете spark-cassandra-connector, он предлагает поддержку для подключения нескольких кластеров по умолчанию.Соответствующий фрагмент кода ниже:

import com.datastax.spark.connector._ 
import com.datastax.spark.connector.cql._ 

import org.apache.spark.SparkContext 


def twoClusterExample (sc: SparkContext) = { 
    val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1")) 
    val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2")) 

    val rddFromClusterOne = { 
    // Sets connectorToClusterOne as default connection for everything in this code block 
    implicit val c = connectorToClusterOne 
    sc.cassandraTable("ks","tab") 
    } 

    { 
    //Sets connectorToClusterTwo as the default connection for everything in this code block 
    implicit val c = connectorToClusterTwo 
    rddFromClusterOne.saveToCassandra("ks","tab") 
    } 

} 

Here является соответствующая документация и пример фрагмента кода.

0

Java пример

Это будет работать

private static String sourceKeyspace = null; 
private static String targetKeyspace = null; 
private static String sourceHost = null; 
private static String targetHost = null; 
private static String sourceUsername = null; 
private static String targetUsername = null; 
private static String sourcePassword = null; 
private static String targetPassword = null; 
private static String sourceColumnFamily = null; 
private static String targetColumnFamily = null; 
private static String[] sourceColumns = null; 
// Set all above values according to your requirements 

private static JavaSparkContext sc; 
SparkConf sparkConf; 

sparkConf = new SparkConf(true).setAppName("Source Cassandra to Target Cassandra job"); 
sparkConf.setMaster(jobConfig.getString("spark.context-settings.master")); // Leave empty if you are running on local spark cluster 
sparkConf 
     .set("spark.cassandra.connection.host", sourceHost) 
     .set("spark.cassandra.input.fetch.size_in_rows", jobConfig.getString("spark.context-settings.fetchsize")) 
     .set("spark.cassandra.input.split.size_in_mb", jobConfig.getString("spark.context-settings.splitsize")) 
     .set("spark.cassandra.auth.username", sourceUsername) 
     .set("spark.cassandra.auth.password", sourcePassword) 
     .set("cassandra.username", sourceUsername) 
     .set("cassandra.password", sourcePassword) 
     .set("spark.cassandra.input.consistency.level", jobConfig.getString("spark.context-settings.spark.cassandra.consistency.level")) 
     .set("spark.executor.memory", jobConfig.getString("spark.context-settings.spark.executor.memory")) 
     .set("spark.driver.memory",jobConfig.getString("spark.context-settings.spark.driver.memory")) 
     .set("spark.executor.tasks", jobConfig.getString("spark.context-settings.spark.executor.tasks")) 
     .set("spark.mesos.coarse", "true") 
     .set("spark.cores.max", jobConfig.getString("spark.context-settings.spark.cores.max")) 
     .set("spark.scheduler.mode", "FAIR") 
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
     sc = new JavaSparkContext(sparkConf); 

JavaRDD<Tuple2<String, Integer>> tupleRows = CassandraJavaUtil.javaFunctions(sc.sc()). 
cassandraTable(sourceKeyspace, sourceColumnFamily).select(sourceColumns) 
.map(row -> { 
    String authorName = row.getString("author_name"); 
    Integer numBooks = row.getString("num_books"); 
    return new Tuple2<>(authorName, numBooks); 
}) 

Основная часть с помощью com.datastax.spark.connector.cql.CassandraConnector и writerBuilder:

CassandraConnector targetConnection = CassandraConnector.apply(
    sparkConf.set("spark.cassandra.connection.host",targetHost) 
    .set("spark.cassandra.auth.username", targetUsername) 
    .set("spark.cassandra.auth.password", targetPassword) 
    .set("cassandra.username", targetUsername) 
    .set("cassandra.password", targetPassword) 
); 

CassandraJavaUtil.javaFunctions(tupleRows).writerBuilder(targetKeyspace, targetColumnFamily, mapTupleToRow(String.class, Integer.class)) 
.withConnector(targetConnection) 
.saveToCassandra(); 

sc.stop(); 

Viola! Вы сделали!

https://datastax-oss.atlassian.net/browse/SPARKC-340

Смежные вопросы