2016-08-15 10 views
1

Я пытаюсь использовать spring.cloud.stream.kafka.binder.headers для переноса настраиваемого заголовка, который я устанавливаю на основе предыдущего question.spring.cloud.stream.kafka.binder.headers не работает как ожидалось

Я прочитал в documentation где ...

spring.cloud.stream.kafka.binder.headers 
The list of custom headers that will be transported by the binder. 

Default: empty. 

, кажется, предполагает, что установка списка (разделенные запятой?) Вызовет специальный заголовок, чтобы транспортироваться в Message<>, но заголовок теряется как только напишет кафка.

Мой аннотацию создает заголовок как часть вызова к MessagingGateway:

@MessagingGateway(name = "redemptionGateway", defaultRequestChannel = Channels.GATEWAY_OUTPUT, defaultHeaders = @GatewayHeader(name = "orderId", expression = "#gatewayMethod.name")) 
public interface RedemptionGateway { 
    ... 
} 

наблюдаю, что заголовок правильно создается в первом preSend отладки:

2016-08-15 15:09:04 http-nio-8080-exec-2 DEBUG DirectChannel:430 - preSend on channel 'gatewayOutput', message: GenericMessage [[email protected][orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={orderId=create, id=5dccea6f-266e-82b9-54c6-57ec441a26ac, timestamp=1471288144882}] - {applicationSystemCode=x, clientIP=0:0:0:0:0:0:0:1, clusterId=Cluster-Id-NA, containerId=Container-Id-NA, correlationId=UNDEFINED, domainName=defaultDomain, hostName=Host-NA, messageId=10.113.21.144-eb8404d0-de93-4f94-80cb-e5b638e8aeef, userId=anonymous, webAnalyticsCorrelationId=|} 

Но на следующий preSend, заголовок отсутствует:

2016-08-15 15:09:05 kafka-binder- DEBUG DirectChannel:430 - preSend on channel 'enrichingInput', message: GenericMessage [[email protected][orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={kafka_offset=10, orderId=create, kafka_messageKey=null, kafka_topic=received, kafka_partitionId=0, kafka_nextOffset=11, contentType=application/x-java-object;type=x.TrivialRedemption}] - {} 

Мои объекты содержат:

 

    spring.cloud.stream.kafka.binder.headers=orderId 

ответ

3

Какую версию весеннего облачного потока вы используете?

Я просто написал быстрый тест, и он работал просто отлично ...

spring.cloud.stream.kafka.binder.headers=bar 
spring.cloud.stream.bindings.output.destination=foobar 
spring.cloud.stream.bindings.input.destination=foobar 
spring.cloud.stream.bindings.input.group=foo 

App:

package com.example; 

import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.cloud.stream.messaging.Processor; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.context.annotation.Bean; 
import org.springframework.integration.support.MessageBuilder; 
import org.springframework.messaging.Message; 
import org.springframework.messaging.MessageHandler; 
import org.springframework.messaging.MessagingException; 

@SpringBootApplication 
@EnableBinding(Processor.class) 
public class So38961697Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So38961697Application.class, args); 
     Foo foo = context.getBean(Foo.class); 
     foo.start(); 
     foo.send(); 
     Thread.sleep(30000); 
     context.close(); 
    } 

    @Bean 
    public Foo foo() { 
     return new Foo(); 
    } 

    private static class Foo { 

     @Autowired 
     Processor processor; 

     public void send() { 
      Message<?> m = MessageBuilder.withPayload("foo") 
        .setHeader("bar", "baz") 
        .build(); 
      processor.output().send(m); 
     } 

     public void start() { 
      this.processor.input().subscribe(new MessageHandler() { 

       @Override 
       public void handleMessage(Message<?> m) throws MessagingException { 
        System.out.println(m); 
       } 

      }); 
     } 

    } 

} 

Результат:

GenericMessage [payload=foo, headers={bar=baz, kafka_offset=0, kafka_messageKey=null, kafka_topic=foobar, kafka_partitionId=0, kafka_nextOffset=1, contentType=text/plain}] 

Весь проект is here.

Edit: Смотрите комментарий, обновление до 1.0.2.RELEASE решить проблему

EDIT

Добавить группу, чтобы обеспечить потребитель потребляет с самого раннего сообщения. См. Комментарий ниже.

+0

Я исследовал ваши зависимости и заметил, что вы используете 1.0.2.RELEASE, тогда как я использовал 1.0.0.RELEASE. Модернизация моего проекта для использования 1.0.2.RELEASE решила проблему. В следующий раз я гарантирую, что использую последнюю версию. –

+0

Также обратите внимание, что ваша ссылка на документацию в вопросе указывает на текущий моментальный снимок, который может быть впереди текущей версии. Правильная ссылка на текущий выпуск (1.0.2) - [здесь] (http://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_kafka_binder_properties); он всегда укажет на последнюю выпущенную версию документов. –

+0

Не работает в 1.2.0.RELEASE – Savash

Смежные вопросы