Привет Я хочу иметь ребус с двумя очередь, это, как я сконфигурировать его прямо сейчасКак я могу получить ребус с двумя очередями?
container.Register<BuiltinHandlerActivator>(() =>
{
var activator = new BuiltinHandlerActivator();
var rebusConnection = configuration["Rebus:ConnectionString"];
activator.Register(() => ActivatorUtilities.CreateInstance<CampaignsHandler>(container));
activator.Register(() => ActivatorUtilities.CreateInstance<MessageHandler>(container));
activator.Register(() => ActivatorUtilities.CreateInstance<DeliveryStatusHandler>(container));
Log.Logger = container.GetInstance<ILogger>();
Configure.With(activator)
.Transport(t => t.UseSqlServer(rebusConnection, "RebusQueue", "BackgroundJobs"))
.Logging(l => l.Serilog(Log.Logger))
.Routing(r =>
{
var typeBasedRouting = r.TypeBased();
typeBasedRouting.MapAssemblyOf<MessageSent>("BackgroundJobs");
})
.Options(o => o.SetNumberOfWorkers(6))
.Options(o => o.SetMaxParallelism(6))
.Options(b => b.SimpleRetryStrategy(maxDeliveryAttempts: 1))
.Start();
activator.Bus.Subscribe<MessageSent>().Wait();
return activator;
}, Lifestyle.Singleton);
мои MessageHandler обработанные объекты SendMessage и превратить их в messageSent OBJ с Infomation статус и мой DeliveryStatusHandler обрабатываются messageSent OBJ обновить моя база данных. эта проблема: у меня есть только одна очередь (одна таблица базы данных «RebusQueue»). Поэтому только после того, как все объекты sendMessage будут выполнены, база данных начнет обновляться.
Я хочу обработать свой messageSent obj сразу после того, как объекты sendMessage обработаны. Предположим, у меня должно быть две очереди (две таблицы)? но как настроить ребус?
Я смотрел на вопрос Multiple input queues in one rebus process Мы сталкиваемся с теми же проблемами?
это то, что я делаю в Messagehandler.cs
class MessageHandler : IHandleMessages<SendMessage>
{
private readonly IBus _bus;
private MessagingRuntime _messagingRuntime;
private IRepository _repository;
private ISnapshotRepository _snapshotRepository;
private readonly ITemplateEngineProvider _templateEngineProvider;
private IUrlShortener _urlShortener;
private ILogger _logger;
public MessageHandler(MessagingRuntime messagingRuntime, IRepository repository, ISnapshotRepository snapshotRepository, ITemplateEngineProvider templateEngineProvider, IUrlShortener urlShortener, IBus bus, ILogger logger)
{
_messagingRuntime = messagingRuntime;
_repository = repository;
_snapshotRepository = snapshotRepository;
_templateEngineProvider = templateEngineProvider;
_urlShortener = urlShortener;
_bus = bus;
_logger = logger;
}
public async Task Handle(SendMessage message)
{
var template = _snapshotRepository.Query<Template>(message.DateCreated).Where(x => x.Id == message.TemplateId).FirstOrDefault();
var subscriber = _snapshotRepository.Query<Subscriber>(message.DateCreated).Where(x => x.Id == message.SubscriberId).FirstOrDefault();
var templateEngine = GetTemplateEngine(message.CampaignId, message.Tags);
var @event = new MessageSent
{
Id = SequentialGuid.Instance.NewGuid(),
DeliveryStatusId = message.DeliveryStatusId,
SubscriberId = subscriber.Id,
DateCreated = message.DateCreated
};
try
{
_messagingRuntime.ProcessSendRequest(new[] { subscriber }, templateEngine, template);
@event.IsDeliverySuccessful = true;
}
catch (MessagingException ex)
{
_logger.Error(ex.ToString());
@event.IsDeliverySuccessful = false;
}
await _bus.Publish(@event);
}
}
это второй обработчик, который будет (обновить базу данных)
class DeliveryStatusHandler : IHandleMessages<MessageSent>
{
private ILogger _logger;
private IRepository _repository;
private IRepository2 _repository2;
private ISnapshotRepository _snapshotRepository;
public DeliveryStatusHandler(IRepository repository, IRepository2 repository2,ISnapshotRepository snapshotRepository, ILogger logger)
{
_repository = repository;
_repository2 = repository2;
_snapshotRepository = snapshotRepository;
_logger = logger;
}
public Task Handle(MessageSent @event)
{
var deliveryStatus = _repository2.Find<DeliveryStatus>(@event.DeliveryStatusId);
if (deliveryStatus == null)
{
_logger.Error("Delivery Status does not exist");
return Task.FromResult<object>(null);
}
var deliveryStatusItem = _repository2.Find<DeliveryStatusItem>(@event.Id);
var subscriber = _snapshotRepository.Query<Subscriber>(@event.DateCreated).Where(x => x.Id == @event.SubscriberId).FirstOrDefault();
if (deliveryStatusItem == null)
{
deliveryStatusItem = new DeliveryStatusItem();
deliveryStatusItem.Id = @event.Id;
deliveryStatusItem.Email = subscriber.Email;
deliveryStatusItem.PhoneNumber = subscriber.PhoneNumber;
deliveryStatusItem.Name = subscriber.Name;
}
deliveryStatusItem.DeliveryStatusId = @event.DeliveryStatusId;
deliveryStatusItem.IsDeliverySuccessful = @event.IsDeliverySuccessful;
_repository2.Save<DeliveryStatusItem>(deliveryStatusItem);
return Task.FromResult<object>(null);
}
}
Есть ли какой-нибудь образец проекта на вашем сайте, используя два экземпляра ребуса? ура –
Один ребус может иметь только одну очередь, правильно? –
https://github.com/rebus-org/Rebus/wiki/Rebus-configuration-section Мне нужно изменить app.config в моем проекте? –