0

У меня есть кластер cassandra из 3 узлов и через мой модульный тест в Java, я сначала создаю пространство ключей, а затем создаю семейство столбцов в этом пространстве ключей. Иногда тесты модулей проходят, но случайным образом я продолжаю получать следующую ошибку. Я использую последний драйвер datastax 2.1.4 java и версию cassandra в 2.1.0.InvalidQueryException при попытке создать семейство столбцов в Cassandra через модуль

com.symc.edp.database.nosql.NoSQLPersistenceException: com.datastax.driver.core.exceptions.InvalidQueryException: Cannot add column family 'testmaxcolumnstable' to non existing keyspace 'testmaxcolumnskeyspace'. 
    at com.symc.edp.database.nosql.cassandra.CassandraCQLTableEditor.createTable(CassandraCQLTableEditor.java:67) 
    at com.symc.edp.database.nosql.cassandra.TestCassandraWideRowPerformance.testWideRowInserts(TestCassandraWideRowPerformance.java:74) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) 
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Cannot add column family 'testmaxcolumnstable' to non existing keyspace 'testmaxcolumnskeyspace'. 
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35) 
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289) 
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205) 
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) 
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) 
    at com.symc.edp.database.nosql.cassandra.CassandraCQLTableEditor.createTable(CassandraCQLTableEditor.java:65) 
    ... 6 more 
Caused by: com.datastax.driver.core.exceptions.InvalidConfigurationInQueryException: Cannot add column family 'testmaxcolumnstable' to non existing keyspace 'testmaxcolumnskeyspace'. 
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:104) 
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140) 
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:249) 
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:421) 
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:697) 
    at com.datastax.shaded.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) 
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) 
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:296) 
    at com.datastax.shaded.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70) 
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) 
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:296) 
    at com.datastax.shaded.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) 
    at com.datastax.shaded.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) 
    at com.datastax.shaded.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) 
    at com.datastax.shaded.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) 
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) 
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) 
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:268) 
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:255) 
    at com.datastax.shaded.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) 
    at com.datastax.shaded.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) 
    at com.datastax.shaded.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) 
    at com.datastax.shaded.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) 
    at com.datastax.shaded.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) 
    at com.datastax.shaded.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) 
    at com.datastax.shaded.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

И в system.log файл Кассандры я вижу следующее исключение:

ERROR [SharedPool-Worker-1] 2015-01-28 15:08:24,286 ErrorMessage.java:218 - Unexpected exception during request 
java.io.IOException: Connection reset by peer 
     at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_05] 
     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_05] 
     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_05] 
     at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_05] 
     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:375) ~[na:1.8.0_05] 
     at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) ~[netty-all-4.0.20.Final.jar:4.0.20.Final] 
     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:878) ~[netty-all-4.0.20.Final.jar:4.0.20.Final] 
     at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) ~[netty-all-4.0.20.Final.jar:4.0.20.Final] 
     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:114) ~[netty-all-4.0.20.Final.jar:4.0.20.Final] 
     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:507) ~[netty-all-4.0.20.Final.jar:4.0.20.Final] 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:464) ~[netty-all-4.0.20.Final.jar:4.0.20.Final] 
     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:378) ~[netty-all-4.0.20.Final.jar:4.0.20.Final] 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:350) ~[netty-all-4.0.20.Final.jar:4.0.20.Final] 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) ~[netty-all-4.0.20.Final.jar:4.0.20.Final] 
     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05] 
INFO [SharedPool-Worker-1] 2015-01-28 15:13:01,051 MigrationManager.java:229 - Create new Keyspace: KSMetaData{name=testmaxcolumnskeyspace, strategyClass=SimpleStrategy, strategyOptions={replication_factor=1}, cfMetaData={}, durableWrites=true, [email protected]} 
INFO [MigrationStage:1] 2015-01-28 15:13:01,058 ColumnFamilyStore.java:856 - Enqueuing flush of schema_keyspaces: 512 (0%) on-heap, 0 (0%) off-heap 
INFO [MemtableFlushWriter:7] 2015-01-28 15:13:01,059 Memtable.java:326 - Writing [email protected](138 serialized bytes, 3 ops, 0%/0% of on/off-heap limit) 
INFO [MemtableFlushWriter:7] 2015-01-28 15:13:01,077 Memtable.java:360 - Completed flushing /usr/share/apache-cassandra-2.1.0/bin/../data/data/system/schema_keyspaces-b0f2235744583cdb9631c43e59ce3676/system-schema_keyspaces-ka-103-Data.db (175 bytes) for commitlog position ReplayPosition(segmentId=1422485457803, position=1181) 

Кроме того, я проверил через DevCenter, то пространство ключи не получило созданы.

+2

Я ответил на это в списке рассылки, прежде чем замечать вопрос SO. Если вы собираетесь перекрестно размещать сообщения, укажите ссылку в одном или другом, чтобы разговор мог случиться в одном месте. –

+0

Несомненно, сохранит это в виду для будущих сообщений. – Sau

ответ

0

Не видя своего кода, я думаю, вам нужно поспать между созданием пространства ключей и попыткой создания в нем таблиц. Вероятно, вам нужно предоставить определение пространства ключей на пару секунд для распространения на все узлы вашего кластера, прежде чем пытаться его использовать.

+0

Проблема в том, что сами ключи не создавались. Он пытается сбросить соединение путем исключения peer при попытке создать пространство ключей. – Sau

+0

Это может быть связано с сбоем клиента при попытке создать таблицу в пространстве ключей, которое еще не было распространено, поэтому Cassandra видит, что соединение сбрасывается, когда клиент умирает. У вас есть сон между созданием пространства ключей и созданием таблиц? –

0

Это поможет, как указано, в вашем классе конфигурации. Мы используем ClassPathCQLDataSet для выпуска наших заявлений и создания keyspace при одинаковом ходу (link to ClassPathCqlDataSet documentation, обратите внимание, что логическое значение в позиции 2 и 3 говорит ему о создании и удалении пространства ключей). db.cql - файл, в котором мы храним таблицы создания таблиц. Вот наша конфигурация, которая может вам помочь:

package some.package; 

import org.cassandraunit.CQLDataLoader; 
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; 
import org.cassandraunit.utils.EmbeddedCassandraServerHelper; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.DisposableBean; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.annotation.Profile; 
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; 
import org.springframework.core.io.ClassPathResource; 

import com.datastax.driver.core.Cluster; 
import com.datastax.driver.core.Session; 

@Configuration 
@Profile({"test"}) 
public class TestCassandraConfig implements DisposableBean { 

    private static final Logger LOGGER = LoggerFactory.getLogger(TestCassandraConfig.class); 

    private final String CQL = "db.cql"; 

    @Value("${cassandra.contact_points:localhost}") 
    private String contact_points; 
    @Value("${cassandra.port:9142}") 
    private int port; 
    @Value("${cassandra.keyspace:test}") 
    private String keyspace; 

    private static Cluster cluster; 
    private static Session session; 
    private static SessionProxy sessionProxy; 

    @Bean 
    public Session session() throws Exception { 
     if (session == null) { 
      initialize(); 
     } 

     return sessionProxy; 
    } 

    @Bean 
    public TestApplicationContext testApplicationContext() { 
     return new TestApplicationContext(); 
    } 

    private void initialize() throws Exception { 
     LOGGER.info("Starting embedded cassandra server"); 
     EmbeddedCassandraServerHelper.startEmbeddedCassandra("another-cassandra.yaml"); 

     LOGGER.info("Connect to embedded db"); 
     cluster = Cluster.builder().addContactPoints(contact_points).withPort(port).build(); 
     session = cluster.connect(); 

     LOGGER.info("Initialize keyspace"); 
     final CQLDataLoader cqlDataLoader = new CQLDataLoader(session); 
     cqlDataLoader.load(new ClassPathCQLDataSet(CQL, false, true, keyspace)); 
    } 

    @Override 
    public void destroy() throws Exception { 
     if (cluster != null) { 
      cluster.close(); 
      cluster = null; 
     } 
    } 
} 
Смежные вопросы