1

Моего искрового потоковое приложения имеет следующие строки:Cassandra разъема Datastax искры неудача громко и невнятно

Я пытаюсь написать серию объектов для таблиц в Кассандре (а также в текстовый файл). У меня есть следующий код:

val rmqReceiver = new RMQReceiver(queueIp, "vehicle-data") 
val statusMessageStream = myStreamingContext.receiverStream[String](rmqReceiver) 
val vsStream = customReceiverStream.map(jsonToVehicleStatus) 

customReceiverStream.foreachRDD((vs: RDD[String])=> vs.saveAsTextFile("/var/log")) 
vsStream.foreachRDD((vs: RDD[Vehicle_Status])=> vs.saveToCassandra("vehicle_data","vehicles",AllColumns)) 
vsStream.foreachRDD((vs: RDD[Vehicle_Status])=> vs.saveToCassandra("vehicle_data","vehicle_locations",AllColumns)) 

Я пробовал кучу вариантов, но вот что происходит:

Текстовый файл будет написана (иногда)

Первый звонок «saveToCassandra», успешно сохраняет запись Второй вызов вызывает исключение, указанное ниже?

Я чувствую, что мне не хватает чего-то очевидного, я просто не вижу, что это такое.

java.io.IOException: Не удалось подготовить заявление INSERT INTO "vehicle_data" "vehicle_locations" ("метки времени", "vehicle_id", "Lon", "geobin", "LAT") VALUES (:.» timestamp ",:" vehicle_id ",:" lon ",:" geobin ",:" lat "): все хосты (ы), попробованные для запроса, не удались (ни один хост не был пробован) at com.datastax.spark.connector.writer .TableWriter.com $ datastax $ spark $ connector $ writer $ TableWriter $$ prepareStatement (TableWriter.scala: 96) at com.datastax.spark.connector.writer.TableWriter $$ anonfun $ write $ 1.apply (TableWriter.scala: 122) at com.datastax.spark.connector.writer.TableWriter $$ anonfun $ write $ 1.apply (TableWriter.scala: 120) at com.datastax.spark.connector.cql.CassandraConnector $$ anonfun $ withSessionDo $ 1. применять (Ca ssandraConnector.scala: 100) at com.datastax.spark.connector.cql.CassandraConnector $$ anonfun $ withSessionDo $ 1.apply (CassandraConnector.scala: 99) at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse (CassandraConnector.scala: 151) at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo (CassandraConnector.scala: 99) at com.datastax.spark.connector.writer.TableWriter.write (TableWriter.scala: 120) at com.datastax.spark.connector.RDDFunctions $$ anonfun $ saveToCassandra $ 1.apply (RDDFunctions.scala: 36) at com.datastax.spark.connector.RDDFunctions $$ anonfun $ saveToCassandra $ 1.apply (RDDFunctions.scala: 36) at org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 61) at org.apache.spark.scheduler.Task.r un (Task.scala: 64) at org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 203) по адресу java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) at java.lang.Thread.run (Thread.java:745) Вызвано: com.datastax.driver.core.exceptions.NoHostAvailableException: Все хосты (ы), попробованные для запроса, не удалось (ни один хост не был пробован) at com.datastax.driver.core.exceptions.NoHostAvailableException.copy (NoHostAvailableException.java:84) at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException (DefaultResultSetFuture.java:289) at com.datastax.driver.core.AbstractSession.prepare (AbstractSession.java:91) на sun.reflect.GeneratedMethodAccessor57.invoke (Unknown Source) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:497) на ком .datastax.spark.connector.cql.SessionProxy.invoke (SessionProxy.scala: 33) at com.sun.proxy. $ Proxy11.prepare (Неизвестный источник) at com.datastax.spark.connector.cql.PreparedStatementCache $. prepareStatement (PreparedStatementCache.scala: 45) at com.datastax.spark.connector.cql.SessionProxy.invoke (SessionProxy.scala: 28) at com.sun.proxy. $ Proxy11.prepare (Неизвестный источник) на com.datastax.spark. connector.writer.TableWriter.com $ datastax $ spark $ connector $ writer $ TableWriter $$ prepareStatement (TableWriter.scala: 92) ... 15 еще Вызвано: com.datastax.driver.core.exceptions.NoHostAvailableException: Все хост (ы), попробованный для запроса, не прошел (ни один хост не был пробован) at com.datastax.driver.core.RequestHandler.sendRequest (RequestHandler.java:107) at com.datastax.driver.core.SessionManager.execute (SessionManager. java: 538) at com.datastax.driver.core.SessionManager.prepareAsync (SessionManager.java:124) at com.datastax.driver.core.AbstractSession.prepa reAsync (AbstractSession.java:103) в com.datastax.driver.core.AbstractSession.prepare (AbstractSession.java:89) ... 24 более

кроме того, это исключение:

ОШИБКА QueryExecutor: Не удалось выполнить: [email protected] com.datastax.driver.core.exceptions.NoHostAvailableException: все хосты, пытавшиеся выполнить запрос, не удалось (попробовали:/52. { MYIP}: 9042 (com.datastax.driver.core.TransportException: [/52.{MYIP}:9042] Соединение закрыто)) по адресу com.datastax.driver.core.RequestHa ndler.sendRequest (RequestHandler.java:107) at com.datastax.driver.core.RequestHandler $ 1.run (RequestHandler.java:210) at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) в java.lang.Thread.run (Thread.java:745)

Я подключил это до хорошо -rpovisioned cluster, и я получаю эти ошибки, пытаясь превысить 6 записей/секунду (по 3 на каждую таблицу)

+0

Это, вероятно, происходит потому, что все ваши узлы cassandra были помечены. Какую версию разъема spark-cassandra вы используете, и вы видите что-нибудь еще в журналах о том, что хосты отмечены или запрошены тайм-аут? –

+0

ах да, мне было интересно узнать об этом дополнительном исключении. Я обновил этот вопрос – lostinplace

+0

я бегу: libraryDependencies + = "com.datastax.spark" % "искрового Кассандры connector_2.10" % "1.2.0-RC3" – lostinplace

ответ

0

Если вы используете это на своем локальном компьютере, то проверьте коэффициент репликации вашего ключа, сделайте его 1 и попробуй еще раз. Это разрешилось для меня

Смежные вопросы