Я создал образец HttpSource и HttpSink. Мои application.properties выглядит такSpring cloud stream rabbitMQ потребительский раздел
Source
spring.cloud.stream.bindings.output.destination=greetings
spring.cloud.stream.bindings.output.partitionKeyExpression=payload
spring.cloud.stream.bindings.output.partitionCount=2
Sink
spring.cloud.stream.bindings.input.destination=greetings
spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
HttpSource
@RestController
@EnableBinding(Source.class)
public class SampleSource {
@Autowired
private MessageChannel output;
@RequestMapping(path="/message",method=RequestMethod.POST)
public void sendMessage(@RequestBody String name){
output.send(MessageBuilder.withPayload("Hello, "+name).build());
}
}
HttpSink
@EnableBinding(Sink.class)
public class SampleSink {
@ServiceActivator(inputChannel=Sink.INPUT)
public void sendMessage(String name){
System.out.println(name);
}
}
Я развертывается оба эти приложения в Pivotal Cloud Foundry. У HttpSource есть конечная точка, которая при вызове отправляет сообщение в обмен Тема, называемый «приветствиями». Затем я масштабировал HttpSink, чтобы иметь 2 экземпляра. Это создало две очереди и привязки к обмену «приветствиями».
Теперь, когда я попал в конечную точку, я обнаружил, что сообщение отправляется в обе очереди. Я знаю это, потому что я привязал журнал и обнаружил, что сообщение печатается дважды.
Как я могу отправить сообщение только в одну из очередей?
EDIT:
вместо масштабирования HttpSink в Pivotal Cloud Foundry, я разворачивал HttpSink как два различных приложений. Но в application.properties они принадлежали к одной группе. Один из них имел instanceIndex = 0, а другой intanceIndex = 1.
Даже сейчас я получаю отдельную очередь со связыванием как «#» и двух потребителей в эту очередь.
Как я могу сделать разные экземпляры HttpSink создавать свою собственную очередь, а сообщение из HttpSource маршрутизируется на один из них на основе разделаKey?
Как выглядят привязки очередей в интерфейсе Rabbit? 2 очереди должны быть привязаны к обмену с помощью ключей маршрутизации, добавленных с помощью '-0',' -1', так как производитель устанавливает ключ маршрутизации исходящих сообщений. На стороне потребителя каждый экземпляр нуждается в разности 'instanceIndex'. –
Очереди имеют привязку «#» для обоих. У меня создалось впечатление, что если бы я масштабировал количество экземпляров с использованием Pivotal cloud Foundry, тогда он позаботился бы о экземпляреIndex – user3344591
Вам нужно установить группу потребителей для секционированных потребителей. Анонимные пользователи получают все сообщения через #. –