Итак, я пытаюсь использовать Kafka для своего приложения, которое имеет действия по регистрации производителей в Kafka MQ и потребитель, который считывает его с MQ. Поскольку мое приложение находится в Go, я использую Ищите Сараму, чтобы сделать это возможным.Обработка ошибок Kafka с использованием Shopify Sarama
Прямо сейчас, я в состоянии считывать с MQ и распечатать содержимое сообщения с помощью
fmt.Printf
Howeveer, я бы очень хотел, обработку ошибки, чтобы быть лучше, чем консоли печати, и я готов Пройти лишнюю милю.
код прямо сейчас для присоединения потребителя:
mqCfg := sarama.NewConfig()
master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
panic(err) // Don't want to panic when error occurs, instead handle it
}
и обработка сообщений:
go func() {
defer wg.Done()
for message := range consumer.Messages() {
var msgContent Message
_ = json.Unmarshal(message.Value, &msgContent)
fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
}
}()
Мои вопросы (я новичок в тестировании Кафки и новый Кафка в целом):
В каких случаях могут возникать ошибки в вышеуказанной программе, чтобы я мог их обрабатывать? Любой образец кода будет для меня отличным. Условия ошибки, о которых я мог подумать, - это когда msgContent на самом деле не содержит полей типа ContentId в JSON.
В kafka есть ситуации, когда потребитель пытается читать при текущем смещении, но почему-то не смог (даже когда JSON хорошо сформирован)? Возможно ли, чтобы мой потребитель мог отступить, чтобы сказать x шагов выше неудачного смещения, прочитайте и повторно обработайте смещения? Или есть лучший способ сделать это? опять же, какими могут быть эти ситуации?
Я открыт для чтения и пробовать вещи.
json.Unmarshal может вызвать заблуждение, и если вы не хотите, чтобы бросить панику ... Просто не :) – MIkCode
Ха. Благодарю. Любая идея о том, как я мог бы сделать # 2? – premunk