2016-07-17 2 views
1

Я хочу расширить SinkTask, чтобы создать мой собственный разъем для раковины.Расширьте Kafka Connect SinkTask и начните потреблять с заданных смещений

Если я сохраню смещения во время флеша, и в следующий раз, когда я запустил соединительный разъем, я хотел бы возобновить чтение из моих сохраненных смещений, что было бы правильным способом сделать это?

Я попытался с помощью SinkTaskContext из переопределенного initialize(SinkTaskContext context) назначить свои собственные смещения:

@Override 
public void initialize(SinkTaskContext context) { 
    HashMap<TopicPartition, Long> offsetMap = new HashMap<>(); 
    ... 
    context.offset(offsetMap); 
} 

Но это не работает, потому что разделы еще не назначены. Я получаю исключение.

Должен ли я затем сохранить контекст (от initialize()) в глобальную переменную, а затем использовать его для назначения смещения к нему внутри метода open(Collection<TopicPartition> partitions) (переопределен из SinkTask) таким же образом, я делаю внутри initialize? например .:

@Override 
public void open(Collection<TopicPartition> partitions) { 
    HashMap<TopicPartition, Long> offsetMapNew = new HashMap<>(); 
    for (TopicPartition tp : partitions) // for each partition assigned 
    { 
    Long offset = myOffsetMap.get(tp.topic() + "-" + tp.partition()); 
    if (offset == null) { offset = 0l; } // 0 Long 
    offsetMapNew.put(tp, offset); 
    } 
    mySavedTaskContext.offset(offsetMapNew); // sync offsets ? 
} 

ответ

0

Переустановка смещение во время open()должен быть правильным подходом, но из-за bug который до сих пор не решен, он в настоящее время не будет обработан должным образом.

Обходным решением на данный момент является обращение к смещению смещений в put(). Это может быть немного противоречивым, но поскольку вы управляете своими смещениями, вы можете фактически игнорировать данные, если хотите. Когда вы получаете первый вызов put(), вы можете обработать загрузку смещений и сбросить их. Все последующие данные будут из смещений, которые вы указали при сбросе. Таким образом, HDFS connector в настоящее время реализует его ровно за один раз. (Это хороший пример того, как вы можете получить ровно один раз, но относительно сложный код, к сожалению.) Фактически, поскольку разъем HDFS был тем, что управляло функциональностью управления смещением в Kafka Connect, тот факт, что он не выполняет сброс по ребалансировке именно то, как это было упущено в реализации.

+0

Спасибо Ewen. Я только хотел поделиться более подробной информацией об этом вопросе, в нашем обсуждении [здесь] (https://groups.google.com/forum/#!topic/confluent-platform/QsgWbXeSGZA) –

Смежные вопросы