Я обрабатываю огромный XML-документ (который содержит около миллиона записей) и впоследствии импортирует отформатированную версию в db с помощью rabbitmq. Каждый раз, когда вы публикуете около 200 000 записей, я получаю сообщение об ошибке, и rabbitmq не может восстановиться.Rabbitmq - php amqp broken broken pipe error
Уведомление об ошибке: FWRITE(): отправить 2651 байт не удалось с Errno = 11 Ресурс временно недоступен в [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, строка 439]
Уведомление об ошибке: fwrite(): ошибка отправки 33 байт с errno = 104 Сброс соединения с помощью одноранговой сети в [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, строка 439]
Уведомление об ошибке: FWRITE(): отправка из 19 байт потерпел неудачу с ERRNO = 32 Брокен трубы в [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, строка 439]
ошибка узла вниз, и процесс должен быть вручную убит, чтобы восстановить его.
Это мои методы класса: -
public function publishMessage($message) {
if (!isset($this->conn)) {
$this->_createNewConnectionAndChannel();
}
try {
$this->ch->basic_publish(
new AMQPMessage($message, array('content_type' => 'text/plain')),
$this->defaults['exchange']['name'],
$this->defaults['binding']['routing_key']
);
} catch (Exception $e) {
echo "Caught exception : " . $e->getMessage();
echo "Creating new connection.";
$this->_createNewConnectionAndChannel();
$this->publishMessage($message); // try again
}
}
protected function _createNewConnectionAndChannel() {
if (isset($this->conn)) {
$this->conn->close();
}
if(isset($this->ch)) {
$this->ch->close();
}
$this->conn = new AMQPConnection(
$this->defaults['connection']['host'],
$this->defaults['connection']['port'],
$this->defaults['connection']['user'],
$this->defaults['connection']['pass']
);
$this->ch = $this->conn->channel();
$this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
$this->ch->basic_qos(0 , 20 , 0); // fair dispatching
$this->ch->queue_declare(
$this->defaults['queue']['name'],
$this->defaults['queue']['passive'],
$this->defaults['queue']['durable'],
$this->defaults['queue']['exclusive'],
$this->defaults['queue']['auto_delete']
);
$this->ch->exchange_declare(
$this->defaults['exchange']['name'],
$this->defaults['exchange']['type'],
$this->defaults['exchange']['passive'],
$this->defaults['exchange']['durable'],
$this->defaults['exchange']['auto_delete']
);
$this->ch->queue_bind(
$this->defaults['queue']['name'],
$this->defaults['exchange']['name'],
$this->defaults['binding']['routing_key']
);
}
Любая помощь будет оценена.
вы пробовали [расширения PECL AMQP] (http://pecl.php.net/packages/amqp)? По опыту, он намного менее хрупкий и [лучше поддерживается] (https://github.com/bkw/pecl-amqp-official). – salathe
да .. такой же выпуск. –