1

Я использую Rebus, и я хочу представить что-то вроде описанного в CQRS Journey в пункте «Избегайте обработки событий несколько раз», но я не могу понять это.Как настроить Rebus на темы, основанные на типах обработчиков

Я настроены использовать Головоломки SQL Server для транспорта и MongoDB для Подписки и саг. Маршрутизация сконфигурирована TypeBased и отображает все типы обработчиков команд в очередь, сконфигурированную в Транспорт.

var bus = Configure.With(new SimpleInjectorContainerAdapter(container)) 
      .Logging(l => l.Trace()) 
      .Transport(t => 
      { 
       t.UseSqlServer(connectionstring, "TestMessages", "messageQueueName"); 
      }) 
      .Routing(r => r.TypeBased() 
          .MapAssemblyOf<Assembly1.Commands.DoSomething>("messageQueueName") 
          .MapAssemblyOf<Assembly2.Commands.DoSomethingElse>("messageQueueName") 
          ) 
      .Sagas(s => s.StoreInMongoDb(db, (sagaType) => 
      { 
       return sagaType.Name; 
      })) 
      .Subscriptions(s => s.StoreInMongoDb(db, "Subscriptions")) 
      .Options(o => 
      { 
       o.SetNumberOfWorkers(1); 
       o.SetMaxParallelism(1); 
       o.EnableSagaAuditing().StoreInMongoDb(db, "Snapshots"); 
      }) 
      .Start(); 

Теперь я должен настроить в Головоломки таким образом, что, когда команда Опубликовать событие, это тиражируется как множество отдельных тем (виртуальных или физических очередей), как типы существующих абонентов.

Что-то вроде:

bus.Subscribe<Assembly1.EventHandler1>("Assembly1.EventHandler1Queue").Wait(); 
bus.Subscribe<Assembly1.EventHandler2>("Assembly1.EventHandler2Queue").Wait(); 
bus.Subscribe<Assembly2.EventHandler1>("Assembly2.EventHandler1Queue").Wait(); 

Спасибо за помощь.

+0

вы помечено вопроса с 'лазури-ServiceBus-topics' и вы ссылаетесь статья о том, как использовать (среди прочего) Azure Service Bus тема для реализации паба/sub .... но у вас есть con понял, что ваша шина использует SQL Server в качестве очереди сообщений - это намеренно? – mookid8000

+0

Я просто добавил этот тег, потому что я не могу вставить «тему/темы» в качестве нового тега. – ilcorvo

ответ

1

Есть несколько вещей, которые кажутся вам непонятными.

Но, я думаю, ваш основной вопрос заключается в том, как быть уверенным, что каждое сообщение обрабатывается один раз только каждым подписчиком.

Ответ довольно прост: иметь отдельную конечную точку для каждого абонента - это означает, что каждый абонент будет иметь свою собственную входную очередь, с которой обрабатываются сообщения, и с которой будет возвращено сообщение об ошибке.

У вас может быть столько или меньше обработчиков в каждом подписчике, сколько захотите. Все совместимые обработчики будут выполняться для каждого входящего сообщения.

С Ребус, каждый вызов к Configure.With(...).(...).Start() даст вам отдельную конечную точку - так в вашем случае, я предлагаю вам обернуть создание абонентской конечной точки в методе, который затем можно ссылаться, как это:

var event1Subscriber = CreateSubscriber("subscriber_event1"); 
event1Subscriber.Subscribe<Event1>().Wait(); 

var event2Subscriber = CreateSubscriber("subscriber_event2"); 
event2Subscriber.Subscribe<Event2>().Wait(); 

var event3Subscriber = CreateSubscriber("subscriber_event3");  
event3Subscriber.Subscribe<Event3>().Wait(); 

// ... 

где CreateSubscriber бы тогда что-то вроде этого:

public IBus CreateSubscriber(string queueName) 
{ 
    return Configure.With(GetContainerAdapter()) 
     .Transport(t => t.UseMsmq(queueName)) 
     .Start();   
} 
+0

Вы точно поняли, несмотря на запутанный вопрос. Проблема в том, что я не могу понять, как это сделать с помощью Rebus. Не могли бы вы добавить пример кода? – ilcorvo

+0

То, что мне не хватает, на самом деле заключается в том, как создать «отдельную конечную точку для каждого абонента». – ilcorvo

+0

Я добавил код, чтобы показать, что я имею в виду :) – mookid8000

Смежные вопросы