2012-12-21 5 views
4

Мне любопытно, смогут ли потребители MassTransit Peek() в очереди MSMQ до фактического извлечения сообщений.Смерть потребителя Masstransit - Peek()

Что шаги/процесс:

1) Msg отправлен в очередь

2) Потребители получает это и нужно сделать обновление БД - занимает около 5 секунд

3) Потребитель должен выполнять второй раунд обновлений, если первый работал.

Моя проблема в том, как я могу обработать случай, если при первом обновлении БД сообщение остается в очереди (т. Е. Проблема с сетью и не может попасть в db).

В настоящее время, как только он читает Сообщ из очереди она удаляет его, а затем он просто исчезает, если обновление БД не в состоянии ..

Кроме того, как я могу справиться с сбоем питания - я имею в виду, если на полпути через «задание» потребителем, что бы это ни было (обновление db или что-то еще), и власть умирает и т. д., как я могу повторно запустить процесс на msg в очереди? Допустим, что работа (в моем текущем экземпляре так или иначе) вызывает новую строку в таблице. Я имею в виду, что я могу написать код, чтобы сначала проверить, существует ли строка, и если она затем отбросит сообщение, а если нет, запустите задачу, но как я могу заставить ее повторно запустить весь процесс в первую очередь?

Я прочитал, что я мог Peek() очереди, а затем запустить задачу, а затем прочитать очередь сообщений msg для реального и удалить ее, но я не могу для жизни меня выяснить, работает ли это с массовым транзитом ... бит потерян ...

Кроме того, я знаю, что Masstransit имеет .RetryLater, но тогда я использую это в процессе? Это Initially ->When ->Then ->.RetryLater в саге ??

Все указатели будут appeciated

сердечным приветом Робин

EDIT

PS: Я использую сагу ....

Define(() => 
      { 
       RemoveWhen(saga => saga.CurrentState == Completed); 

       Initially(
        When(NewAC) 
         .Then((saga, message) => saga.ProcessPSM(message), 
          InCaseOf<Exception>() 
           .TransitionTo(Problem)         
           ) 
         .Then((saga, message) => saga.PostProcessPSM()) 
         .Complete() 
        ); 
       During(Problem, 
        When(Waiting) 
           // NOTE: THIS DOES NOT WORK!!!! 
         .RetryLater() 
        ); 
       }); 

RetryLater сгенерирует ошибку : «Сообщение не может быть принято существующей сагой»

Я не уверен, как еще я могу получить доступ к «RetryLater».

ответ

7

MassTransit абстрагирует понятия базовых очередей. Таким образом, Peek не является решением, , но у него есть другие средства для повторной отправки сообщений. Если вы просто заинтересованы в обработке ошибок и сбоев, достаточны следующие механизмы.

По умолчанию, если потребитель бросает исключение сообщение будет повторена N раз:

  • где N настроен на шине и по умолчанию 5.Он может быть изменен в автобусе initilisation с использованием SetDefaultRetryLimit на ServiceBusConfigurator
  • Где повторено означает, что сообщение будет добавлено в конец очереди

Если вы хотели бы тонкоуровневый подход к обработке ошибок вы можете реализовать Контекст-Потребитель, уловить восстанавливаемые или временные исключения и вручную вызвать RetryLater. Нет предела тому, сколько раз это можно сделать, как я понимаю.

public class RetryConsumer : Consumes<AwesomeMessage>.Context 
{ 

    public void Consume(IConsumeContext<AwesomeMessage> message) 
    { 
     try 
     { 
      Console.WriteLine("This is Attempt " + message.RetryCount); 
      // Do Something 
     } 
     catch (SomeTransientException e) 
     { 
      message.RetryLater(); 
     } 
    } 
} 
+0

Thanks mate. Я дам попробовать попробовать сегодня. Мой вопрос несколько остается, однако, в одном случае. Что, если половина пути через власть умрет? Msg больше не находится в очереди. Так оно потеряно да? Поскольку потребитель никогда не попадает в 'message.retrylater()' ... Есть ли способ обработки зависающего процесса или полного сбоя, чтобы повторить попытку позже в этих случаях? Или мне что-то не хватает? Благодаря! –

+0

Вы всегда можете использовать транзакционную очередь с MSMQ, которая гарантирует, что сообщения не будут потеряны в сценарии сбоя питания. ? tx = true в URI доставит вас туда. –

+0

@ChrisPatterson Да, я уже использую это. Я не думал, что это охватывает сценарий чтения сообщения. Я думал, что это просто гарантирует, что если вы добавите «в очередь», чтобы гарантировать, что он попадет в очередь, а затем, поскольку восстанавливается, установлено true, persisited. Но также устанавливает ли настройка транзакции, когда я читаю сообщение из очереди, что процесс, который его читает, завершает свою работу? Я думаю, что нет или ...? –

Смежные вопросы