2015-04-01 2 views
3

Итак, я пытаюсь использовать Kafka для своего приложения, которое имеет действия по регистрации производителей в Kafka MQ и потребитель, который считывает его с MQ. Поскольку мое приложение находится в Go, я использую Ищите Сараму, чтобы сделать это возможным.Обработка ошибок Kafka с использованием Shopify Sarama

Прямо сейчас, я в состоянии считывать с MQ и распечатать содержимое сообщения с помощью

fmt.Printf 

Howeveer, я бы очень хотел, обработку ошибки, чтобы быть лучше, чем консоли печати, и я готов Пройти лишнюю милю.

код прямо сейчас для присоединения потребителя:

mqCfg := sarama.NewConfig() 

master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg) 
if err != nil { 
    panic(err) // Don't want to panic when error occurs, instead handle it 
} 

и обработка сообщений:

go func() { 
    defer wg.Done() 
    for message := range consumer.Messages() { 
     var msgContent Message 
     _ = json.Unmarshal(message.Value, &msgContent) 
     fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it 
    } 
}() 

Мои вопросы (я новичок в тестировании Кафки и новый Кафка в целом):

  1. В каких случаях могут возникать ошибки в вышеуказанной программе, чтобы я мог их обрабатывать? Любой образец кода будет для меня отличным. Условия ошибки, о которых я мог подумать, - это когда msgContent на самом деле не содержит полей типа ContentId в JSON.

  2. В kafka есть ситуации, когда потребитель пытается читать при текущем смещении, но почему-то не смог (даже когда JSON хорошо сформирован)? Возможно ли, чтобы мой потребитель мог отступить, чтобы сказать x шагов выше неудачного смещения, прочитайте и повторно обработайте смещения? Или есть лучший способ сделать это? опять же, какими могут быть эти ситуации?

Я открыт для чтения и пробовать вещи.

+0

json.Unmarshal может вызвать заблуждение, и если вы не хотите, чтобы бросить панику ... Просто не :) – MIkCode

+1

Ха. Благодарю. Любая идея о том, как я мог бы сделать # 2? – premunk

ответ

2

Относительно 1) Проверьте, где я регистрирую сообщения об ошибках ниже. Это более или менее то, что я сделал бы.

Относительно 2) Я не знаю о попытке идти назад в теме. Это очень возможно, просто создавая потребителя снова и снова, со своим стартовым смещением минус каждый раз. Но я бы не советовал это, так как, скорее всего, вы снова и снова будете воспроизводить одно и то же сообщение. Я советую экономить ваше смещение часто, чтобы вы могли восстановиться, если все пойдет на юг.

Ниже приведен код, который, как я считаю, касается большинства ваших вопросов. Я не пробовал компилировать это. И sarama api меняется в последнее время, поэтому api может немного отличаться.

func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan<- *Message) (error) { 
    wg.Add(1) 
    go func(){ 
     defer wg.Done() 
     //to track the last known good offset we processed, which is 
     // updated after each successfully processed event. 
     saveprogress := func(off int64){ 
      //Save the offset somewhere...a file... 
      //Ive also used kafka to store progress 
      //using a special topic as a WAL 
     } 
     defer saveprogress(lastgoodoffset) 

     client, err := sarama.NewClient("clientId", brokers, sarama.NewClientConfig()) 
     if err != nil { 
      log.Error(err) 
      return 
     } 
     defer client.Close() 
     sarama.NewConsumerConfig() 
     consumerConfig.OffsetMethod = sarama.OffsetMethodManual 
     consumerConfig.OffsetValue = int64(lastgoodoff) 
     consumer, err := sarama.NewConsumer(client, topic, partition, "consumerId", consumerConfig) 
     if err != nil { 
      log.Error(err) 
      return 
     } 
     defer consumer.Close() 
     for { 
      select { 
      case event := <-consumer.Events(): 
       if event.Err != nil { 
        log.Error(event.Err) 
        return 
       } 
       msgContent := &Message{} 
       err = json.Unmarshal(message.Value, msgContent) 
       if err != nil { 
        log.Error(err) 
        continue //continue to skip this message or return to stop without updating the offset. 
       } 
       // Send the message on to be processed. 
       out <- msgContent 

       lastgoodoff = event.Offset 
      } 
     } 
    }() 
} 
Смежные вопросы