Я использую qubole's S3 sink для загрузки данных Avro в S3 в формате Паркет.Kafka Connect S3 раковина выбрасывает IllegalArgumentException при загрузке Avro
В моей Java приложения создать производителя
Properties props = new Properties();
props.put("bootstrap.servers", KafkaHelper.getServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<byte[], byte[]>(props);
Затем конвертировать GenericRecord
в byte[]
формат с:
GenericRecord avroRecord = new GenericData.Record(avroSchema);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);
for (Map.Entry<String, ?> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
avroRecord.put(key, value);
}
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord));
producer.send(record);
я использую следующие значения в моих свойствах Кафка Connect:
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
И следующие параметры конфигурации в m у свойства файла раковины:
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
Когда я запускаю разъем я получаю следующее сообщение об ошибке: "java.lang.IllegalArgumentException: Avro схема должна быть запись.
Я довольно новичок в Kafka Connect, и я знаю, что может быть настроен сервер реестра Schema, но я не понимаю, нужен ли приемник для преобразования данных Avro в Parquet или если это это какая-то проблема форматирования или конфигурации на моем конце. Какой формат данных относится к «записи» в контексте этой ошибки? Любое направление или помощь будут высоко оценены.
Благодарим за объяснение. Я действительно смог разработать решение, используя 'io.confluent.kafka.serializers.KafkaAvroSerializer'. В интересах вопроса вы могли бы подробнее рассказать о своем ответе и предложить решение в дополнение к объяснению проблемы? – kellanburket