2017-02-13 1 views
2

Я использую Scala & потребляющих данные от Кафки, используя ниже Спарк Streaming подхода:Преобразование искровым Кафка InputDStream в массив [Bytes]

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2) 

Над переменных возвращается InputDStream, через которые я могу видеть данные в сыре/бинарный формат, используя приведенный ниже код: println (строка)

Но мне нужно применить формат avro (доступная схема) в формате raw/binary, чтобы видеть данные в ожидаемом формате json. Чтобы применить формат avro, мне нужно преобразовать выше InputDStream в Array [Bytes], который используется avro.

Может кто-нибудь, пожалуйста, дайте мне знать, чтобы преобразовать InputDStream в Array [Bytes]?

Или

Если вы знаете, какой лучший способ применить Avro схему на InputDStream (искры Streaming), пожалуйста, поделитесь.

ответ

2

Две вещи, которые вам нужно сделать. Во-первых, использовать DefaultDecoder для Кафки, который дает вам Array[Byte] для типа значения:

val lines: DStream[(String, Array[Byte])] = 
    KafkaUtils 
    .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics) 

И тогда вам нужно применить десериализации логику Avro через дополнительный map:

lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) } 

Где avroDeserializer является произвольный класс, который знает, как создать свой тип из байтов Avro.

Лично я использую avro4s, чтобы получить десериализацию класса класса с помощью макросов.

+0

Удивительный, большое спасибо! Мне просто нужно значение DStream в качестве массива [Byte], поэтому я получил его, используя: val lines: DStream [(Array [Byte])] = KafkaUtils.createDirectStream [ String, Array [Byte], StringDecoder, DefaultDecoder] ( ssc , kafkaParams, темы) .map (_._ 2) –

Смежные вопросы