2017-01-23 4 views
0

Я использую qubole's S3 sink для загрузки данных Avro в S3 в формате Паркет.Kafka Connect S3 раковина выбрасывает IllegalArgumentException при загрузке Avro

В моей Java приложения создать производителя

Properties props = new Properties(); 
props.put("bootstrap.servers", KafkaHelper.getServers()); 
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
return new KafkaProducer<byte[], byte[]>(props); 

Затем конвертировать GenericRecord в byte[] формат с:

GenericRecord avroRecord = new GenericData.Record(avroSchema); 
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema); 

for (Map.Entry<String, ?> entry : map.entrySet()) { 
    String key = entry.getKey(); 
    Object value = entry.getValue(); 
    avroRecord.put(key, value); 
} 

ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord)); 
producer.send(record); 

я использую следующие значения в моих свойствах Кафка Connect:

key.converter=com.qubole.streamx.ByteArrayConverter 
value.converter=com.qubole.streamx.ByteArrayConverter 
internal.key.converter=org.apache.kafka.connect.json.JsonConverter 
internal.value.converter=org.apache.kafka.connect.json.JsonConverter 
internal.key.converter.schemas.enable=false 
internal.value.converter.schemas.enable=false 

И следующие параметры конфигурации в m у свойства файла раковины:

connector.class=com.qubole.streamx.s3.S3SinkConnector 
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat 

Когда я запускаю разъем я получаю следующее сообщение об ошибке: "java.lang.IllegalArgumentException: Avro схема должна быть запись.

Я довольно новичок в Kafka Connect, и я знаю, что может быть настроен сервер реестра Schema, но я не понимаю, нужен ли приемник для преобразования данных Avro в Parquet или если это это какая-то проблема форматирования или конфигурации на моем конце. Какой формат данных относится к «записи» в контексте этой ошибки? Любое направление или помощь будут высоко оценены.

ответ

4

ByteArrayConverter не собирается делать перевод данных: вместо фактического выполнения сериализации/десериализации он предполагает, что соединитель знает, как обрабатывать необработанные данные byte[]. Однако ParquetFormat (и фактически большинство форматов) не могут обрабатывать только необработанные данные. Вместо этого они ожидают, что данные будут десериализованы и структурированы как запись (которую вы можете рассматривать как структуру C, POJO и т. Д.).

Обратите внимание, что поток qubolex README отмечает, что ByteArrayConverter полезен в случаях, когда вы можете безопасно скопировать данные напрямую. Например, если у вас есть данные как JSON или CSV. Они не требуют десериализации, поскольку байты для каждого значения записи Kafka могут быть просто скопированы в выходной файл. Это хорошая оптимизация в этих случаях, но не в целом применима ко всем форматам выходных файлов.

+0

Благодарим за объяснение. Я действительно смог разработать решение, используя 'io.confluent.kafka.serializers.KafkaAvroSerializer'. В интересах вопроса вы могли бы подробнее рассказать о своем ответе и предложить решение в дополнение к объяснению проблемы? – kellanburket

Смежные вопросы