Я пытаюсь выполнить тестирование довольно простого сценария: рабочая очередь с двумя рабочими и 1 сценарий издателя, но она продолжает возвращать одно и то же сообщение снова и снова из очереди.RabbitMQ возвращает одно и то же сообщение снова и снова
Следующий код теста просто помещает от 1 до 100 сообщений в очередь, и 2 потребителя съедают их. Проблема в том, что они продолжают получать сообщения 1 и 2. Я попытался отделить подтверждение в методе, так как в моем приложении требуется время для сообщения, чтобы получить процесс (метод комментариев Подтвердить) - тогда он сделал исключение, которое маркер неизвестен:
AMQP операция была прервана: AMQP крупной причины, по инициативе Peer, код = 406, текст = "PRECONDITION_FAILED - неизвестно доставка бирка 1", Classid = 60, methodId = 80, причина =
Похоже, что признание нарушено как-то. Я попытался отключить его - тоже не повезло.
Класс:
using System;
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Backend.MQ.OCR
{
public class BatchQueue : QueueBase<BatchMessage>
{
private readonly IModel _channel;
private const string QPrefix = "ocrbatches_";
private readonly QueueingBasicConsumer _consumer;
private ulong _latesttoken = ulong.MaxValue;
private readonly string _jobid;
public BatchQueue(string connectionString, String jobid):
base(connectionString)
{
_jobid = jobid;
var factory = new ConnectionFactory()
{
HostName = connectionString
};
var connection = factory.CreateConnection();
_channel = connection.CreateModel();
_channel.QueueDeclare(Name, true, false, false, null);
//binding consumers
_channel.BasicQos(0, 1, false);
_consumer = new QueueingBasicConsumer(_channel);
_channel.BasicConsume(Name, false, _consumer);
}
public override void Publish(BatchMessage msg)
{
var message = JsonConvert.SerializeObject(msg);
var body = Encoding.UTF8.GetBytes(message);
var properties = _channel.CreateBasicProperties();
properties.SetPersistent(true);
_channel.BasicPublish("", Name, properties, body);
#if DEBUG
System.Diagnostics.Trace.WriteLine("[x] Sent task:" + msg);
#endif
}
private string Name
{
get { return QPrefix + _jobid; }
}
public override BatchMessage Receive()
{
var ea =
(BasicDeliverEventArgs)_consumer.Queue.Dequeue();
var body = ea.Body;
_channel.BasicAck(ea.DeliveryTag, false);
return JsonConvert.DeserializeObject<BatchMessage>(Encoding.UTF8.GetString(body));
}
public override void Confirm()
{
//if (_latesttoken < ulong.MaxValue) _channel.BasicAck(_latesttoken, false);
}
}
}
Юнит тесты:
#if NUNIT
using TestClass = NUnit.Framework.TestFixtureAttribute;
using TestMethod = NUnit.Framework.TestAttribute;
using TestCleanup = NUnit.Framework.TearDownAttribute;
using TestInitialize = NUnit.Framework.SetUpAttribute;
using ClassCleanup = NUnit.Framework.TestFixtureTearDownAttribute;
using ClassInitialize = NUnit.Framework.TestFixtureSetUpAttribute;
#else
#endif
using System.Threading.Tasks;
using System.Threading;
using System;
using System.Collections.Generic;
using Backend.MQ.OCR;
using Microsoft.VisualStudio.TestTools.UnitTesting;
#if NUNIT
using MAssert = NUnit.Framework.Assert;
#else
using MAssert = Microsoft.VisualStudio.TestTools.UnitTesting.Assert;
#endif
namespace MQ.Test
{
[TestClass]
public class BatchQueueTest
{
[TestMethod]
public void Concurrencytest()
{
var batchname = Guid.NewGuid().ToString();
var queue = new BatchQueue("localhost", batchname);
var tasks = new List<Task>();
var counter = 0;
for (int i = 0; i < 100; i++)
{
queue.Publish(new BatchMessage()
{
Files = new List<string>() { i.ToString() }
});
}
for (int i = 0; i < 2; i++)
{
var task = Task.Factory.StartNew(() =>
{
var q = new BatchQueue("localhost", batchname);
var res = q.Receive();
while (res != null)
{
System.Diagnostics.Trace.WriteLine(res.Files[0]);
q.Confirm();
Interlocked.Increment(ref counter);
}
});
tasks.Add(task);
}
var ok = Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(30));
MAssert.IsTrue(ok, "Tasks didnt complete in time");
MAssert.AreEqual(counter, 100, "Not all messages have been processed");
}
}
}