2015-07-12 4 views
11

Мне интересно, почему мой RabbitMQ RPC-Client всегда обрабатывал мертвые сообщения после перезагрузки. _channel.QueueDeclare(queue, false, false, false, null); должен отключить буферы. Если я перегружаю QueueDeclare внутри RPC-клиента, я не могу подключиться к серверу. Здесь что-то не так? Любая идея, как решить эту проблему?RabbitMQ длительная очередь не работает (RPC-сервер, RPC-клиент)


RPC-сервер

new Thread(() => 
{ 
    var factory = new ConnectionFactory { HostName = _hostname }; 
    if (_port > 0) 
     factory.Port = _port; 
    _connection = factory.CreateConnection(); 
    _channel = _connection.CreateModel(); 

    _channel.QueueDeclare(queue, false, false, false, null); 
    _channel.BasicQos(0, 1, false); 
    var consumer = new QueueingBasicConsumer(_channel); 
    _channel.BasicConsume(queue, false, consumer); 
    IsRunning = true; 
    while (IsRunning) 
    { 
     BasicDeliverEventArgs ea; 
     try { 
      ea = consumer.Queue.Dequeue(); 
     } 
     catch (Exception ex) { 
      IsRunning = false; 
     } 
     var body = ea.Body; 
     var props = ea.BasicProperties; 
     var replyProps = _channel.CreateBasicProperties(); 
     replyProps.CorrelationId = props.CorrelationId; 

     var xmlRequest = Encoding.UTF8.GetString(body); 

     var messageRequest = XmlSerializer.DeserializeObject(xmlRequest, typeof(Message)) as Message; 
     var messageResponse = handler(messageRequest); 

     _channel.BasicPublish("", props.ReplyTo, replyProps, 
           messageResponse); 
     _channel.BasicAck(ea.DeliveryTag, false); 
    } 
}).Start(); 

RPC-клиент

public void Start() 
{ 
    if (IsRunning) 
     return; 
    var factory = new ConnectionFactory { 
     HostName = _hostname, 
     Endpoint = _port <= 0 ? new AmqpTcpEndpoint(_endpoint) 
           : new AmqpTcpEndpoint(_endpoint, _port) 
    }; 
    _connection = factory.CreateConnection(); 
    _channel = _connection.CreateModel(); 
    _replyQueueName = _channel.QueueDeclare(); // Do not connect any more 
    _consumer = new QueueingBasicConsumer(_channel); 
    _channel.BasicConsume(_replyQueueName, true, _consumer); 
    IsRunning = true; 
} 

public Message Call(Message message) 
{ 
    if (!IsRunning) 
     throw new Exception("Connection is not open."); 
    var corrId = Guid.NewGuid().ToString().Replace("-", ""); 
    var props = _channel.CreateBasicProperties(); 
    props.ReplyTo = _replyQueueName; 
    props.CorrelationId = corrId; 

    if (!String.IsNullOrEmpty(_application)) 
     props.AppId = _application; 

    message.InitializeProperties(_hostname, _nodeId, _uniqueId, props); 

    var messageBytes = Encoding.UTF8.GetBytes(XmlSerializer.ConvertToString(message)); 
    _channel.BasicPublish("", _queue, props, messageBytes); 

    try 
    { 
     while (IsRunning) 
     { 
      var ea = _consumer.Queue.Dequeue(); 
      if (ea.BasicProperties.CorrelationId == corrId) 
      { 
       var xmlResponse = Encoding.UTF8.GetString(ea.Body); 
       try 
       { 
        return XmlSerializer.DeserializeObject(xmlResponse, typeof(Message)) as Message; 
       } 
       catch(Exception ex) 
       { 
        IsRunning = false; 
        return null; 
       } 
      } 
     } 
    } 
    catch (EndOfStreamException ex) 
    { 
     IsRunning = false; 
     return null; 
    } 
    return null; 
} 

ответ

6

Попробуйте установить свойство DeliveryMode на непостоянное (1) в вашем rPC- Код клиента:

public Message Call(Message message) 
{ 
    ... 
    var props = _channel.CreateBasicProperties(); 
    props.DeliveryMode = 1; //you might want to do this in your RPC-Server as well 
    ... 
} 

AMQP Model Explained содержит очень полезные ресурсы, например, объясняя, как обрабатывать сообщения, которые попадают в очередь мертвой буквы.

Еще одна полезная заметка из документации касаемо очереди Долговечность:

Прочные очереди, сохраняются на диск и, таким образом выжить брокера перезагружается. Очереди, которые не являются долговечными, называются переходными. Не все сценарии и мандатные очереди использования должны быть долговечными.

Долговечность очереди не делает сообщения, которые перенаправлены на то, что очередь долговечна. Если брокер снят, а затем возвращен, просроченная очередь будет повторно объявлена ​​во время запуска брокера, однако только постоянные сообщения будут восстановлены.

Обратите внимание, что он говорит о брокера рестарт не издатель или перезапуске потребителя.

+0

Помогло ли это @ MR.ABC? –

Смежные вопросы