2015-04-28 5 views
2

Я использую искру с Кассандрой и я хочу, чтобы записать данные в мою Кассандре таблицу:Не удался написать заявления

CREATE TABLE IF NOT EXISTS MyTable(
user TEXT, 
date TIMESTAMP, 
event TEXT, 
PRIMARY KEY((user),date , event) 
); 

Но я получил эту ошибку:

java.io.IOException: Failed to write statements to KeySpace.MyTable. 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:56) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
    Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
15/04/28 17:57:47 WARN TaskSetManager: Lost task 13.2 in stage 1.0 (TID 43, dev2-cim.aid.fr): TaskKilled (killed intentionally) 

и предупреждения в моем Cassandra лог файл:

WARN [SharedPool-Worker-2] 2015-04-28 16:45:21,219 BatchStatement.java:243 - Batch of prepared statements for [*********] is of size 8158, exceeding specified threshold of 5120 by 3038 

после внесения некоторых поисковых запросов в Интернете, я нашел эту ссылку, который объясняет, как он фиксирует S AME проблема: http://progexc.blogspot.fr/2015/03/write-batch-size-error-spark-cassandra.html

Итак, теперь я изменил мой алгоритм искрового добавить:

conf.set("spark.cassandra.output.batch.grouping.key", "None") 
conf.set("spark.cassandra.output.batch.size.rows", "10") 
conf.set("spark.cassandra.output.batch.size.bytes", "2048") 

это значение удалить предупреждающее сообщение я получил в Кассандре Лог, но я до сих пор имеет то же ошибку: Failed to write statements ,

В моей искры журнал не в состоянии я нашел эту ошибку:

Failed to execute: 
    [email protected] 
    com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty 
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:103) 
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140) 
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293) 
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455) 
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) 
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) 
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) 
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) 
+1

в трассировке стека, оставленную неудачей, вы видите ** причина **? Предупреждение, вероятно, не имеет к этому никакого отношения. – maasg

+0

Я только что редактировал свое сообщение, чтобы показать трассировку стека. Но я не вижу причины в трассировке стека. –

+0

Я нашел эту причину в файле журнала искры: InvalidQueryException: ключ не может быть пустым –

ответ

2

У меня была такая проблема, что я нашел решение в примечаниях выше (от Amine CHERIFI и maasg).

Столбец, соответствующий первому ключу, не всегда был заполнен надлежащим значением (в моем случае с пустой строкой «»).

Это вызвало ошибку

ERROR QueryExecutor: Failed to execute: \ 
[email protected] \ 
com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty 

Решение должно было обеспечить строку по умолчанию не пусто.

0

Я решил проблему путем перезагрузки моего кластера как будет, как узлы. Следующее - это то, что я пробовал. Я также сталкиваюсь с тем же вопросом. Я пробовал все варианты выше, чем вы упомянули в блоге, но не успели. Мои данные размером 174gb. Всего 174 Гб данных, Мой кластер имеет 3 узла, каждый узел имеет 16 ядер и 48 ГБ оперативной памяти. Я попытался lode 174gb в один выстрел, в то время у меня такая же проблема. После этого я выделил 174 gb в 109 файлах каждый 1,6 Гб и попытался lode, на этот раз я снова столкнулся с такой же проблемой после загрузки 100 файлов (каждый 1,6 gb). Я думал, что проблема связана с данными в 101 файле. Я попытался загрузить первый файл и попытался перенести первый файл в новую таблицу и попытался объединить новые данные в новую таблицу, но все эти случаи имеют проблему. Тогда я думаю, что это проблема с кластером cassandra и перезапустили кластер и узлы. Тогда проблема исчезла.

0

Добавить точку останова в «com/datastax/spark/connector/writer/AsyncExecutor.scala: 45», вы можете получить реальное исключение.

В моем случае, replication_factor моего пространства ключей 2, но у меня есть только один живой.

3

Если вы работаете в режиме кладки пряжи, не забудьте проверить весь лог на пряжу, используя yarn logs -applicationId <appId> --appOwner <appOwner>. Это дало мне больше причин для отказа, чем бревна на пряжу WebUI

Caused by: com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive) 
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:50) 
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37) 
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266) 
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246) 
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89) 
... 11 more 

Решение состоит в установке spark.cassandra.output.consistency.level=ANY в вашей искровым defaults.conf

Смежные вопросы