2013-04-05 2 views
2

Я обрабатываю огромный XML-документ (который содержит около миллиона записей) и впоследствии импортирует отформатированную версию в db с помощью rabbitmq. Каждый раз, когда вы публикуете около 200 000 записей, я получаю сообщение об ошибке, и rabbitmq не может восстановиться.Rabbitmq - php amqp broken broken pipe error

Уведомление об ошибке: FWRITE(): отправить 2651 байт не удалось с Errno = 11 Ресурс временно недоступен в [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, строка 439]

Уведомление об ошибке: fwrite(): ошибка отправки 33 байт с errno = 104 Сброс соединения с помощью одноранговой сети в [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, строка 439]

Уведомление об ошибке: FWRITE(): отправка из 19 байт потерпел неудачу с ERRNO = 32 Брокен трубы в [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, строка 439]

ошибка узла вниз, и процесс должен быть вручную убит, чтобы восстановить его.

Это мои методы класса: -

public function publishMessage($message) { 
    if (!isset($this->conn)) { 
     $this->_createNewConnectionAndChannel(); 
    } 
    try { 
     $this->ch->basic_publish(
      new AMQPMessage($message, array('content_type' => 'text/plain')), 
      $this->defaults['exchange']['name'], 
      $this->defaults['binding']['routing_key'] 
     ); 
    } catch (Exception $e) { 
     echo "Caught exception : " . $e->getMessage(); 
     echo "Creating new connection."; 
     $this->_createNewConnectionAndChannel(); 
     $this->publishMessage($message); // try again 
    } 
} 

protected function _createNewConnectionAndChannel() { 
    if (isset($this->conn)) { 
     $this->conn->close(); 
    } 

    if(isset($this->ch)) { 
     $this->ch->close(); 
    } 

    $this->conn = new AMQPConnection(
     $this->defaults['connection']['host'], 
     $this->defaults['connection']['port'], 
     $this->defaults['connection']['user'], 
     $this->defaults['connection']['pass'] 
    ); 
    $this->ch = $this->conn->channel(); 
    $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true); 
    $this->ch->basic_qos(0 , 20 , 0); // fair dispatching 

    $this->ch->queue_declare(
     $this->defaults['queue']['name'], 
     $this->defaults['queue']['passive'], 
     $this->defaults['queue']['durable'], 
     $this->defaults['queue']['exclusive'], 
     $this->defaults['queue']['auto_delete'] 
    ); 

    $this->ch->exchange_declare(
     $this->defaults['exchange']['name'], 
     $this->defaults['exchange']['type'], 
     $this->defaults['exchange']['passive'], 
     $this->defaults['exchange']['durable'], 
     $this->defaults['exchange']['auto_delete'] 
    ); 

    $this->ch->queue_bind(
     $this->defaults['queue']['name'], 
     $this->defaults['exchange']['name'], 
     $this->defaults['binding']['routing_key'] 
    ); 
} 

Любая помощь будет оценена.

+0

вы пробовали [расширения PECL AMQP] (http://pecl.php.net/packages/amqp)? По опыту, он намного менее хрупкий и [лучше поддерживается] (https://github.com/bkw/pecl-amqp-official). – salathe

+0

да .. такой же выпуск. –

ответ

8

Убедитесь, что вы добавили доступ к виртуальному хосту для своего пользователя на Rabbit MQ. Я создал нового пользователя и забыл установить права доступа для хоста «/», который используется по умолчанию.

Вы можете сделать это через панель управления yourhost: 15672> Администратор> нажмите на пользователя> Ищите «Установить разрешение».

P.S. Я предполагаю, что ваша служба RabbitMQ запущена, пользователь существует и пароль правильный.

+0

Вы правы. Это тоже мое дело. Благодарю. – Winston

2

На самом деле эта проблема возникает, когда у вас есть большой контент внутри вашего сообщения, и ваш потребитель тратит слишком много времени, чтобы обработать только одно сообщение, то есть проблему ответа «ACK» на кролика и попытаться использовать другое сообщение.

Когда у меня есть эта проблема, например, я пытаюсь «подгонять» мои сообщения, потому что у ее работника по продуктам и у каждого сообщения было некоторое количество идентификаторов продуктов 1k, поэтому я перехожу на 100 продуктов, и он работает очень хорошо.

Вы можете прочитать больше о Детектирование мертвых TCP соединений с Heartbeats here