2012-06-19 2 views
1

Я пытаюсь создать службу RPC в PHP с помощью RabbitMQ подобного этого примера: http://www.rabbitmq.com/tutorials/tutorial-six-java.html Я использую это расширение PECL: http://pecl.php.net/package/amqp (версия 1.0.3)RabbitMQ RPC: Эксклюзивные очереди запирающих @ PHP

проблема в том, что моя очередь обратного вызова (объявленная в скрипте клиента) равна заблокирована для сервера, когда я добавляю к нему флаг AMQP_EXCLUSIVE.

Вот мой сервер

// connect to server 
$cnn = new AMQPConnection('...'); 
$cnn->connect(); 
$channel = new AMQPChannel($cnn); 
// create exchange 
$exchangeName = 'k-exchange'; 
$exchange = new AMQPExchange($channel); 
$exchange->setName($exchangeName); 
$exchange->setType(AMQP_EX_TYPE_DIRECT); 
$exchange->declare(); 

// declare queue to consume messages from 
$queue = new \AMQPQueue($channel); 
$queue->setName('tempQueue'); 
$queue->declare(); 

// start consuming messages 
$queue->consume(function($envelope, $queue) 
    use ($channel, $exchange) { 

    // create callback queue 
    $callbackQueue = new \AMQPQueue($channel); 
    $callbackQueue->setName($envelope->getReplyTo()); 
    $callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag 

    /* WARNING: Following code line causes error. See rabbit logs below: 
    * connection <0.1224.10>, channel 1 - error: 
    * {amqp_error,resource_locked, 
    * "cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'", 
    * 'queue.bind'} 
    */ 
    $callbackQueue->bind($exchange->getName(), 'rpc_reply'); 

    // trying to publish response back to client's callback queue 
    $exchange->publish(
     json_encode(array('processed by remote service!')), 
     'rpc_reply', 
     AMQP_MANDATORY & AMQP_IMMEDIATE 
    ); 

    $queue->ack($envelope->getDeliveryTag()); 
}); 

А вот мой Client.php

// connect to server 
$cnn = new AMQPConnection('...'); 
$cnn->connect(); 
$channel = new AMQPChannel($cnn); 
// create exchange 
$exchangeName = 'k-exchange'; 
$exchange = new AMQPExchange($channel); 
$exchange->setName($exchangeName); 
$exchange->setType(AMQP_EX_TYPE_DIRECT); 
$exchange->declare(); 

// create a queue which we send messages to server via 
$queue = new \AMQPQueue($channel); 
$queue->setName('tempQueue'); 
$queue->declare(); 

// binding exchange to queue 
$queue->bind($exchangeName, 'temp_action'); 

// create correlation_id 
$correlationId = sha1(time() . rand(0, 1000000)); 

// create anonymous callback queue to get server response response via 
$callbackQueue = new \AMQPQueue($channel); 
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag 
$callbackQueue->declare(); 

// publishing message to exchange (passing it to server) 
$exchange->publish(
    json_encode(array('process me!')), 
    'temp_action', 
    AMQP_MANDATORY, 
    array(
     'reply_to' => $callbackQueue->getName(), // pass callback queue name 
     'correlation_id' => $correlationId 
    ) 
); 

// going to wait for remote service complete tasks. tick once a second 
$attempts = 0; 
while ($attempts < 5) 
{ 
    echo 'Attempt ' . $attempts . PHP_EOL; 
    $envelope = $callbackQueue->get(); 
    if ($envelope) { 
     echo 'Got response! '; 
     print_r($envelope->getBody()); 
     echo PHP_EOL; 
     exit; 
    } 

    sleep(1); 
    $attempts++; 
} 

Таким образом, в конце концов, я просто вижу ошибку в журналах RabbitMQ в:

connection <0.1224.10>, channel 1 - error: 
{amqp_error,resource_locked, 
    "cannot obtain exclusive access to locked queue 'amq.gen-Q6J...' in vhost '/'", 
    'queue.bind'} 

Вопрос: Каков правильный способ создания объекта callbackQueue в Server.php? Похоже, что мой Server.php отличается от соединения Client.php с сервером RabbitMQ. Что мне здесь делать? Как мне «разделить» одно и то же (с Client.php) соединение на стороне Server.php.

UPDATE Вот еще несколько RabbitMQ Бревна

Мои Server.php соединение (Id является: < 0.22322.27>)

=INFO REPORT==== 20-Jun-2012::13:30:22 === 
    accepting AMQP connection <0.22322.27> (127.0.0.1:58457 -> 127.0.0.1:5672) 

Мои Client.php соединение (Id является: < 0.22465.27>)

=INFO REPORT==== 20-Jun-2012::13:30:38 === 
    accepting AMQP connection <0.22465.27> (127.0.0.1:58458 -> 127.0.0.1:5672) 

Теперь я вижу Server.php причины ошибки:

=ERROR REPORT==== 20-Jun-2012::13:30:38 === 
    connection <0.22322.27>, channel 1 - error: 
{amqp_error,resource_locked, 
"cannot obtain exclusive access to locked queue 'amq.gen-g6Q...' in vhost '/'", 
'queue.bind'} 

Мой Успенская Я подозреваю, так Client.php и Server.php не разделяют связи с тем же идентификатором, что невозможно для них и использовать эксклюзивную очередь, объявленную в Client.php

ответ

0

На ваш сервер, вы также должны объявить свою очередь исключительной. Помните, что очереди RabbitMQ должны иметь одинаковый флаг. Например, если вы объявите очередь, которая установлена ​​на «долговечность», другой конец должен также объявить очередь «прочной». Таким образом, на вашем сервере установите флаг $callbackQueue->setFlags(AMQP_EXCLUSIVE);, как это у вашего клиента.

+0

Привет, Я добавил ЭКСКЛЮЗИВ флага моей Server.php но не работает Похоже, сервер и клиент имеют разный соед на сервер RabbitMQ, и это проблема. Или, может быть, я ошибаюсь – Denis

0

Моего ответа от этого вопроса ответил на RabbitMQ официального списка

рассылки

Пока не использует ту же библиотеку здесь у вас есть официальные учебники перенесены на PHP

https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php

Проблема в вашем коде что вы объявляете очереди с разными параметрами.

Так, как говорится в одном ответе, если вы объявляете очередь A долговечной, то любое другое объявление этой очереди должно быть долговечным. То же самое для эксклюзивного флага.

Также вам не нужно обновлять очередь для публикации сообщений. Как сервер RPC вы предполагаете, что адрес, отправленный в свойство «reply_to», уже присутствует. Я считаю, что RpcClient отвечает за то, чтобы очередь, в которой он ждет ответов, уже существует.

Добавление:

Эксклюзивность в очередях означает, что единственный канал, который объявлен очередь может получить к нему доступ.

4

Есть несколько проблем, с вашей реализации:

  1. декларации Обмен
  2. Ручная установка очереди ответа противоположность используя временную очередь
  3. Запрещается использование AMQP_EXCLUSIVE в обоих направлениях

Заявление об обмене

Yo u не нужно объявлять обмен (AMQPExchange) для публикации сообщений. В этом примере RPC вам необходимо использовать его как способ трансляции сообщения (например, временная очередь или временный обмен). Вся связь будет происходить непосредственно на QUEUE и теоретически обходит обмен.

$exchange = new AMQPExchange($channel); 
$exchange->publish(...); 

QUEUES & Ответить Кому:

При использовании AMQPQueue :: SetName() вместе с AMQPQueue :: заявляют(), вы привязки к очереди с определенным именем пользователя. Если вы объявляете очередь без имени, это называется временной очередью. Это полезно, когда вам нужно получить широковещательное сообщение с определенного ключа маршрутизации. По этой причине RabbitMQ/AMQP генерирует случайное временное имя. Поскольку имя очереди создается для данного экземпляра, чтобы потреблять информацию исключительно, сама по себе, она удаляется, когда соединение закрывается.

Когда клиент RPC хочет опубликовать сообщение (AMQPExchange :: publish()), он должен указать ответ в качестве одного из параметров публикации. Таким образом, сервер RPC может получать случайно сгенерированное имя при получении запроса. Он использует имя ответа для имени как имя QUEUE, на котором сервер будет отвечать данному клиенту. Наряду с временным именем очереди экземпляр должен послать коррел цию, чтобы гарантировать, что получаемое им сообщение ответа уникально для экземпляра запроса.

RabbitMQ Tutorial Six Diagram

Client

$exchange = new AMQPExchange($channel); 

$rpcServerQueueName = 'rpc_queue'; 

$client_queue = new AMQPQueue($this->channel); 
$client_queue->setFlags(AMQP_EXCLUSIVE); 
$client_queue->declareQueue(); 
$callbackQueueName = $client_queue->getName(); //e.g. amq.gen-JzTY20BRgKO-HjmUJj0wLg 

//Set Publish Attributes 
$corrId = uniqid(); 
$attributes = array(
    'correlation_id' => $corrId, 
    'reply_to'  => $this->callbackQueueName 
); 

$exchange->publish(
    json_encode(['request message']), 
    $rpcServerQueueName, 
    AMQP_NOPARAM, 
    $attributes 
); 

//listen for response 
$callback = function(AMQPEnvelope $message, AMQPQueue $q) { 
    if($message->getCorrelationId() == $this->corrId) { 
     $this->response = $message->getBody(); 
     $q->nack($message->getDeliveryTag()); 
     return false; //return false to signal to consume that you're done. other wise it continues to block 
    } 
}; 

$client_queue->consume($callback); 

Сервер

$exchange = new AMQPExchange($channel); 

$rpcServerQueueName = 'rpc_queue'; 


$srvr_queue = new AMQPQueue($channel); 
$srvr_queue->setName($rpcServerQueueName); //intentionally declares the rpc_server queue name 
$srvr_queue->declareQueue(); 
... 
$srvr_queue->consume(function(AMQPEnvelope $message, AMQPQueue $q) use (&$exchange) { 

    //publish with the exchange instance to the reply to queue 
    $exchange->publish(
     json_encode(['response message']), //reponse message 
     $message->getReplyTo(),    //get the reply to queue from the message 
     AMQP_NOPARAM,      //disable all other params 
     $message->getCorrelationId()  //obtain and respond with correlation id 
    ); 

    //acknowledge receipt of the message 
    $q->ack($message->getDeliveryTag()); 
}); 

AMQP_EXCLUSIVE

В этом случае EXCLUSIVE используется только для временной очереди клиента Rpc для каждого экземпляра, чтобы он мог публиковать сообщение. Другими словами, клиент создает одноразовую временную очередь для самостоятельного получения ответа с сервера RPC. Это гарантирует, что ни одна другая нить канала не может отправлять сообщения в эту очередь. Он заблокирован только для клиента и его ответчика.Важно отметить, что AQMP_EXCLUSIVE не мешает RPC-серверу отвечать на очередь ответа клиента. AMQP_EXCLUSIVE относится к двум отдельным потокам (экземплярам каналов), пытающимся опубликовать их в один и тот же ресурс очереди. Когда это происходит, очередь по существу заблокирована для последующих соединений. Такое же поведение происходит с объявлением обмена.

@Denis: Ваша реализация в данном случае правильно до точки

Bad - не повторно объявить очереди на сервере. Это работа клиента

$callbackQueue = new \AMQPQueue($channel); 
$callbackQueue->setName($envelope->getReplyTo()); 
$callbackQueue->setFlags(AMQP_EXCLUSIVE); // set EXCLUSIVE flag 
... 
$callbackQueue->bind($exchange->getName(), 'rpc_reply'); 

Вы пытаетесь связать с очередью под названием tempQueue. Но вы уже создали очередь tempQueue в client.php. В зависимости от того, какой сервис запускается первым, другой будет выдавать ошибку. Таким образом, вы можете вырезать все, что и просто держать последнюю часть:

// trying to publish response back to client's callback queue 
$exchange->publish(
    json_encode(array('processed by remote service!')), 
    'rpc_reply', //<--BAD Should be: $envelope->getReplyTo() 
    AMQP_MANDATORY & AMQP_IMMEDIATE 
); 

Затем измените выше заменой:

'rpc_reply' 

with 

$envelope->getReplyTo() 

Не объявляйте Имя очереди на стороне клиента

// create a queue which we send messages to server via 
$queue = new \AMQPQueue($channel); 
//$queue->setName('tempQueue'); //remove this line 
//add exclusivity 
$queue->setFlags(AMQP_EXCLUSIVE); 
$queue->declare(); 

//no need for binding... we're communicating on the queue directly 
//there is no one listening to 'temp_action' so this implementation will send your message into limbo 
//$queue->bind($exchangeName, 'temp_action'); //remove this line 
Смежные вопросы