2016-10-09 3 views
3

Я следующий сценарий:Весна Облако поток Кафка> потребляя сообщения Avro от Confluent REST Proxy

  • Производителя отправляет Avro закодированы сообщения в Кафки тему с помощью Confluent отдохнет Proxy (который регистрирует схему на реестре схемы Confluent), сколько описанный в http://docs.confluent.io/3.0.0/kafka-rest/docs/intro.html#produce-and-consume-avro-messages
  • Spring Облако поток позволило сообщение прослушивает темы для новых сообщений

Мое приложение выглядит следующим образом:

@SpringBootApplication 
@EnableBinding(Sink.class) 
public class MyApplication { 
    private static Logger log = LoggerFactory.getLogger(MyApplication.class); 

    public static void main(String[] args) { 
    SpringApplication.run(MyApplication.class, args); 
    } 

    @StreamListener(Sink.INPUT) 
    public void myMessageSink(MyMessage message) { 
    log.info("Received new message: {}", message); 
    } 
} 

Принимая во внимание, что MyMessage - это класс, созданный Avro из схемы Avro.

Мои application.properties выглядит следующим образом:

spring.cloud.stream.bindings.input.destination=myTopic 
spring.cloud.stream.bindings.input.group=${spring.application.name} 
spring.cloud.stream.bindings.input.contentType=application/*+avro 

Моя проблема теперь, что каждый раз, когда принимается новое сообщение, следующее исключение:

org.springframework.messaging.MessagingException: Exception thrown while invoking MyApplication#myMessageSink[1 args]; nested exception is org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27 
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:316) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
    ... 
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27 
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1] 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.8.1.jar:1.8.1] 
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertFromInternal(AbstractAvroMessageConverter.java:91) ~[spring-cloud-stream-schema-1.1.0.RELEASE.jar:1.1.0.RELEASE] 
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:67) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:117) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE] 
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:307) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE] 
    ... 35 common frames omitted 

Из того, что я понимаю, проблема заключается в том, что стек Confluent включает идентификатор схемы сообщения как часть полезной нагрузки сообщения, и ожидается, что клиенты начнут считывать фактическое сообщение Avro после идентификатора схемы. Кажется, мне нужно настроить привязку Kafka для использования KafkaAvroDeserializer Confluent, но я не могу понять, как этого добиться.

(я могу прекрасно получить сообщения с помощью консоли Avro потребителя Confluent так, чтобы он не кажется, что проблема с Avro кодирования)

Я также пытался играть вокруг с @EnableSchemaRegistry аннотацию и настройка ConfluentSchemaRegistryClient bean, но мне кажется, что это только контролирует, где хранятся/извлекаются схемы, но не фактическая десериализация.

Возможно, это работает как-то?

ответ

0

Работает ли это при установке per-binding объекта spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer, чтобы иметь Confluent's KafkaAvroDeserializer class name?

+1

Похоже, что конечная точка 'KafkaMessageChannelBinder' всегда использует' ByteArrayDeserializer' для сериализаторов ключей/значений. Это может быть результатом производителя 'KafkaMessageChannelBinder', использующего ByteArraySerializer по умолчанию. В случае производителей, не связанных с Spring Cloud Stream, потребитель должен иметь возможность переопределить требуемый десериализатор, используя вышеуказанное свойство. –

+0

К сожалению, нет, я уже пробовал это. Из того, что я увидел в [исходном коде] (https: // github.ком/весна облако/весна-облако поток связующее вещество-Кафка/блоб/ведущий/пружинно-облако поток связующего вещество-Кафка/SRC/Основной/Java/орг/springframework/облако/поток/связующий/Kafka/KafkaMessageChannelBinder. java # L254) десериализатор жестко запрограммирован как ByteArrayDeserializer по умолчанию. – neptoon

+1

Да, пожалуйста, создайте проблему здесь: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues. Мы можем отследить его оттуда. Благодаря! –

0

Вид ответа на мой собственный вопрос. То, что я сделал сейчас, - это реализовать MessageConverter, который просто удаляет первые 4 байта любого сообщения, прежде чем передавать их в декодер Avro. Код в основном взят из AbstractAvroMessageConverter весенне-облачного ручья:

public class ConfluentAvroSchemaMessageConverter extends AvroSchemaMessageConverter { 

public ConfluentAvroSchemaMessageConverter() { 
    super(new MimeType("application", "avro+confluent")); 
} 

@Override 
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) { 
    Object result = null; 
    try { 
     byte[] payload = (byte[]) message.getPayload(); 

     // byte array to contain the message without the confluent header (first 4 bytes) 
     byte[] payloadWithoutConfluentHeader = new byte[payload.length - 4]; 
     ByteBuffer buf = ByteBuffer.wrap(payload); 
     MimeType mimeType = getContentTypeResolver().resolve(message.getHeaders()); 
     if (mimeType == null) { 
      if (conversionHint instanceof MimeType) { 
       mimeType = (MimeType) conversionHint; 
      } 
      else { 
       return null; 
      } 
     } 

     // read first 4 bytes and copy the rest to the new byte array 
     // see https://groups.google.com/forum/#!topic/confluent-platform/rI1WNPp8DJU 
     buf.getInt(); 
     buf.get(payloadWithoutConfluentHeader); 

     Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType); 
     Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass); 
     DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema); 
     Decoder decoder = DecoderFactory.get().binaryDecoder(payloadWithoutConfluentHeader, null); 
     result = reader.read(null, decoder); 
    } 
    catch (IOException e) { 
      throw new MessageConversionException(message, "Failed to read payload", e); 
    } 
    return result; 

} 

Я тогда установить тип содержимого для входящего Кафки темы для приложения/AVRO + конфлюэнтностей через application.properties.

Это по крайней мере позволяет мне получать сообщения, закодированные в стеке Confluent, но, конечно же, он никак не взаимодействует с реестром схемы.