2016-03-30 1 views
1

Привет Я хочу иметь ребус с двумя очередь, это, как я сконфигурировать его прямо сейчасКак я могу получить ребус с двумя очередями?

container.Register<BuiltinHandlerActivator>(() => 
     { 
      var activator = new BuiltinHandlerActivator();    
      var rebusConnection = configuration["Rebus:ConnectionString"]; 
      activator.Register(() => ActivatorUtilities.CreateInstance<CampaignsHandler>(container)); 
      activator.Register(() => ActivatorUtilities.CreateInstance<MessageHandler>(container)); 
      activator.Register(() => ActivatorUtilities.CreateInstance<DeliveryStatusHandler>(container)); 
      Log.Logger = container.GetInstance<ILogger>(); 
      Configure.With(activator) 
       .Transport(t => t.UseSqlServer(rebusConnection, "RebusQueue", "BackgroundJobs")) 
       .Logging(l => l.Serilog(Log.Logger)) 
       .Routing(r => 
       { 
        var typeBasedRouting = r.TypeBased(); 
        typeBasedRouting.MapAssemblyOf<MessageSent>("BackgroundJobs");      

       }) 
       .Options(o => o.SetNumberOfWorkers(6)) 
       .Options(o => o.SetMaxParallelism(6)) 
       .Options(b => b.SimpleRetryStrategy(maxDeliveryAttempts: 1)) 
       .Start();    
      activator.Bus.Subscribe<MessageSent>().Wait(); 

      return activator; 
     }, Lifestyle.Singleton); 

мои MessageHandler обработанные объекты SendMessage и превратить их в messageSent OBJ с Infomation статус и мой DeliveryStatusHandler обрабатываются messageSent OBJ обновить моя база данных. эта проблема: у меня есть только одна очередь (одна таблица базы данных «RebusQueue»). Поэтому только после того, как все объекты sendMessage будут выполнены, база данных начнет обновляться.

Я хочу обработать свой messageSent obj сразу после того, как объекты sendMessage обработаны. Предположим, у меня должно быть две очереди (две таблицы)? но как настроить ребус?

Я смотрел на вопрос Multiple input queues in one rebus process Мы сталкиваемся с теми же проблемами?

это то, что я делаю в Messagehandler.cs

class MessageHandler : IHandleMessages<SendMessage> 
{ 
    private readonly IBus _bus; 
    private MessagingRuntime _messagingRuntime; 
    private IRepository _repository; 
    private ISnapshotRepository _snapshotRepository; 
    private readonly ITemplateEngineProvider _templateEngineProvider; 
    private IUrlShortener _urlShortener; 
    private ILogger _logger; 

    public MessageHandler(MessagingRuntime messagingRuntime, IRepository repository, ISnapshotRepository snapshotRepository, ITemplateEngineProvider templateEngineProvider, IUrlShortener urlShortener, IBus bus, ILogger logger) 
    { 
     _messagingRuntime = messagingRuntime; 
     _repository = repository; 
     _snapshotRepository = snapshotRepository; 
     _templateEngineProvider = templateEngineProvider; 
     _urlShortener = urlShortener; 
     _bus = bus; 
     _logger = logger; 
    } 

    public async Task Handle(SendMessage message) 
    { 
     var template = _snapshotRepository.Query<Template>(message.DateCreated).Where(x => x.Id == message.TemplateId).FirstOrDefault(); 
     var subscriber = _snapshotRepository.Query<Subscriber>(message.DateCreated).Where(x => x.Id == message.SubscriberId).FirstOrDefault(); 
     var templateEngine = GetTemplateEngine(message.CampaignId, message.Tags); 


     var @event = new MessageSent 
     { 
      Id = SequentialGuid.Instance.NewGuid(), 
      DeliveryStatusId = message.DeliveryStatusId, 
      SubscriberId = subscriber.Id, 
      DateCreated = message.DateCreated     
     }; 

     try 
     { 
      _messagingRuntime.ProcessSendRequest(new[] { subscriber }, templateEngine, template); 
      @event.IsDeliverySuccessful = true; 
     } 
     catch (MessagingException ex) 
     { 
      _logger.Error(ex.ToString()); 
      @event.IsDeliverySuccessful = false; 
     } 

     await _bus.Publish(@event); 
    } 
} 

это второй обработчик, который будет (обновить базу данных)

class DeliveryStatusHandler : IHandleMessages<MessageSent> 
{ 
    private ILogger _logger; 
    private IRepository _repository; 
    private IRepository2 _repository2; 
    private ISnapshotRepository _snapshotRepository; 

    public DeliveryStatusHandler(IRepository repository, IRepository2 repository2,ISnapshotRepository snapshotRepository, ILogger logger) 
    { 
     _repository = repository; 
     _repository2 = repository2; 
     _snapshotRepository = snapshotRepository; 
     _logger = logger; 
    } 

    public Task Handle(MessageSent @event) 
    { 
     var deliveryStatus = _repository2.Find<DeliveryStatus>(@event.DeliveryStatusId); 
     if (deliveryStatus == null) 
     { 
      _logger.Error("Delivery Status does not exist"); 
      return Task.FromResult<object>(null); 
     } 

     var deliveryStatusItem = _repository2.Find<DeliveryStatusItem>(@event.Id); 
     var subscriber = _snapshotRepository.Query<Subscriber>(@event.DateCreated).Where(x => x.Id == @event.SubscriberId).FirstOrDefault(); 
     if (deliveryStatusItem == null) 
     { 
      deliveryStatusItem = new DeliveryStatusItem(); 
      deliveryStatusItem.Id = @event.Id; 
      deliveryStatusItem.Email = subscriber.Email; 
      deliveryStatusItem.PhoneNumber = subscriber.PhoneNumber; 
      deliveryStatusItem.Name = subscriber.Name; 

     } 
     deliveryStatusItem.DeliveryStatusId = @event.DeliveryStatusId; 
     deliveryStatusItem.IsDeliverySuccessful = @event.IsDeliverySuccessful; 
     _repository2.Save<DeliveryStatusItem>(deliveryStatusItem); 
     return Task.FromResult<object>(null); 
    } 
} 

ответ

2

Если бы я тебя, я бы начать два экземпляра ребус , каждый из которых имеет свою собственную входную очередь (вполне нормально делать это в том же процессе, если вам не нужно обновлять их независимо друг от друга).

два экземпляра будет:

  1. Первый один с обработчиком вы показали мне
  2. второй, который будет подписываться на MessageSent и обрабатывать его соответствующим образом

Таким образом, вы можете легко настроить количество потоков и параметры параллельности для каждого экземпляра, что позволяет вам дросселировать/масштабировать обработку сообщений.

+0

Есть ли какой-нибудь образец проекта на вашем сайте, используя два экземпляра ребуса? ура –

+0

Один ребус может иметь только одну очередь, правильно? –

+0

https://github.com/rebus-org/Rebus/wiki/Rebus-configuration-section Мне нужно изменить app.config в моем проекте? –