0

Я пытаюсь подключиться к Cassandra, используя разъем Cassandra для Spark, искровое задание работает на EMR.Spark Job Cant Подключиться к Cassandra

Ниже мой код

public class SparkCassandraDriver implements Serializable { 
    private transient SparkConf conf; 

    private SparkCassandraDriver(SparkConf conf) { 
     this.conf = conf; 

    } 

    private void run() { 
     JavaSparkContext sc = new JavaSparkContext(conf); 
    // generateData(sc); 
     connectToCassandra(sc); 


     sc.stop(); 
    } 

    private void connectToCassandra(JavaSparkContext sc) { 
     CassandraConnector connector = CassandraConnector.apply(sc.getConf()); 
     System.out.println("Conencted is " + sc.getConf().get("spark.cassandra.connection.host")); 

     Session session = connector.openSession(); 
     session.execute("USE dmp"); 


     ResultSet rs = session.execute(
        "SELECT XYZ FROM XYZ"); 

     Iterator<Row> it = rs.iterator(); 

     while(it.hasNext()){ 

      System.out.println("it issssssss " +it.next()); 
     } 
     session.close(); 

    } 

    public static void main(String[] args) { 


     SparkConf conf = new SparkConf(); 
     conf.setAppName("Spark-Cassandra Integration"); 
     conf.setMaster("yarn-cluster"); 
     conf.set("spark.cassandra.connection.host", "PUBLIC IP"); 
     conf.set("spark.cassandra.connection.rpc.port", "9042"); 
     conf.set("spark.cassandra.connection.timeout_ms", "40000"); 
     conf.set("spark.cassandra.read.timeout_ms", "200000"); 


     conf.set("spark.cassandra.auth.username", "username"); 
     conf.set("spark.cassandra.auth.password", "password"); 


     SparkCassandraDriver app = new SparkCassandraDriver(conf); 
     app.run(); 
    } 

} 

ПОМ, который я использую

<dependency> 
     <groupId>org.apache-extras.cassandra-jdbc</groupId> 
     <artifactId>cassandra-jdbc</artifactId> 
     <version>1.2.5</version> 
    </dependency> 
    <dependency> 
    <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>1.2.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.2.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.10</artifactId> 
     <version>1.2.1</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.10</artifactId> 
     <version>1.2.1</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector-java_2.10</artifactId> 
     <version>1.2.1</version> 
    </dependency> 

Но Im получаю следующее сообщение об ошибке.

16/02/22 16:47:20 ERROR ApplicationMaster: User class threw exception: java.io.IOException: Failed to open native connection to Cassandra at {54.166.142.199}:9042 
    java.io.IOException: Failed to open native connection to Cassandra at {54.166.142.199}:9042 
     at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraCon 
    nector.scala:176) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162) 
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31) 
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56) 
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73) 
at com.mobi.vserv.driver.SparkCassandraDriver.connectToCassandra(SparkCassandraDriver.java:55) 
    at com.mobi.vserv.driver.SparkCassandraDriver.run(SparkCassandraDriver.java:45) 
    at com.mobi.vserv.driver.SparkCassandraDriver.main(SparkCassandraDriver.java:90) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542) 
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /54.166.142.199:9042 (com.d 
    atastax.driver.core.TransportException: [/54.166.142.199:9042] Cannot connect)) 
    at      com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223) 
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78) 
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1230) 
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:333) 
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:169) 

Пожалуйста, помогите мне в решении этого:

Благодаря

+0

Довольно ясно он не может подключиться к 9024 на свой IP-адрес. Что вы сделали для того, чтобы определить, действительно, что 9042 уже установлен и установлен? – apesa

+0

Да, я могу telnet, и мой местный искровой код может подключиться к кассандре. –

ответ

1

Наконец я нашел решение, я добавил группу Безопасности (ЭМИ-рабов) в группе Cassandra безопасности, который не был там.

И это решило проблему.

С уважением,

Рахул

Смежные вопросы