Я пытаюсь реализовать решение с использованием RabbitMQ для достижения чего-то вроде распределенного RPC, используя только один запрос и одну очередь ответов для большого числа процессоров, я уже реализовал такое решение с Apache Apollo и я бы хотел, чтобы смог перенести его в RabbitMQ. Вот ключевые моменты:RabbitMQ: использование маршрутизации для достижения выбора сообщений
- Каждый серверов подключается к очереди запросов только
- Каждый сервер обрабатывает запросы, которые должны быть для него (поле заголовка)
В моей реализации для Аполлона ключевым моментом было использование селекторов (например, where clauses on values of fields fields), я думал, что это достигнуто в RabbitMQ с помощью маршрутизации и маршрутизации, но я должен быть неправ, потому что я вижу, что рабочие получают сообщения, которые не должны быть для них ,
Я изменил образец маршрутизации (http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html), чтобы воспроизвести проблему. У меня есть два пользователя, с которыми я могу начать с разных параметров, определяющих routingKey, и производителя, который генерировал сообщения для одного из потребителей. Поведение, которое я вижу, заключается в том, что потребление сообщений кажется случайным (сообщение для «Джона» обрабатывается потребителем для «Джона» в первый раз, а потребителем - для «Мэри» второй раз)
У кого-нибудь есть индикация или фрагменты кода при использовании селекторов в RabbitMQ?
Ниже мой код для потребителя:
public static void Main(String[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
const String request = "request";
channel.ExchangeDeclare(request, "direct");
channel.QueueDeclare(request, true, false, false, null);
if (args.Length < 1)
{
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
var myRoutingKey = args[0];
channel.QueueBind(request, request, myRoutingKey);
Console.WriteLine($" [*] Waiting for messages for {myRoutingKey}.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
};
channel.BasicConsume(request, true, consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
и для производителя:
public static void Main(String[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
const String request = "request";
channel.ExchangeDeclare(request, "direct");
channel.QueueDeclare(request, true, false, false, null);
var routingKey = args.Length > 0 ? args[0] : "John";
const String message = "Hi";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(request, routingKey, null, body);
Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
Спасибо заранее.
Привет, Амин, спасибо за ваш ответ. Я также обнаружил, что при использовании динамически генерируемых очередей проблема не возникнет. В любом случае я искал решение только с запросом и очередью ответов как из-за проблем с производительностью, так и из-за того, что другое решение, которое я реализовал ранее с другим брокером, было таким. Узнав другую модель, последнее не является проблемой, просто требует некоторой редизайна, но я определенно должен проверить результаты. – Leon
Кроме того, используя динамически генерируемые очереди, я теряю все сообщения от производителей, которые генерируются, пока пользователи маршрутизации не работают. Я также пытался объявить обмены прочными, но это не помогает. – Leon
Динамически сгенерированные очереди будут по существу временными очередями для вас, так как вы потеряете имя очереди, которую я предполагаю? Я не считаю, что динамические имена очередей будут решением для вас, просто используйте согласованные имена (подумайте о них как о своих селекторах). Для этого не существует производительности. Для этого предназначен RabbitMQ. –