2013-04-16 3 views
1

Каждое сообщение, получаемое получателем, содержит полное состояние совокупного корня. Затем система может выполнять требуемые операции на основе этих данных. В моем сценарии предоставляется доступ на основе данных в сообщении, например. доступ в комнату A & B. Сообщение содержит весь набор предоставленного доступа. Эти сообщения могут выходить из строя, поскольку системы обмена сообщениями, такие как MSMQ, не гарантируют заказанную доставку.Как фильтровать сообщения с помощью Rebus?

Сценарий, в котором сообщение № 1 предоставляет доступ в комнату A & B, но сообщение №2 предоставляет только доступ в комнату A. Если они выходят из-под контроля, тогда доступ предоставляется в комнату A, а позднее в комнату A & B. Это не желаемый результат. Должен быть предоставлен только доступ в комнату А. Каждое сообщение содержит метку времени, которая устанавливается при публикации. Я хотел бы использовать эту метку времени для удаления старых сообщений, которые выходят из-под контроля, например. если сообщение # 2 поступает до сообщения # 1, тогда сообщение 1 # должно быть отброшено.

Я мог бы реализовать этот фильтр в каждом методе обработчика, но это было бы утомительно, поэтому я надеюсь, что у Rebus есть что-то по линии EAI Message Filters?

Я открыт для других вариантов/внедрений?

ответ

1

Я не уверен, что понимаю, почему в ваших сообщениях будет передаваться весь совокупный корень, но помимо этого есть простой способ, которым Rebus позволит вам упорядочить ваши сообщения.

Предлагаю вам позволить всем вашим сообщениям реализовать общий интерфейс, который фиксирует этот конкретный аспект, например. что-то вроде

public interface IHaveSequenceNumber 
{ 
    int SequenceNumber { get; } 
} 

Затем создать простой обработчик сообщений, который обрабатывает IHaveSequenceNumber и ликвидирует конвейер обработчика, когда старые сообщения встречаются, например,

public class WillDiscardOldMessages : IHandleMessages<IHaveSequenceNumber> 
{ 
    public void Handle(IHaveSequenceNumber messageWithSequenceNumber) 
    { 
     if (IsTooOld(messageWithSequenceNumber)) 
     { 
      MessageContext.GetCurrent().Abort(); //< make this the last handler 
     } 
    } 
} 

А потом - очень важно :) - вы убедитесь, что ваше сообщение фильтр всегда первый в трубопроводе обработчиков, когда начинается отправка:

Configure.With(...) 
    .Transport(...) 
    .SpecifyOrderOfHandlers(s => s.First<WillDiscardOldMessages>()) 
    .(...) 

Реализация метода IsTooOld() выше я оставлю вам, вероятно, будет просто, если в вашей конечной точке есть один рабочий поток, тогда как одновременная обработка этих вещей не является тривиальной.

Это имеет смысл?

+0

Я сохраняю в заголовке агрегированного сообщения вместо идентификатора, поэтому обработчикам не нужно извлекать данные в центральный репозиторий. Это обычная практика, не так ли? – Lybecker

+0

Думаю, я понял. У меня должно быть несколько обработчиков, но убедитесь, что первый сбрасывает _old_ сообщения, например. фильтр. Будет ли это работать, когда у меня есть несколько потребителей одного и того же сообщения (несколько обработчиков, которые подписываются на один и тот же тип сообщения)? Будет ли сообщение всегда передаваться одновременно каждому обработчику? – Lybecker

+1

Что значит «будет ли сообщение передаваться одновременно с каждым обработчиком»? Rebus получит последовательность обработчиков для входящего сообщения, закажите последовательность в соответствии с вашей спецификацией (например, '.First ()'), а затем выполните последовательность в порядке и отправьте сообщение каждому ... есть параллелизм при обработке одного сообщения - параллелизм достигается за счет наличия нескольких работников, где каждый может обрабатывать одно сообщение в данный момент времени. – mookid8000

Смежные вопросы