2013-07-12 2 views
0

Я ищу для создания простой очереди задач с RabbitMQ и расширением PECL PHP с именем AMQP.Как сделать простую очередь задач

Моя цель довольно проста: Производители должны отправлять сообщения в определенную очередь, содержащую конверт объекта, который необходимо обработать.

Потребители должны прислушиваться к указанной очереди и обрабатывать сообщения по мере их поступления. Мне нужно добавить больше Потребителей и позволить RabbitMq отправлять сообщения в формате roundrobin.

Хотя это очень легко найти учебники для библиотек python или java, я не смог найти их для библиотеки PECL PHP.

Я не совсем уверен, что я должен что-то связывать, у меня был рабочий пример с пользовательской php-библиотекой, которая использовала «basic_publish и basic_consume», которые не реализованы таким образом в библиотеке PECL.

Так вот что я получил до сих пор: Издательство:

$oConfig = Zend_Registry::get('config'); 
$sQueue = $oConfig->amqp->validate_queue_name; 

$oConnection = new AMQPConnection(); 
$oConnection->setLogin($oConfig->amqp->login); 
$oConnection->setPassword($oConfig->amqp->pass); 
$oConnection->setVhost($oConfig->amqp->vhost); 
$oConnection->setPort($oConfig->amqp->port); 
$oConnection->connect(); 

$oChannel = new AMQPChannel($oConnection); 
$oExchange = new AMQPExchange($oChannel); 

$sMsg = new stdClass(); 
$sMsg->nId = $p_nId; 
$sMsg->nStatus= $p_nStatus; 
try { 
    $oChannel->startTransaction(); 
    $bResponse = $oExchange->publish($sMgs,$sQueue); 
    if (!$bResponse) { 
    echo "<h1>An error occured, the message can't be published</h1>"; 
    echo "<h3>Sorry i don't know why</h3>"; 
    exit; 
    } 
    $oChannel->commitTransaction(); 
} catch (Exception $oException) { 
    echo "<h1>An error occured, the message can't be published</h1>"; 
    echo "<h3>See error below</h3>"; 
    echo "<pre>"; 
    echo print_r($oException->getMessage()); 
    echo "</pre>"; 
    exit; 
} 

работник

$oConfig = Zend_Registry::get('config'); 
    $oConnection = new AMQPConnection(); 
    $oConnection->setLogin($oConfig->amqp->login); 
    $oConnection->setPassword($oConfig->amqp->pass); 
    $oConnection->setVhost($oConfig->amqp->vhost); 
    $oConnection->setPort($oConfig->amqp->port); 
    $oConnection->connect(); 

    $oChannel = new AMQPChannel($oConnection); 
    $oQueue = new AMQPQueue($oChannel); 

    $oQueue->declare($oConfig->amqp->validate_queue_name); 

    function processMessage($oMessage, $oQueue) { 
    $nId  = $msg->body->nId; 
    $nStatus = $msg->body->nStatus; 
    $oIniAct = $oActionMap->findBy('id',$nId); 

    $sReply = $oIniAct->updateStatusMisc($nStatus); 
    if ($sReply->status == $nStatus) { 
     $oQueue->ack($sMsg['delivery_tag']); 
    } else { 
    $oQueue->nack($sMsg['delivery_tag'],AMQP_REQUEUE); 
    } 
    } 

    $oQueue->consume("processMessage",AMQP_NOPARAM); 

Что PHP документ говорит мне о том, что потреблять() заблокирует по нитку для всех? так что в принципе у меня может быть только один рабочий рабочий за раз? Также я вижу, что люди привязываются к очередям, но первый рабочий пример с основным потреблением, который я видел, не использовал это.

Как вы можете видеть, я довольно смущен, любая помощь/направления/учебники ASO ... поможет

Благодаря

ответ

0

PHP имеет синхронный характер, так что да, consume() будет блокировать основной поток в то время как основной логика состоит в том, чтобы читать все входящие данные на сокетном соединении, переводить его в структуры PHP и подавать на вашу потребительскую функцию.

В github было обсуждение вопросов, связанных с асинхронностью php-amqp, но все мы согласились, что если кому-то нужны асинхронные функции, PHP не лучший язык для этого, по дизайну.

Лично я запускаю клиентский скрипт несколько раз (на самом деле у меня есть балансир для этого), поэтому каждый потребитель не влияет друг на друга, он может терпеть неудачу и перезапускаться независимо. Я думаю, вы можете сделать то же самое.

Я запускаю клиентский скрипт несколько раз одновременно как сценарий демонов и балансировщиков (фактически нет реального балансира нагрузки), который контролирует активность потребителей (выполняется через memcache, не ясный, но WFM), и когда нет активности на балансе пользователей убивайте их один за другим (но по крайней мере один рабочий потребитель должен все еще жить). Когда потребители перегружены, балансировочный скрипт запускает больше потребителей.

Если вам нужно уничтожить одно сообщение, а затем умереть, пусть ваша потребительская функция вернется false.

Если у вас в очереди есть хотя бы одно сообщение, вы можете использовать метод AMQPQueue::get(), который не будет блокировать (или, по крайней мере, не должен) ваш основной поток.

+0

Что вы имеете в виду? вы запускаете потребительские скрипты за балансировщиком нагрузки? Я не уверен, что понимаю. В принципе, если у меня только два вхождения этого рабочего кода, это не сработает? Как вы это делаете, у вас есть рабочий, который потребляет всего одно сообщение и мгновенно умирает? Все это мне очень непонятно: s – user1159791

+0

Обновлен мой ответ. Может быть, get() будет соответствовать вашим потребностям? – pinepain

+0

На самом деле я думаю, что я в порядке с блокировкой основной темы. Я думаю, я понял, что многие потребители смогут работать на паррализованном уровне. В принципе, это недоразумение от меня, что вы и куча чтения читаете. Спасибо! – user1159791

Смежные вопросы