2016-08-16 2 views
0

Я создал образец HttpSource и HttpSink. Мои application.properties выглядит такSpring cloud stream rabbitMQ потребительский раздел

Source 
spring.cloud.stream.bindings.output.destination=greetings 
spring.cloud.stream.bindings.output.partitionKeyExpression=payload 
spring.cloud.stream.bindings.output.partitionCount=2 


Sink 
spring.cloud.stream.bindings.input.destination=greetings 
spring.cloud.stream.bindings.input.partitioned=true 
spring.cloud.stream.instanceCount=2 
spring.cloud.stream.instanceIndex=0 

HttpSource

@RestController 
@EnableBinding(Source.class) 
public class SampleSource { 

@Autowired 
private MessageChannel output; 

@RequestMapping(path="/message",method=RequestMethod.POST) 
public void sendMessage(@RequestBody String name){ 
    output.send(MessageBuilder.withPayload("Hello, "+name).build()); 
} 
} 

HttpSink

@EnableBinding(Sink.class) 
public class SampleSink { 

@ServiceActivator(inputChannel=Sink.INPUT) 
public void sendMessage(String name){ 
    System.out.println(name); 
} 
} 

Я развертывается оба эти приложения в Pivotal Cloud Foundry. У HttpSource есть конечная точка, которая при вызове отправляет сообщение в обмен Тема, называемый «приветствиями». Затем я масштабировал HttpSink, чтобы иметь 2 экземпляра. Это создало две очереди и привязки к обмену «приветствиями».

Теперь, когда я попал в конечную точку, я обнаружил, что сообщение отправляется в обе очереди. Я знаю это, потому что я привязал журнал и обнаружил, что сообщение печатается дважды.

Как я могу отправить сообщение только в одну из очередей?

EDIT:

вместо масштабирования HttpSink в Pivotal Cloud Foundry, я разворачивал HttpSink как два различных приложений. Но в application.properties они принадлежали к одной группе. Один из них имел instanceIndex = 0, а другой intanceIndex = 1.

Даже сейчас я получаю отдельную очередь со связыванием как «#» и двух потребителей в эту очередь.

Как я могу сделать разные экземпляры HttpSink создавать свою собственную очередь, а сообщение из HttpSource маршрутизируется на один из них на основе разделаKey?

+0

Как выглядят привязки очередей в интерфейсе Rabbit? 2 очереди должны быть привязаны к обмену с помощью ключей маршрутизации, добавленных с помощью '-0',' -1', так как производитель устанавливает ключ маршрутизации исходящих сообщений. На стороне потребителя каждый экземпляр нуждается в разности 'instanceIndex'. –

+0

Очереди имеют привязку «#» для обоих. У меня создалось впечатление, что если бы я масштабировал количество экземпляров с использованием Pivotal cloud Foundry, тогда он позаботился бы о экземпляреIndex – user3344591

+0

Вам нужно установить группу потребителей для секционированных потребителей. Анонимные пользователи получают все сообщения через #. –

ответ

0

я не замечал раньше - вам не хватает consumer. от собственности:

spring.cloud.stream.bindings.input.consumer.partitioned=true 

См the documentation.

Вы используете IDE для создания файла свойств? Моя версия STS (3.8.1) помешала это как проблему при копировании в ваших свойствах.

Я только что проверил тест, и очередь правильно названа и привязана к обмену с помощью правильного ключа.

EDIT

Чтобы получить его, чтобы правильно масштабировать на ФКП, я также должен был закомментировать instanceIndex свойство (предположительно потому, что она перекрывает ППР свойство среды) в противном случае я получил 2 потребителя на -0 очереди. С удалением этого свойства я получил 2 очереди, как ожидалось.

+0

Также см. Мое редактирование. –

+0

Вы правы. Я пропустил «потребитель». в HttpSink. Также я пропустил «продюсера». в HttpSource. И закомментировал indexInstance тоже – user3344591

Смежные вопросы