2016-11-08 5 views
2

Я использую Kafka для обработки событий журнала. У меня есть базовые знания о потоках Kafka Connect и Kafka для простых коннекторов и потоковых преобразований.Kafka объединяет отдельные строки журнала событий в комбинированное событие журнала

Теперь у меня есть файл журнала со следующей структурой:

timestamp event_id event 

Событие журнала имеет несколько строк журналов, которые соединены с помощью event_id (например, почтовый лог)

Пример:

1234 1 START 
1235 1 INFO1 
1236 1 INFO2 
1237 1 END 

И вообще есть несколько событий:

Examp le:

1234 1 START 
1234 2 START 
1235 1 INFO1 
1236 1 INFO2 
1236 2 INFO3 
1237 1 END 
1237 2 END 

Временное окно (между START и END) может составлять до 5 минут.

В результате я хочу тема как

event_id combined_log 

Пример:

1 START,INFO1,INFO2,END 
2 START,INFO2,END 

Каковы правильные инструменты для достижения этой цели? Я попытался решить это с помощью потоков Kafka, но я могу понять, как это сделать.

ответ

2

В вашем случае использования вы, по сути, восстанавливаете сеансы или транзакции, основанные на полезной нагрузке сообщения. На данный момент нет встроенной, готовой к использованию поддержки для таких функций. Однако вы можете использовать часть API-интерфейса процессора API-интерфейсов Kafka's Streams для реализации этой функции самостоятельно. Вы можете написать пользовательские процессоры, которые используют хранилище состояний, чтобы отслеживать, когда для данного ключа сеанс/транзакция начинается, добавляется и ENDed.

Некоторые пользователи в списках рассылки делают это IIRC, хотя я не знаю о существующем примере кода, на который я мог бы указать вам.

Что нужно, чтобы следить за тем, чтобы правильно обрабатывать данные не по порядку. В вашем примере выше вы перечислили все входные данные в правильном порядке:

1234 1 START 
1234 2 START 
1235 1 INFO1 
1236 1 INFO2 
1236 2 INFO3 
1237 1 END 
1237 2 END 

На практике, однако, сообщения/записи могут поступать испорченный, как это (я только показывать сообщения с ключом 1 для упрощения примера) :

1234 1 START 
1237 1 END 
1236 1 INFO2 
1235 1 INFO1 

Даже если это произойдет, я понимаю, что в вашем случае использования вы все еще хотите, чтобы интерпретировать эти данные как: START -> INFO1 -> INFO2 -> END, а не START -> END (пренебрегая/сбросив INFO1 и INFO2 = потери данных) или START -> END -> INFO2 -> INFO1 (неправильный порядок, вероятно, также нарушает ваши семантические ограничения).

+0

Спасибо за ответ. Я рассмотрю API-интерфейс Процессора. Да и проблема заказа должна быть рассмотрена. – imehl

+1

Процессор API - это решение - еще раз спасибо! – imehl

+0

@imehl: Возможно, вы хотите обновить свой вопрос выше с некоторой информацией о том, что вы на самом деле делали, чтобы решить вашу проблему, теперь, когда вы нашли решение? –

Смежные вопросы