2015-04-28 3 views
0

Я пытаюсь запустить код, предоставленный в следующей ссылке: http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-javaCassandra Интеграция с Apache Спарк

код приведен ниже:

import java.io.Serializable; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 

import com.datastax.driver.core.Session; 
import com.datastax.spark.connector.cql.CassandraConnector; 

public class JavaDemo implements Serializable { 
    private transient SparkConf conf; 

    private JavaDemo(SparkConf conf) { 
     this.conf = conf; 
    } 

    private void run() { 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     generateData(sc); 
     compute(sc); 
     showResults(sc); 
     sc.stop(); 
    } 

    private void generateData(JavaSparkContext sc) { 

     CassandraConnector connector = CassandraConnector.apply(sc.getConf()); 

     try (Session session = connector.openSession()) { 
      session.execute("DROP KEYSPACE IF EXISTS java_api"); 
      session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"); 
      session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)"); 
      session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)"); 
      session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)"); 
     } 
    } 

    private void compute(JavaSparkContext sc) { 
    } 

    private void showResults(JavaSparkContext sc) { 
    } 

    public static void main(String[] args) { 

     SparkConf conf = new SparkConf(); 
     conf.setAppName("Java API demo"); 
     conf.setMaster("local"); 
     conf.set("spark.cassandra.connection.host", "XX.XX.XX.XX"); 

     JavaDemo app = new JavaDemo(conf); 
     app.run(); 

    } 
} 

Я получаю следующее сообщение об ошибке:

Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {<Cassandra IP>}:9042< 
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:176) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162) 
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31) 
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56) 
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73) 
    at JavaDemo.generateData(JavaDemo.java:28) 
    at JavaDemo.run(JavaDemo.java:18) 
    at JavaDemo.main(JavaDemo.java:52) 
Caused by: java.lang.IllegalArgumentException: Contact points contain multiple data centers: 
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.init(LocalNodeFirstLoadBalancingPolicy.scala:47) 
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1024) 
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:270) 
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:169) 

Есть ли что-то, что я могу с этим поделать. Я пробовал использовать Java-соединение с Cassandra, который, кажется, работает нормально

+0

за исключением выглядит довольно ясно мне: "java.lang.IllegalArgumentException: Точки контакта содержат множество данных центры "- измените ваш' conf.set ("spark.cassandra.connection.host", "XX.XX.XX.XX"); 'справа' YY.YY.YY.YY' – maasg

+0

Спасибо за ответ. Я уже пробовал переходить на другой рабочий IP-адрес, который соединяется с помощью простого Java-драйвера Cassandra. Получение такой же ошибки. Нужно ли нам изменить конфигурацию для решения: вызвано: java.lang.IllegalArgumentException: контактные точки содержат несколько центров обработки данных –

+0

Интересно ... похоже, что это конфиг, возвращаемый из cassandra, который дает эту ошибку. Является ли ваш кластер мульти DC? Попробуйте задать этот вопрос на https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user – maasg

ответ

0

Преобразовал проект в проект Maven и попытался запустить, это как-то разрешило проблему. Используемая версия Cassandra: 2.1.2. Я предполагаю, что это вопрос версии несоответствие содержания

pom.xml представлены ниже

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
 
    <modelVersion>4.0.0</modelVersion> 
 
    <groupId>com.mycompany.app</groupId> 
 
    <artifactId>my-app</artifactId> 
 
    <version>1.0-SNAPSHOT</version> 
 
    <packaging>jar</packaging> 
 

 
    <name>Maven Quick Start Archetype</name> 
 
    <dependencies> 
 
<dependency> 
 
     <groupId>com.datastax.spark</groupId> 
 
     <artifactId>spark-cassandra-connector_2.10</artifactId> 
 
     <version>1.2.0-rc3</version> 
 
</dependency> 
 

 
<dependency> 
 
     <groupId>com.datastax.spark</groupId> 
 
     <artifactId>spark-cassandra-connector-java_2.10</artifactId> 
 
     <version>1.2.0-rc3</version> 
 
</dependency> 
 
<dependency> 
 
     <groupId>org.apache.spark</groupId> 
 
     <artifactId>spark-core_2.10</artifactId> 
 
     <version>1.2.1</version> 
 
</dependency> 
 
<dependency> 
 
     <groupId>org.apache.spark</groupId> 
 
     <artifactId>spark-streaming_2.10</artifactId> 
 
     <version>1.2.1</version> 
 
</dependency> 
 
<dependency> 
 
     <groupId>org.apache.spark</groupId> 
 
     <artifactId>spark-sql_2.10</artifactId> 
 
     <version>1.2.0</version> 
 
</dependency> 
 
</dependencies> 
 
</project>