2015-08-11 2 views
1

У меня есть искровое приложение, которое считывает данные из одного кластера cassandra и после некоторых вычислений сохраняет данные в другой кластер cassandra. Я могу установить только 1 конфигурацию cassandra в sparkconf. но мне нужно подключиться к еще одному кластеру cassandra.Как подключиться к более 1 узлам cassandra с использованием разъема искры cassandra

Я вижу класс CassandraConnector, который используется для подключения к cassandra, но он использует объект CassandraConnectorConf для создания объекта, который принимает множество параметров, которые я не знаю.

Любая помощь будет полезна

ответ

2

Используйте следующий код:

SparkConf confForCassandra = new SparkConf().setAppName("ConnectToCassandra") 
       .setMaster("local[*]") 
       .set("spark.cassandra.connection.host", "<cassandraHost>"); 

CassandraConnector connector = CassandraConnector.apply(confForCassandra); 

javaFunctions(rdd).writerBuilder("keyspace", "table", mapToRow(Table.class)).withConnector(connector).saveToCassandra(); 
+0

Похоже, вы только подключаетесь к одному хозяину cassandra здесь. Где вы добавляете другой хост? Есть файл конфигурации где-нибудь? – Nadine

+0

Вы имеете в виду, что вы создаете 2 переменные SparkConf, по одному для каждого хоста cassandra, а затем две разные переменные соединителя? – Nadine

0

Если вы хотите подключиться к двум группам Cassandra с помощью Scala и свечи, вы можете использовать следующий код:

import com.datastax.spark.connector._ 
import com.datastax.spark.connector.cql._ 

import org.apache.spark.SparkContext 


def twoClusterExample (sc: SparkContext) = { 
    val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1")) 
    val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2")) 

    val rddFromClusterOne = { 
    // Sets connectorToClusterOne as default connection for everything in this code block 
    implicit val c = connectorToClusterOne 
    sc.cassandraTable("ks","tab") 
    } 

    { 
    //Sets connectorToClusterTwo as the default connection for everything in this code block 
    implicit val c = connectorToClusterTwo 
    rddFromClusterOne.saveToCassandra("ks","tab") 
    } 

} 

Оригинальный код был написан RusselSpitzer здесь: https://gist.github.com/RussellSpitzer/437f57dae4fd4bc4f32d

В настоящее время нет способа сделать это с помощью Python и Spark.

Смежные вопросы