2015-03-12 3 views
0

У меня есть система обмена сообщениями, использующая Azure ServiceBus, но я использую Nimbus поверх этого. У меня есть конечная точка, которая отправляет команду другой конечной точке, и в какой-то момент класс обработчика с другой стороны подбирает ее, поэтому все работает нормально.Сообщение повторено, когда операция занимает время

Когда операция требует времени, примерно более 20 секунд или около того, обработчик получает «другой» вызов с тем же сообщением. Похоже, что Nimbus повторяет сообщение, которое уже обрабатывается другим (даже тем же) экземпляром обработчика, я не вижу никаких исключений, и я мог бы легко воспроизвести его со следующим обработчиком:

public class Synchronizer : IHandleCommand<RequestSynchronization> 
{ 
    public async Task Handle(RequestSynchronization synchronizeInfo) 
    { 
     Console.WriteLine("Received Synchronization"); 

     await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process 

     Console.WriteLine("Got through first timeout"); 

     await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process 

     Console.WriteLine("Got through second timeout"); 
    } 
} 

Мой вопрос: Как отключить это поведение? Я рад, что транзакция занимает много времени, так как это тяжелый процесс, который я отключил от своего веб-сайта, и это в первую очередь касается этой архитектуры.

Другими словами, я ожидал, что сообщение не будет подхвачено другим обработчиком, когда оно его подхватит, и обработает его, если только не существует исключения, и сообщение возвращается в очередь и в итоге получает для повторной попытки.

Любые идеи, как это сделать? Что-нибудь мне не хватает?

+0

Полезное - таймаут блокировки по умолчанию должен составлять 1 минуту. Я не знаком с нимбом (и с их сайта, это займет немного копания), но как вы читаете из очереди? Вы можете либо увеличить тайм-аут блокировки там, либо возобновить блокировку через 'BrokeredMessage.RenewLock()'. –

ответ

2

По умолчанию ASD/WSB даст вам блокировку сообщений 30 секунд. Идея состоит в том, что вы выталкиваете BrokeredMessage с головы очереди, но должны либо .Complete(), либо .Abandon() это сообщение в течение таймаута блокировки.

Если вы этого не сделаете, служебная шина предполагает, что вы потерпели крах или провалились иначе, и оно вернет это сообщение в очередь, подлежащую повторной обработке.

У вас есть несколько вариантов:

1) осуществлять ILongRunningHandler на обработчике. Nimbus обратит внимание на оставшееся время блокировки и автоматически возобновит блокировку сообщений. Внимание. Максимальное время блокировки сообщений, поддерживаемое ASB/WSB, составляет пять минут, независимо от того, сколько раз вы обновляете, поэтому, если ваш обработчик занимает больше времени, то вам может понадобиться опция №2.

public class Synchronizer : IHandleCommand<RequestSynchronization>, ILongRunningTask 
{ 
    public async Task Handle(RequestSynchronization synchronizeInfo) 
    { 
     Console.WriteLine("Received Synchronization"); 

     await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process 

     Console.WriteLine("Got through first timeout"); 

     await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process 

     Console.WriteLine("Got through second timeout"); 
    } 
} 

2) В обработчике, вызовите Task.Run (() => SomeService (yourMessage)) и обратно. Если вы это сделаете, будьте осторожны с охватом жизненных циклов зависимостей, если ваш обработчик возьмет любой. Если вам нужен IFoo, возьмите зависимость от Func> (или эквивалента в зависимости от вашего контейнера) и решите это в своей задаче обработки.

public class Synchronizer : IHandleCommand<RequestSynchronization> 
{ 
    private readonly Func<Owned<IFoo>> fooFunc; 

    public Synchronizer(Func<Owned<IFoo>> fooFunc) 
    { 
     _fooFunc = fooFunc; 
    } 

    public async Task Handle(RequestSynchronization synchronizeInfo) 
    { 
     // don't await! 
     Task.Run(() => { 
      using (var foo = _fooFunc()) 
      { 
       Console.WriteLine("Received Synchronization"); 

       await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process 

       Console.WriteLine("Got through first timeout"); 

       await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process 

       Console.WriteLine("Got through second timeout"); 
      } 

     }); 
    } 
} 
Смежные вопросы