2017-02-23 47 views
1

Я пытаюсь выполнить обработку потока и CEP на потоке сообщений Kafka. Для этого я выбрал Apache Ignite, чтобы сначала реализовать прототип. Однако я не могу подключиться к очереди:Проблемы с подключением Apache Ignite Kafka

Использование kafka_2.11-0.10.1.0 апач воспламеняться-ткань-1.8.0-бен

бен/zookeeper-server-start.sh конфигурации/Zookeeper. свойства бен/kafka-server-start.sh Config/server.properties бен/kafka-topics.sh --create --zookeeper локальный: +2181 --replication-фактор 1 --partitions 1 --topic тест

Кафка работает правильно, я тестировал его с потребителем. Затем я начинаю загораться, затем я запускаю следующую команду в командной строке весны загрузки.

KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>(); 

    Ignition.setClientMode(true); 

    Ignite ignite = Ignition.start(); 

    Properties settings = new Properties(); 
    // Set a few key parameters 
    settings.put("bootstrap.servers", "localhost:9092"); 
    settings.put("group.id", "test"); 
    settings.put("zookeeper.connect", "localhost:2181"); 
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    // Create an instance of StreamsConfig from the Properties instance 
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings); 

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache"); 

    try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) { 
     // allow overwriting cache data 
     stmr.allowOverwrite(true); 

     kafkaStreamer.setIgnite(ignite); 
     kafkaStreamer.setStreamer(stmr); 

     // set the topic 
     kafkaStreamer.setTopic("test"); 

     // set the number of threads to process Kafka streams 
     kafkaStreamer.setThreads(1); 

     // set Kafka consumer configurations 
     kafkaStreamer.setConsumerConfig(config); 

     // set decoders 
     StringDecoder keyDecoder = new StringDecoder(null); 
     StringDecoder valueDecoder = new StringDecoder(null); 

     kafkaStreamer.setKeyDecoder(keyDecoder); 
     kafkaStreamer.setValueDecoder(valueDecoder); 

     kafkaStreamer.start(); 
    } finally { 
     kafkaStreamer.stop(); 
    } 

При запуске приложения я получаю

2017-02-23 10: 25: 23,409 WARN 1388 --- [главная] kafka.utils.VerifiableProperties: bootstrap.servers собственности не действует 2017 -02-23 10: 25: 23.410 INFO 1388 --- [main] kafka.utils.VerifiableProperties: свойство group.id переопределено для проверки 2017-02-23 10: 25: 23.410 WARN 1388 --- [main] kafka.utils.VerifiableProperties: Property key.deserializer недействителен 2017-02-23 10: 25: 23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties: ключ свойства.сервератор недействителен 2017-0 2-23 10: 25: 23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties: Значение свойства.сервизатор недействителен 2017-02-23 10: 25: 23.411 WARN 1388 --- [основной] kafka. utils.VerifiableProperties: value.serializer свойство не действует 2017-02-23 10: 25: 23,411 INFO 1388 --- [главная] kafka.utils.VerifiableProperties: zookeeper.connect Property переопределяется для локального хоста: 2181

Затем

2017-02-23 10: 25: 24.057 WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils $: получение метаданных темы с идентификатором корреляции 0 для тем [Set (test)] from брокера [BrokerEndPoint (0, user.local, 9092)] не удалось

java.nio.channels.ClosedChannelException: null at kafka.network.BlockingChannel.send (BlockingChannel.scala: 110) ~ [kafka_2.11-0.10.0.1.jar: na] at kafka.producer.SyncProducer.liftedTree1 $ 1 (SyncProducer.scala: 80) ~ [kafka_2.11-0.10.0.1.jar: na] at kafka.producer.SyncProducer.kafka $ производитель $ SyncProducer $$ doSend (SyncProducer.scala: 79) ~ [kafka_2.11 -0.10.0.1.jar: na] at kafka.producer.SyncProducer.send (SyncProducer.scala: 124) ~ [kafka_2.11-0.10.0.1.jar: na] at kafka.client.ClientUtils $ .fetchTopicMetadata (ClientUtils.scala: 59) [kafka_2.11-0.10.0.1.jar: na] at kafka.client.ClientUtils $ .fetchTopicMetadata (ClientUtils.scala: 94) [kafka_2.11-0.10.0.1.jar: na] на kafka.consumer.ConsumerFetcherManager $ LeaderFinderThread.doWork (ConsumerFetcherManag er.scala: 66) [kafka_2.11-0.10.0.1.jar: na] at kafka.utils.ShutdownableThread.run (ShutdownableThread.scala: 63) [kafka_2.11-0.10.0.1.jar: na]

И чтение из очереди не работает. У кого-нибудь есть идея, как это исправить?

Edit: Если я комментирую содержимое окончательно преградить то следующее сообщение об ошибке приходит

[2m2017-02-27 16:42:27.780 [0,39 м. [31mERROR [0,39 м [35 м29946 [0,39 м [2 м --- [0,39 м [2 м [пул-3-нить-1] [0,39 м [36 м [0,39 м [2 м: [0; 39m Сообщение игнорируется из-за ошибки [msg = MessageAndMetadata (test, 0, Message (magic = 1, attributes = 0, CreateTime = -1, crc = 2558126716, key = java.nio.HeapByteBuffer [pos = 0 lim = 1 cap = 79], полезная нагрузка = java.nio.HeapByteBuffer [pos = 0 lim = 74 cap = 74]), 15941704, kafka.serializer.StringDecoder @ 74a96647, kafka.serializer.StringDecoder @ 42849d34, -1, CreateTime)]

java.lang.IllegalStateException: стример данных был закрыт. at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy (DataStreamerImpl.java:401) ~ [ignite-core-1.8.0.jar: 1.8.0] at org.apache.ignite.internal. processor.datastreamer.DataStreamerImpl.addDataInternal (DataStreamerImpl.java:613) ~ [ignite-core-1.8.0.jar: 1.8.0] at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData (DataStreamerImpl. java: 667) ~ [ignite-core-1.8.0.jar: 1.8.0] at org.apache.ignite.stream.kafka.KafkaStreamer $ 1.run (KafkaStreamer.java:180) ~ [ignite-kafka-1.8 .0.jar: 1.8.0] at java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) [na: 1.8.0_111] at java.util.concurrent.FutureTask.run (FutureTask. java: 266) [na: 1.8.0_111] at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.j ava: 1142) [na: 1.8.0_111] at java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) [na: 1.8.0_111] at java.lang.Thread.run (Thread. java: 745) [na: 1.8.0_111]

Спасибо!

ответ

1

Я думаю, что это происходит потому, что KafkaStreamer закрывается сразу после его начала (kafkaStreamer.stop() звонок в finally блок). kafkaStreamer.start() не является синхронным, он просто выталкивает потоки, чтобы потреблять из Кафки и выходы.

+0

Спасибо за ответ, если я прокомментирую содержимое блока «finally». Я получаю сообщение об ошибке, которое я опубликовал выше (в редакции). – razvan

+0

Это потому, что вы закрываете 'IgniteDataStreamer'. Избавьтесь от блока try-with-resources, и он будет работать. –

+0

Привет, я не получил приложение для запуска еще (потому что я не уверен, как читать из кеша), но, по крайней мере, я больше не получаю ошибок. Поэтому я буду отмечать этот вопрос, как ответ, и открыть новый для остальных. Еще раз спасибо, и, возможно, вы также можете посмотреть https://stackoverflow.com/questions/42562766/how-to-properly-read-from-ignite-cache – razvan