1

Я прочитал много статей, но не нашел, как настроить Producer, у которого есть тема с несколькими разделами (тема, созданная во время выполнения) с использованием Spring Integration Kafka.Как настроить тему производителя kafka с использованием нескольких разделов с использованием Spring интеграции kafka

Я использую github link, чтобы понять и настроить kafka для моего приложения.

Просьба предоставить решение для того же

еще одна вещь, Что является использование kafkaHeader.messageKey.

Update: Ниже тестовый код, который я разрабатываю
`общественный класс OutboundTests { @Test общественных недействительный MyTest() бросает исключение {

KafkaProducerContext<String, SMSNotificationVO> ctx = new KafkaProducerContext<String, SMSNotificationVO>(); 
    ProducerMetadata<String, SMSNotificationVO> meta = new ProducerMetadata<String, SMSNotificationVO>("sms_topic"); 
    meta.setValueClassType(SMSNotificationVO.class); 
    meta.setKeyClassType(String.class); 
    meta.setValueEncoder(new SMSObjectSerializer()); 

    ProducerMetadata<String, SMSNotificationVO> meta1 = new ProducerMetadata<String, SMSNotificationVO>("sms_topic_10"); 
    meta1.setValueClassType(SMSNotificationVO.class); 
    meta1.setKeyClassType(String.class); 
    meta1.setValueEncoder(new SMSObjectSerializer()); 
    Properties producerProperties = new Properties(); 
    producerProperties.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, "1");//"message.send.max.retries" 
    ProducerFactoryBean<String, SMSNotificationVO> producer = new ProducerFactoryBean<String, SMSNotificationVO>(meta,"192.168.1.147:9092"); 
    ProducerFactoryBean<String, SMSNotificationVO> producer1 = new ProducerFactoryBean<String, SMSNotificationVO>(meta1,"192.168.1.147:9092", producerProperties); 


    ProducerConfiguration<String, SMSNotificationVO> p = new ProducerConfiguration<String, SMSNotificationVO>(meta, producer.getObject()); 
    ProducerConfiguration<String, SMSNotificationVO> p1 = new ProducerConfiguration<String, SMSNotificationVO>(meta1, producer1.getObject()); 

    Map<String, ProducerConfiguration<String, SMSNotificationVO>> map = new HashMap<String, ProducerConfiguration<String, SMSNotificationVO>>(); 
    map.put("sms_topic", p); 
    map.put("sms_topic_10", p1); 

    ctx.setProducerConfigurations(map); 

    // java code to send message 
    Map<String, String> params = new HashMap<>(); 
    params.put("Topic", "TEST MULTIPLE TOPIC : this is sms_topic_1"); 
    List<String> smsRecipients = new ArrayList<>(); 
    smsRecipients.add("9953225211"); 


    String message = "This Test message from junit topic sms_topic_1"; 
    Integer messageType =1; 

    SMSNotificationVO vo = new SMSNotificationVO(); 
    vo.setMessage(message); 
    vo.setMessageType(messageType); 
    vo.setParams(params); 
    vo.setSmsRecipients(smsRecipients); 


    try{ 
    KafkaProducerMessageHandler<String, SMSNotificationVO> handler = new KafkaProducerMessageHandler<String, SMSNotificationVO>(ctx); 
    handler.setMessageKeyExpression(new LiteralExpression("sms_topic_10")); 
    handler.handleMessage(MessageBuilder.withPayload(vo) 
      //if i remove the below comments I will get null pointer exception 
      //.setHeader(KafkaHeaders.MESSAGE_KEY, "some key") 
      //.setHeader(KafkaHeaders.PARTITION_ID, "1") 
       .setHeader(KafkaHeaders.TOPIC, "sms_topic_10") 
       .build()); 

    }catch(Exception e){ 
     e.printStackTrace(); 
     Assert.fail("Kafka SMS Producer fail"); 
    } 

} 
}` 

Я получаю Исключение нулевого указателя. Ниже журнал Упоминание:

`org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springf[email protected]7b94089b]; nested exception is java.lang.NullPointerException 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84) 
at dd.kafka.producer.OutboundTests.mytest(OutboundTests.java:85) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
at java.lang.reflect.Method.invoke(Unknown Source) 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 
Caused by: java.lang.NullPointerException at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130) 
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:127) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:127) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53) 
at kafka.producer.Producer.send(Producer.scala:77) 
at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
at org.springframework.integration.kafka.support.ProducerConfiguration.send(ProducerConfiguration.java:70) 
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:197) 
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:81) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) 
... 24 more` 

Благодаря

ответ

1

Тема не связанные с Producer. Перед отправкой или получением сообщения в/из него вы должны предварительно настроить тему.

Тот факт, что вы можете создать тему во время выполнения с использованием AdminUtils.createTopic() над интерфейсом высокого уровня интеграции Spring, предназначен для удобства тестирования и его следует избегать для производства.

The KafkaHeaders.messageKey полностью отображается в стандартный Кафка messageKey да и может быть использован для определения цели partition в topic.

Таким образом, вы определенно должны идти к официальной документации Apache Кафка, чтобы понять, как создать тему с «множественным раздела» и что делать с Producer стороны относительно partition и messageKey параметров.

+0

Спасибо за ответ Там несколько запросов в моем уме, который является следующим: 1. messagekey будет использовать для определенного раздела в теме (некоторые, как хеширование), то, как выбрать этот ключ, как я хочу, чтобы выбрать ключ случайным образом, чтобы каждый раз, когда сообщение публиковало его, он переходит в другой раздел, чтобы использовать параллелизм. 2. В ссылке укажите «message-key-expression» и 'partition-id-expression' на каких основаниях это значение. 3. Если выражение message-key-expression упоминается, то только я могу использовать KafkaHeaders.messageKey. С благодарностью – rahul

+0

Если вы укажете конкретный 'partition' при операции отправки, чем' messageKey' будет обходить алгоритмом определения раздела. Эти выражения используются в точности для работы Кафки «Продюсер». См. Его API. В самом деле, вместо 'message-key-expression' можно использовать' KafkaHeaders.messageKey'. –

+0

Но если вы видите мой код, 'setHeader (KafkaHeaders.MESSAGE_KEY,« некоторый ключ »)' дал исключение нулевого указателя – rahul

Смежные вопросы