2017-01-18 1 views
1

Я могу сохранить данные Кафки в виде строки, как это:Как сэкономить класс класса Akka Stream для Kafka напрямую?

val producerSettings = ProducerSettings(actorSystem, new StringSerializer, new StringSerializer) 
    .withBootstrapServers("localhost:9092") 


def kafkaSink(source: Source[ModbusMessage, NotUsed]) = source.map(a => s"Id:${a.sensorId}:Values:${a.values.mkString(",")}").map { m => 
    new ProducerRecord[String, String]("sampleData", m) 
    }. 
    runWith(Producer.plainSink(producerSettings)) 

, но есть способ, чтобы сохранить мой случай класс непосредственно в Кафка. Например, если я хочу сохранить свои данные в форме класса My case ModbusMessage.

Если кто-то может предоставить мне быстрый пример, который был бы замечательным!

Спасибо. Помощь приветствуется!

+0

Вы говорите о сериализации? Если да, хотите ли вы текстовое представление (например, JSON) или двоичное представление? –

+0

Какая бы ни была лучше для производительности! Нужно ли преобразовать его в Json, используя библиотеку, например liftJson, а затем сохранить ее как строку внутри Kafka? –

+0

Это вообще широко. Сначала я хотел бы использовать упрощенный подход JSON. –

ответ

2

Модель сообщения данных Kafka, состоящая из ключа сообщения и значения сообщения, основана на необработанных байтах (byte[]) для ключей и значений. Поэтому вам необходимо предоставить сериализатор, который преобразует ваш класс case в byte[]. В приведенном выше примере вы настроили использование StringSerializer для обоих ключей и значений, а сериализатор конвертирует String -> byte[].

но есть ли способ сохранить класс моего дела непосредственно в Кафке. Например, если я хочу сохранить свои данные в форме класса My case ModbusMessage.

Если у вас есть случай класса ModbusMessage, то вам нужно реализовать + настроить ModbusMessage -> byte[] сериалайзер. Вы можете реализовать такой сериализатор самостоятельно, но, поскольку другие комментируют ваш вопрос, вы также можете выбрать рамки для сериализации, такие как Avro или Protobuf. Вы также можете посмотреть, например, https://github.com/scala/pickling.

Нужно ли преобразовать его в Json, используя библиотеку, например liftJson, а затем сохранить ее как строку внутри Kafka?

Нет, вам не нужно. Вы можете - и, скорее всего, также должны - конвертировать непосредственно из вашего класса case ModbusMessage в byte[] (и, для десериализации, в обратном направлении). Нет смысла конвертировать ваш класс case сначала в JSON, потому что представление JSON тоже должно быть сериализовано до byte[], чтобы его можно было отправить в Kafka (так что вы понесли бы в два раза больше стоимости перевода здесь).

+0

Если этот ответ помог вам @ShivanshSrivastava, вы должны отметить его как принятый. –

Смежные вопросы