2015-04-25 5 views
0

Я пытаюсь выполнить тестирование довольно простого сценария: рабочая очередь с двумя рабочими и 1 сценарий издателя, но она продолжает возвращать одно и то же сообщение снова и снова из очереди.RabbitMQ возвращает одно и то же сообщение снова и снова

Следующий код теста просто помещает от 1 до 100 сообщений в очередь, и 2 потребителя съедают их. Проблема в том, что они продолжают получать сообщения 1 и 2. Я попытался отделить подтверждение в методе, так как в моем приложении требуется время для сообщения, чтобы получить процесс (метод комментариев Подтвердить) - тогда он сделал исключение, которое маркер неизвестен:

AMQP операция была прервана: AMQP крупной причины, по инициативе Peer, код = 406, текст = "PRECONDITION_FAILED - неизвестно доставка бирка 1", Classid = 60, methodId = 80, причина =

Похоже, что признание нарушено как-то. Я попытался отключить его - тоже не повезло.

Класс:

using System; 
using System.Text; 
using Newtonsoft.Json; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 

namespace Backend.MQ.OCR 
{ 
    public class BatchQueue : QueueBase<BatchMessage> 
    { 
     private readonly IModel _channel; 
     private const string QPrefix = "ocrbatches_"; 
     private readonly QueueingBasicConsumer _consumer; 
     private ulong _latesttoken = ulong.MaxValue; 
     private readonly string _jobid; 
     public BatchQueue(string connectionString, String jobid): 
      base(connectionString) 
     { 
      _jobid = jobid; 
      var factory = new ConnectionFactory() 
      { 
       HostName = connectionString 
      }; 
      var connection = factory.CreateConnection(); 
      _channel = connection.CreateModel(); 
      _channel.QueueDeclare(Name, true, false, false, null); 
      //binding consumers 
      _channel.BasicQos(0, 1, false); 
      _consumer = new QueueingBasicConsumer(_channel); 
      _channel.BasicConsume(Name, false, _consumer); 
     } 

     public override void Publish(BatchMessage msg) 
     { 
      var message = JsonConvert.SerializeObject(msg); 
      var body = Encoding.UTF8.GetBytes(message); 
      var properties = _channel.CreateBasicProperties(); 
      properties.SetPersistent(true); 
      _channel.BasicPublish("", Name, properties, body); 
#if DEBUG 
      System.Diagnostics.Trace.WriteLine("[x] Sent task:" + msg); 
#endif 
     } 

     private string Name 
     { 
      get { return QPrefix + _jobid; } 
     } 

     public override BatchMessage Receive() 
     { 
      var ea = 
        (BasicDeliverEventArgs)_consumer.Queue.Dequeue(); 

      var body = ea.Body; 
      _channel.BasicAck(ea.DeliveryTag, false); 
      return JsonConvert.DeserializeObject<BatchMessage>(Encoding.UTF8.GetString(body)); 
     } 


     public override void Confirm() 
     { 
      //if (_latesttoken < ulong.MaxValue) _channel.BasicAck(_latesttoken, false); 
     } 
    } 
} 

Юнит тесты:

#if NUNIT 
using TestClass = NUnit.Framework.TestFixtureAttribute; 
using TestMethod = NUnit.Framework.TestAttribute; 
using TestCleanup = NUnit.Framework.TearDownAttribute; 
using TestInitialize = NUnit.Framework.SetUpAttribute; 
using ClassCleanup = NUnit.Framework.TestFixtureTearDownAttribute; 
using ClassInitialize = NUnit.Framework.TestFixtureSetUpAttribute; 
#else 
#endif 
using System.Threading.Tasks; 
using System.Threading; 
using System; 
using System.Collections.Generic; 
using Backend.MQ.OCR; 
using Microsoft.VisualStudio.TestTools.UnitTesting; 
#if NUNIT 
using MAssert = NUnit.Framework.Assert; 
#else 
using MAssert = Microsoft.VisualStudio.TestTools.UnitTesting.Assert; 
#endif 

namespace MQ.Test 
{ 
    [TestClass] 
    public class BatchQueueTest 
    { 
     [TestMethod] 
     public void Concurrencytest() 
     { 
      var batchname = Guid.NewGuid().ToString(); 
      var queue = new BatchQueue("localhost", batchname); 
      var tasks = new List<Task>(); 
      var counter = 0; 
      for (int i = 0; i < 100; i++) 
      { 
       queue.Publish(new BatchMessage() 
       { 
        Files = new List<string>() { i.ToString() } 
       }); 
      } 
      for (int i = 0; i < 2; i++) 
      { 
       var task = Task.Factory.StartNew(() => 
       { 
        var q = new BatchQueue("localhost", batchname); 
        var res = q.Receive(); 
        while (res != null) 
        { 
         System.Diagnostics.Trace.WriteLine(res.Files[0]); 
         q.Confirm(); 
         Interlocked.Increment(ref counter); 
        } 
       }); 
       tasks.Add(task); 
      } 
      var ok = Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(30)); 
      MAssert.IsTrue(ok, "Tasks didnt complete in time"); 
      MAssert.AreEqual(counter, 100, "Not all messages have been processed"); 

     } 
    } 
} 

ответ

2

Ваш аппарат тест начинается две задачи. Перед тем как время цикла вы получите сообщение, но вы держите подтверждая то же самое сообщение внутри время цикла:

var q = new BatchQueue("localhost", batchname); 
//Receive message 1 or 2 
var res = q.Receive(); 
while (res != null) 
{ //Infinite loop 
    System.Diagnostics.Trace.WriteLine(res.Files[0]); 
    q.Confirm(); 
    Interlocked.Increment(ref counter); 
} 

Try положить var res = q.Receive(); внутри цикла