2016-11-19 4 views
1

Tree StructureРебус/Содержание маршрутизации на основе

Как вы видите, есть Headquarter в качестве корневого узла и некоторые отрасли, как дочерние узлы. Существует сообщение о данных типа, и я хочу, чтобы опубликовать сообщение на основании содержания данных объекта, например:

if (data.value == xxxx) publish(data, Br1, Br2) 
else if (data.value == yyyy) publish(data, Br3, Br4) 
else if (data.value == zzzz) publis(data, Br5, Br6) 

Это какое-то образом настроить версию паба/суб шаблона. Но я хочу опубликовать сообщение типа Data только некоторым специальным подписчикам на основе содержимого сообщения.

Есть ли решение в Rebus?

ответ

0
static void Main() 
    { 

     using (var activator = new BuiltinHandlerActivator()) 
     { 
      activator.Handle<Packet>(async (bus, packet) => 
      { 
       string subscriber = "subscriberA"; 
       await bus.Advanced.TransportMessage.Forward(subscriber); 
      }); 

      Configure.With(activator) 
       .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn)) 
       .Transport(t => t.UseMsmq("router")) 
       .Start(); 

      for (int i = 0; i < 10; i++) 
      { 
       activator.Bus.SendLocal(
        new Packet() 
        { 
         ID = i, 
         Content = "content" + i.ToString(), 
         Sent = false, 
        }).Wait(); 
      } 
     } 

     Console.ReadLine(); 
    } 
+0

Вы получаете 'NullReferenceException', потому что вы слишком рано устанавливаете' BuiltinHandlerActivator' - если вы 'Console.ReadLine();' сразу после существования 'for'-loop, вы получите еще одну ошибку, указав, что очереди 'абонентА' не существует – mookid8000

+0

Глупый я. Благодарю. :) –

+0

да :) вы можете прочитать об этом [здесь, на странице вики об операциях] (https://github.com/rebus-org/Rebus/wiki/Transactions) – mookid8000

0

Там в нескольких решений в Rebus :)

Для вашего сценария, я вижу два пути ее решения: 1) Использование пользовательских тем, или 2) Реализовать реальный маршрутизатор на основе контента.

Если это имеет смысл, вы можете смоделировать этот сценарий pub/sub, используя темы, используя API-интерфейс Rebus, чтобы заботиться о маршрутизации. Это имеет смысл, если вы можете сказать, что ваши сообщения данных относятся к какой-то категории, которую подписчики могут подписывать.

По сравнению с «настоящими» системами массового обслуживания на основе тем, например, RabbitMQ, API тем в Rebus очень груб. Это не позволяет использовать подстановочные знаки (*) или что-то подобное - темы - это просто строки, которые вы можете подписаться, а затем использовать в качестве паба/подканала для передачи события нескольким подписчикам.

Вы можете использовать его, как это в конце абонента:

await bus.Advanced.Topics.Subscribe("department_a"); 

, а затем в конце издателя:

var data = new Data(...); 

await bus.Advanced.Topics.Publish("department_a", data); 

Если это не режет его, вы можете вставить «реальный «основанный на контенте маршрутизатор, который является просто конечной точкой, которой вы пользуетесь await bus.Send(eachDataMessage), которая, в свою очередь, направляет сообщение соответствующим подписчикам.

Это может быть сделано на двух уровнях с помощью Rebus, в зависимости от ваших требований. Если этого достаточно, чтобы посмотреть на заголовках сообщения, вы должны реализовать его в качестве «транспортного сообщения экспедитора», потому что скачет десериализацию и обеспечивает хороший API для просто пересылок сообщений:

Configure.With(...) 
    .Transport(t => t.UseMsmq("router")) 
    .Routing(r => { 
     r.AddTransportMessageForwarder(async transportMessage => { 
      var headers = transportMessage.Headers; 

      var subscribers = Decide(headers); 

      return ForwardAction.ForwardTo(subscribers); 
     }); 
    }) 
    .Start(); 

Если вам нужно смотреть на фактическое сообщение, вы должны просто реализовать обычный обработчик сообщений, а затем использовать автобус, чтобы переслать сообщение:

public class Router : IHandleMessages<Data> 
{ 
    readonly IBus _bus; 

    public Router(IBus bus) 
    { 
     _bus = bus; 
    } 

    public async Task Handle(Data message) 
    { 
     var subscribers = Decide(message); 

     foreach(var subscriber in subscribers) 
     { 
      await _bus.Advanced.TransportMessage.ForwardTo(subscriber); 
     } 
    } 
} 

обычай Реализуемый маршрутизатор является наиболее гибким решением, так как вы можете реализовать любую логику вам нравится, но как вы можете видеть, это немного более активно.


(*) Ребус не позволяет использовать групповые символы в целом, хотя ли проходят темы непосредственно RabbitMQ, если вам случится использовать, что в качестве транспорта, а это значит, что вы можете взять полное преимущество RabbitMQ (см this issue для получения более подробной информации о какой-то)

+0

Спасибо. Что касается последнего решения (маршрутизатор/обработчик), я написал эти строки кодов, но после отправки некоторых объектов пакета я получил исключение в обработчике. (bus is null) –

0
using (var trScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) 
{ 
    scope.EnlistRebus(); 
    Packet packet = ReadFromDB() 
    activator.Bus.SendLocal(packet).Wait() 
    scope.Complete() 
} 


activator.Handle<Packet>(async (bus, packet) => 
{ 
    string subscriber = "subscriberA"; 
    await bus.Advanced.TransportMessage.Forward(subscriber); 
}); 
0
 using (var activator = new BuiltinHandlerActivator()) 
     { 
      activator.Handle<Packet>(async message => 
      { 
       string connectionString = 
        "Data Source=.;Initial Catalog=Rebus;User ID=sa;Password=123456"; 

       using (SqlConnection connection = new SqlConnection(connectionString)) 
       { 
        string queryString = @"INSERT INTO CLIENTPACKET(ID, CONTENT, SENT) VALUES(@id, @content, @sent)"; 
        connection.Open(); 

        using (SqlCommand command = new SqlCommand(queryString, connection)) 
        { 
         command.Parameters.Add(new SqlParameter("@id", message.ID)); 
         command.Parameters.Add(new SqlParameter("@content", message.Content)); 
         command.Parameters.Add(new SqlParameter("@sent", message.Sent)); 

         await command.ExecuteNonQueryAsync(); 
        } 
       } 
      }); 


      Configure.With(activator) 
       .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn)) 
       .Transport(t => t.UseMsmq(@"subscriberA")) 
       .Routing(r => r.TypeBased().MapAssemblyOf<Packet>("router")) 
       .Options(o => 
       { 
        TransactionOptions tranOp = new TransactionOptions(); 
        tranOp.IsolationLevel = IsolationLevel.ReadCommitted; 
        o.HandleMessagesInsideTransactionScope(tranOp); 

        o.SetNumberOfWorkers(2); 
        o.SetMaxParallelism(2); 
       }) 
       .Start(); 

      activator.Bus.Subscribe<Packet>().Wait(); 

      Console.WriteLine("Press ENTER to quit"); 
      Console.ReadLine(); 
     }