2016-03-03 3 views
2

Итак, у нас была существующая библиотека помощников, которая использовала ServiceStack.Redis и в настоящее время пытается обменять ее на StackExchange.Redis. Мы использовали BlockingPop (BLPOP), но поскольку StackExchange.Redis не поддерживает его. мы реализовали следующиеStackExchange.Redis Blocking pop design

public static void Push(string Qname, string val) 
{ 
    IDatabase db = redis.GetDatabase(); 
    db.ListLeftPush(Qname, val); 
    ISubscriber sub = redis.GetSubscriber(); 
    sub.Publish(Qname + "_msg", "1"); 
} 

и Pop с блокированием опции следующим образом:

public static string Pop(string Qname, 
    bool block_until_available = false,int timeout_secs=0) 
{ 
    IDatabase db = redis.GetDatabase();    
    var popped = db.ListRightPop(Qname); 
    if (popped.IsNull) 
    { 
     if (block_until_available == false) 
      return null; 
    } 
    else 
     return popped; 

    //wait for an item to be pushed in. 
    ISubscriber sub = redis.GetSubscriber(); 
    AutoResetEvent autoEvent = new AutoResetEvent(false); 
    string obj = null; 
    Task.Run(() => 
    { 
     sub.Subscribe(Qname + "_msg", (channel, message) => 
     { 
      popped = db.ListRightPop(Qname); 
      if (!popped.IsNull) 
      { 
       obj = popped; 
       sub.Unsubscribe(Qname + "_msg"); 
       autoEvent.Set(); 
      } 
     }); 
    }); 
    if (timeout_secs > 0) 
     autoEvent.WaitOne(timeout_secs * 1000); 
    else 
     autoEvent.WaitOne(); 
    return obj; 
} 

ли вы все видите какой-либо очевидной проблемы с этим подходом.

Кроме того, я быстро столкнулся с следующей ошибкой. Я увеличил syncTimeout. надеюсь, что это исправит?

System.TimeoutException: Timeout performing RPOP DL_PROD, 
inst: 0, mgr: ProcessReadQueue, err: never, queue: 0, qu: 0, qs: 0, qc: 0, 
wr: 0, wq: 0, in: 0, ar: 1, IOCP: (Busy=0,Free=1000,Min=8,Max=1000), 
WORKER: (Busy=2,Free=32765,Min=8,Max=32767), 
clientName: CD147RE1 at 
StackExchange.Redis.ConnectionMultiplexer.ExecuteSyncImpl[T] 
(Message message, ResultProcessor`1 processor, ServerEndPoint server) 
+1

Не знаю, почему это нисходящее - вопрос справедлив –

+0

У меня возникло много проблем с этим решением. Используйте это решение с особой осторожностью. – coderguy123

ответ

2

Нет никакой конкретной причины, по которой RPOP должен тайм-аут здесь, если только это не связано с пропускной способностью (огромная полезная нагрузка). Эта ошибка не связана с вопросом ИМО. Этот подход является большим уродливым, но ... ну, это может сработать. Однако нет необходимости использовать Task.Run здесь. Одна из проблем подхода заключается в том, что он будет работать некорректно одновременно, я думаю. Кажется, что отменили подписку на всех участников этого канала/соединения, а не только на одного. Самостоятельная отмена подписки возможна, но, откровенно говоря, я задаюсь вопросом, проще ли просто иметь одно событие автоматического сброса и одну подписку, а если сообщение разблокирует ворота во время ожидания: отлично.


Общая схема самостоятельной отписки делегатов, по существу:

YourDelegateType handler = null; 
handler = (args) => { 
    DoTheThing(); 
    UnSubscribe(handler); 
}; 
Subscribe(handler); 

Это использует радость лексических захваченные переменные, чтобы обеспечить доступ к экземпляру делегата внутри делегата, что означает делегат может пройти себя к методу отмены подписки. Образец, показанный выше, должен работать для всех сценариев callback на основе делегатов, включая регулирование событий и таких вещей, как SE/REDIS pub/sub handlers.

+0

Нет, полезная нагрузка небольшая. Редис почти не используется. Единственное, что я мог подумать, это сеть. Хороший момент о том, что вы подписываете и подписываете идею. будет изучать это. – coderguy123

+0

также как я могу отказаться от подписки? – coderguy123

+0

@ coderguy123, что является более сложным; будет отображаться в редакции –

Смежные вопросы