2016-04-05 3 views
1

Я пытаюсь написать RabbitMQ Consumer в Go. Предположим, что вы должны взять 5 объектов за один раз из очереди и обработать их. Более того, следует признать, что если успешно обработано, еще раз отправить в очередь на мертвую букву в 5 раз, а затем выбросить, он должен работать бесконечно и обрабатывать событие отмены потребителя. У меня есть несколько вопросов:RabbitMQ потребитель в Go

  1. есть ли понятие BasicConsumer против EventingBasicConsumer в RabbitMQ ход Reference?
  2. Что такое Model в RabbitMQ и есть ли он в RabbitMq-go?
  3. Как отправить объекты, когда не в очередь недоставленных и снова повторно их в очередь после того, как ttl
  4. Какое значение consumerTag аргумента в функции ch.Consume в коде ниже
  5. Если мы используем channel.Get() или channel.Consume() для этого сценария?

Каковы изменения, которые мне необходимо внести в приведенном ниже коде, чтобы соответствовать вышеуказанному требованию. Я спрашиваю об этом, потому что я не мог найти приличную документацию о RabbitMq-Go.

func main() { 

     consumer()   
    } 

    func consumer() { 

     objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}  
     initializeConn(&objConsumerConn.conn) 


     ch, err := objConsumerConn.conn.Channel() 
     failOnError(err, "Failed to open a channel") 
     defer ch.Close() 

     msgs, err := ch.Consume(
       objConsumerConn.queueName, // queue 
       "demo1",  // consumerTag 
       false, // auto-ack 
       false, // exclusive 
       false, // no-local 
       false, // no-wait 
       nil, // args 
     ) 
     failOnError(err, "Failed to register a consumer") 

     forever := make(chan bool) 

     go func() { 
      for d := range msgs {     
       k := new(EventCaptureData) 
       b := bytes.Buffer{} 
       b.Write(d.Body) 
       dec := gob.NewDecoder(&b) 
       err := dec.Decode(&k) 
       d.Ack(true) 

       if err != nil { fmt.Println("failed to fetch the data from consumer", err); } 
        fmt.Println(k)       
      } 
     }()  

     log.Printf(" Waiting for Messages to process. To exit press CTRL+C ") 
     <-forever 

    } 

Edited вопрос:

Я задержал обработку сообщений, как предложено в ссылках link1link2. Но проблема в том, что сообщения возвращаются в исходную очередь из очереди с мертвой буквой даже после ttl. Я использую RabbitMQ 3.0.0. Может ли кто-нибудь указать, в чем проблема?

+0

Попробуйте AMQP пакет для взаимодействия с кроликом, и он имеет очень приличную документацию https://godoc.org/github.com/streadway/amqp – PerroVerd

+0

@PerroVerd Вот что я использую. – Naresh

ответ

1

Есть ли понятие BasicConsumer против EventingBasicConsumer в RabbitMQ-го задания?

Не совсем, но звонки Channel.Get и Channel.Consume служат аналогичной концепции. С Channel.Get у вас есть неблокирующий вызов, который получает первое сообщение, если он есть, или возвращает ok=false. С Channel.Consume поставленные сообщения доставляются на канал.

Что такое модель в RabbitMQ и есть ли она в RabbitMq-go?

Если вы имеете в виду IModel и Connection.CreateModel в C# RabbitMQ, что кое-что из C# Lib, а не от самого RabbitMQ. Это была просто попытка отвлечься от терминологии RabbitMQ «Channel», но она никогда не попадалась.

Как отправить объекты, когда не в очередь недоставленных и снова вновь поставить их в очередь после того, как ТТЛ

Используйте метод delivery.Nack с requeue=false.

Каково значение аргумента consumerTag в ch.Потребление функции в коде ниже

ConsumerTag является только потребителем идентификатор. Он может использоваться для отмены канала с channel.Cancel и для идентификации потребителя, ответственного за доставку. Все сообщения, отправленные с channel.Consume, будут иметь поле ConsumerTag.

Должны ли мы использовать channel.Get() или channel.Consume() для этого сценария?

Я думаю, channel.Get() почти никогда не предпочтительнее более чем channel.Consume(). С channel.Get вы будете опросить очередь и потратить впустую CPU, ничего не делая, что не имеет смысла в Go.

Каковы изменения, которые мне необходимо внести в нижеследующем коде, чтобы соответствовать вышеуказанным требованиям .

  1. Поскольку вы пакетной обработки 5 в то время, вы можете иметь goroutine, который получает от потребителей канала и, как только он получает 5 поставки вы вызываете другую функцию для их обработки.

  2. Чтобы подтвердить или отправить в очередь с мертвой буквой, вы будете использовать функцииили delivery.Nack. Вы можете использовать multiple=true и называть его один раз для партии. Когда сообщение отправляется в очередь с мертвой буквой, вы должны проверить заголовок delivery.Headers["x-death"], сколько раз он был мертвым, и вызывать delivery.Reject, когда его повторно повторили уже 5 раз.

  3. Используйте channel.NotifyCancel для обработки события отмены.

+0

Благодарим вас за подробное объяснение. – Naresh

+0

@ Педро .. У меня есть одна проблема. Если я использую d.Ack (true) или d.Ack (false), он не публикует сообщения в dead-lettered-queue. Где, как и в случае d.Nack (true, false), публикуется. Но затем после ttl он отбрасывает сообщения оттуда. Итак, каковы значения для достижения того же – Naresh

+0

Я редактировал вопрос. Не могли бы вы заглянуть в него? – Naresh

Смежные вопросы