Я хочу расширить SinkTask
, чтобы создать мой собственный разъем для раковины.Расширьте Kafka Connect SinkTask и начните потреблять с заданных смещений
Если я сохраню смещения во время флеша, и в следующий раз, когда я запустил соединительный разъем, я хотел бы возобновить чтение из моих сохраненных смещений, что было бы правильным способом сделать это?
Я попытался с помощью SinkTaskContext
из переопределенного initialize(SinkTaskContext context)
назначить свои собственные смещения:
@Override
public void initialize(SinkTaskContext context) {
HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
...
context.offset(offsetMap);
}
Но это не работает, потому что разделы еще не назначены. Я получаю исключение.
Должен ли я затем сохранить контекст (от initialize()
) в глобальную переменную, а затем использовать его для назначения смещения к нему внутри метода open(Collection<TopicPartition> partitions)
(переопределен из SinkTask
) таким же образом, я делаю внутри initialize
? например .:
@Override
public void open(Collection<TopicPartition> partitions) {
HashMap<TopicPartition, Long> offsetMapNew = new HashMap<>();
for (TopicPartition tp : partitions) // for each partition assigned
{
Long offset = myOffsetMap.get(tp.topic() + "-" + tp.partition());
if (offset == null) { offset = 0l; } // 0 Long
offsetMapNew.put(tp, offset);
}
mySavedTaskContext.offset(offsetMapNew); // sync offsets ?
}
Спасибо Ewen. Я только хотел поделиться более подробной информацией об этом вопросе, в нашем обсуждении [здесь] (https://groups.google.com/forum/#!topic/confluent-platform/QsgWbXeSGZA) –