2015-09-15 4 views
2

Я реализовал Spring-integration-kafka 1.0.0M в проект Spring MVC в прошлом году, используя конфигурацию XML, и это было очень просто сделать. Поскольку Spring, похоже, перемещается в направлении конфигурации Java (а не в XML), я хотел бы перейти от использования XML-конфигурации spring-integration-kafka к конфигурации Java, которая представляет собой последнюю версию spring-integration-kafka (1.2. 1). Проблема в том, что на самом деле не так много завершенных примеров того, как это делается в Интернете, и примеры, которые я нашел, выглядят устаревшими из того, что я могу сказать. Конфигурация у меня есть довольно просто:Как конвертировать из spring-integration-kafka 1.0.0M (XML config) в spring-integration-kafka 1.2.1 (Java config)?

<bean id="kafkaStringEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder" /> 

<bean id="customObjectMapper" class="ad.content.api.utils.ObjectMapperFactory" factory-method="getMapper" /> 

<int:channel id="kafkaConversionRequest" /> 

<bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> 
    <property name="properties"> 
     <props> 
      <prop key="message.send.max.retries">${kafka.retries}</prop> 
     </props> 
    </property> 
</bean> 

<int-kafka:producer-context id="kafkaWidgetProducerContext" producer-properties="producerProperties"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration 
      broker-list="${kafka.broker}" key-class-type="java.lang.String" 
      key-encoder="kafkaStringEncoder" value-class-type="java.lang.String" 
      value-encoder="kafkaStringEncoder" topic="widget-.*" 
      compression-codec="default" async="true" /> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 

<!-- declare spring integration gateway for kafka --> 
<int:gateway service-interface="ad.content.api.models.kafka.KafkaGateway" default-reply-timeout="2000"> 
    <int:method name="publishConversion" request-channel="kafkaConversionRequest" /> 
</int:gateway> 

<int:chain input-channel="kafkaConversionRequest" output-channel="kafkaToJson"> 
    <int:header-enricher> 
     <int:header name="topic" value="widget-conversion" /> 
    </int:header-enricher> 
</int:chain> 

<int:object-to-json-transformer input-channel="kafkaToJson" output-channel="kafkaOutbound" object-mapper="customObjectMapper" /> 

<int-kafka:outbound-channel-adapter id="kafkaOutbound" kafka-producer-context-ref="kafkaWidgetProducerContext" /> 

Вот что я могу понять, до сих пор:

// gateway 
@MessagingGateway(defaultReplyTimeout="2000") 
public interface KafkaGateway { 
    @Gateway(requestChannel="kafkaConversionRequest", [email protected](name="topic", value="widget-conversion")) 
    void publishConversion(Conversion conversion); 
} 

// create channel 
@Bean(name="kafkaConversionRequest") 
public MessageChannel getConversionRequest() { 
    return new DirectChannel(); 
} 

@Bean 
public KafkaProducerMessageHandler getHandler() throws Exception { 
    return new KafkaProducerMessageHandler(getContext()); 
} 

@Bean 
public KafkaProducerContext getContext() throws Exception { 
    KafkaProducerContext context = new KafkaProducerContext(); 
    context.setProducerConfigurations(Collections.singletonMap("config", getConfiguration())); 
    return context; 
} 

@Bean 
public ProducerConfiguration<String, String> getConfiguration() throws Exception { 
    return new ProducerConfiguration<String, String>(getMetaData(), getProducer()); 
} 

@Bean 
@Transformer(inputChannel="kafkaToJson", outputChannel="kafkaOutbound") 
public ObjectToJsonTransformer getJsonTransformer() { 
    return new ObjectToJsonTransformer(); 
} 

@Bean 
public ProducerMetadata<String, String> getMetaData() { 
    StringSerializer serializer = new StringSerializer(); 
    return new ProducerMetadata<String, String>("widget-.*", String.class, String.class, serializer, serializer); 
} 

@Bean 
public Producer<String, String> getProducer() throws Exception { 
    return new ProducerFactoryBean<String, String>(getMetaData(), "dev.kafka-broker01:9092").getObject(); 
} 

ответ

0

Там выдающаяся pull request для образца Кафки, которые могли бы помочь вам.

Смежные вопросы