2016-11-09 2 views
1

Я использую ServiceStack MQ Server/Client, чтобы использовать архитектуру на основе сообщений на своей платформе, и она работает безупречно. Теперь я пытаюсь сделать что-то, что, как мне кажется, не поддерживает SS Message Producer/Consumer.ServiceStack Message Filtering

По существу, я увольняю сообщения (события) в централизованном центре обработки данных, и у меня есть децентрализованные узлы ~ 2000 по всей территории США над ненадежной сетью, которые должны потенциально знать об этом событии, но это событие должно быть нацелено к одному из узлов ~ 2000. Мне нужна гибкость произвольно названных каналов с Pub/Sub, но долговечность MQ. Я начал с Pub/Sub, но сеть слишком ненадежна, поэтому я переместил решение для использования RedisMQServer. Я работаю, но хочу убедиться, что в интерфейсе не хватает чего-то. Мне любопытно, если создатели СС продумали этот вариант использования, и если да, то каков был результат этой дискуссии? Это противоречит концепции использования POCO для управления результатами/действиями потребления сообщений. Может быть, это и есть причина?

Вот мой продюсер

public ExpressLightServiceResponse Get(ExpressLightServiceRequest query) 
    { 
     var result = new ExpressLightServiceResponse(); 

     var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run); 
     var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName"); 
     var typeBuilder = moduleBuilder.DefineType(string.Format("EventA{0}", query.Store), TypeAttributes.Public); 

     typeBuilder.DefineDefaultConstructor(MethodAttributes.Public); 

     var newType = typeBuilder.CreateType(); 

     using (var messageProducer = _messageService.CreateMessageProducer()) 
     { 
      var message = MessageFactory.Create(newType.CreateInstance()); 
      messageProducer.Publish(message); 
     } 

     return result; 
    } 

Вот мой потребитель

public class ServerAppHost : AppHostHttpListenerBase 
{ 
    private readonly string _store; 

    public string StoreQueue => $"EventA{_store}"; 

    public ServerAppHost(string store) : base("Express Light Server", typeof(PubSubServiceStatsService).Assembly) 
    { 
     _store = store; 
    } 

    public override void Configure(Container container) 
    { 
     container.Register<IRedisClientsManager>(new PooledRedisClientManager(ConfigurationManager.ConnectionStrings["Redis"].ConnectionString)); 

     var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run); 
     var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName"); 
     var typeBuilder = moduleBuilder.DefineType(StoreQueue, TypeAttributes.Public); 

     typeBuilder.DefineDefaultConstructor(MethodAttributes.Public); 

     var newType = typeBuilder.CreateType(); 

     var mi = typeof(Temp).GetMethod("Foo"); 
     var fooRef = mi.MakeGenericMethod(newType); 
     fooRef.Invoke(new Temp(container.Resolve<IRedisClientsManager>()), null); 
    } 
} 

public class Temp 
{ 
    private readonly IRedisClientsManager _redisClientsManager; 

    public Temp(IRedisClientsManager redisClientsManager) 
    { 
     _redisClientsManager = redisClientsManager; 
    } 

    public void Foo<T>() 
    { 
     var mqService = new RedisMqServer(_redisClientsManager); 
     mqService.RegisterHandler<T>(DoWork); 
     mqService.Start(); 
    } 

    private object DoWork<T>(IMessage<T> arg) 
    { 
     //Do work 
     return null; 
    } 
} 

Что это дает мне это гибкость Pub/Sub с прочностью на очереди. Кто-нибудь видит/знает более «родной» способ достичь этого?

ответ

0

Там должен быть только 1 MQ хост зарегистрирован в вашем AppHost, так что я бы в первую очередь удалить его из класса обертки и он просто зарегистрировать обработчик, например:

public override void Configure(Container container) 
{ 
    //... 

    container.Register<IMessageService>(
     c => new RedisMqServer(c.Resolve<IRedisClientsManager>()); 
    var mqServer = container.Resolve<IMessageService>(); 

    fooRef.Invoke(new Temp(mqServer), null); 

    mqServer.Start(); 
} 

public class Temp 
{ 
    private readonly IMessageService mqServer; 
    public Temp(IMessageService mqServer) 
    { 
     this.mqServer = mqServer; 
    } 

    public void Foo<T>() => mqService.RegisterHandler<T>(DoWork); 
} 

Но этот подход не является хорошо подходит для ServiceStack, который поощряет использование кодовых первых сообщений, которые определяют контракт на обслуживание, который клиент/сервер использует для обработки отправляемых и полученных сообщений. Поэтому, если вы хотите использовать ServiceStack для отправки пользовательских сообщений, я бы рекомендовал либо иметь отдельный класс для каждого сообщения, либо иметь общий тип, например SendEvent, где сообщение или тип события является свойством класса.

В противном случае, если вы хотите продолжать пользовательские сообщения не использовать RedisMqServer, вы можете просто использовать dedicated MQ like Rabbit MQ или, если вы предпочитаете использовать Redis List directly - который является структурой данных, использование все REDIS MQ под ним.