Я ищу для создания простой очереди задач с RabbitMQ и расширением PECL PHP с именем AMQP.Как сделать простую очередь задач
Моя цель довольно проста: Производители должны отправлять сообщения в определенную очередь, содержащую конверт объекта, который необходимо обработать.
Потребители должны прислушиваться к указанной очереди и обрабатывать сообщения по мере их поступления. Мне нужно добавить больше Потребителей и позволить RabbitMq отправлять сообщения в формате roundrobin.
Хотя это очень легко найти учебники для библиотек python или java, я не смог найти их для библиотеки PECL PHP.
Я не совсем уверен, что я должен что-то связывать, у меня был рабочий пример с пользовательской php-библиотекой, которая использовала «basic_publish и basic_consume», которые не реализованы таким образом в библиотеке PECL.
Так вот что я получил до сих пор: Издательство:
$oConfig = Zend_Registry::get('config');
$sQueue = $oConfig->amqp->validate_queue_name;
$oConnection = new AMQPConnection();
$oConnection->setLogin($oConfig->amqp->login);
$oConnection->setPassword($oConfig->amqp->pass);
$oConnection->setVhost($oConfig->amqp->vhost);
$oConnection->setPort($oConfig->amqp->port);
$oConnection->connect();
$oChannel = new AMQPChannel($oConnection);
$oExchange = new AMQPExchange($oChannel);
$sMsg = new stdClass();
$sMsg->nId = $p_nId;
$sMsg->nStatus= $p_nStatus;
try {
$oChannel->startTransaction();
$bResponse = $oExchange->publish($sMgs,$sQueue);
if (!$bResponse) {
echo "<h1>An error occured, the message can't be published</h1>";
echo "<h3>Sorry i don't know why</h3>";
exit;
}
$oChannel->commitTransaction();
} catch (Exception $oException) {
echo "<h1>An error occured, the message can't be published</h1>";
echo "<h3>See error below</h3>";
echo "<pre>";
echo print_r($oException->getMessage());
echo "</pre>";
exit;
}
работник
$oConfig = Zend_Registry::get('config');
$oConnection = new AMQPConnection();
$oConnection->setLogin($oConfig->amqp->login);
$oConnection->setPassword($oConfig->amqp->pass);
$oConnection->setVhost($oConfig->amqp->vhost);
$oConnection->setPort($oConfig->amqp->port);
$oConnection->connect();
$oChannel = new AMQPChannel($oConnection);
$oQueue = new AMQPQueue($oChannel);
$oQueue->declare($oConfig->amqp->validate_queue_name);
function processMessage($oMessage, $oQueue) {
$nId = $msg->body->nId;
$nStatus = $msg->body->nStatus;
$oIniAct = $oActionMap->findBy('id',$nId);
$sReply = $oIniAct->updateStatusMisc($nStatus);
if ($sReply->status == $nStatus) {
$oQueue->ack($sMsg['delivery_tag']);
} else {
$oQueue->nack($sMsg['delivery_tag'],AMQP_REQUEUE);
}
}
$oQueue->consume("processMessage",AMQP_NOPARAM);
Что PHP документ говорит мне о том, что потреблять() заблокирует по нитку для всех? так что в принципе у меня может быть только один рабочий рабочий за раз? Также я вижу, что люди привязываются к очередям, но первый рабочий пример с основным потреблением, который я видел, не использовал это.
Как вы можете видеть, я довольно смущен, любая помощь/направления/учебники ASO ... поможет
Благодаря
Что вы имеете в виду? вы запускаете потребительские скрипты за балансировщиком нагрузки? Я не уверен, что понимаю. В принципе, если у меня только два вхождения этого рабочего кода, это не сработает? Как вы это делаете, у вас есть рабочий, который потребляет всего одно сообщение и мгновенно умирает? Все это мне очень непонятно: s – user1159791
Обновлен мой ответ. Может быть, get() будет соответствовать вашим потребностям? – pinepain
На самом деле я думаю, что я в порядке с блокировкой основной темы. Я думаю, я понял, что многие потребители смогут работать на паррализованном уровне. В принципе, это недоразумение от меня, что вы и куча чтения читаете. Спасибо! – user1159791