2015-07-14 4 views
3

Я загрузил Kafka с https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz и создал кластер kafka на своей машине с помощью виртуальных машин. Кластер работает нормально - он был протестирован с использованием консольного производителя и потребителя с пакетом kafka.Компиляция произвольного производителя для Kafka

Теперь я реализовал пользовательский класс Producer для Kafka. Но я не мог понять, как скомпилировать этот класс и каковы зависимости.

Вопросы

  • Может кто-нибудь объяснить, как мне нужно идти о выборке зависимостей для продюсера, создание класса и запустить его?

  • Нужен ли мне sbt для его сборки? Я не мог найти ни одного онлайн-ресурса, в котором четко объяснялось, как идти о создании пользовательского класса производителей кафки.

Ниже перечислены пакеты, импортированные в классе Производитель:

org.apache.kafka.clients.producer.Callback; 
org.apache.kafka.clients.producer.KafkaProducer; 
org.apache.kafka.clients.producer.ProducerConfig; 
org.apache.kafka.clients.producer.ProducerRecord; 
org.apache.kafka.clients.producer.RecordMetadata; 
org.apache.kafka.common.record.Records 

Заранее спасибо

ответ

1

я разработал собственный продюсер Кафка, как проект Maven, зависимость я использовал:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.0</version> 
</dependency> 

Импорт Я использовал:

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.ByteArraySerializer; 
import org.apache.kafka.common.serialization.StringSerializer; 

Фрагмент моего продюсера отправки сообщения код:

Properties props = new Properties(); 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, zkConnection); 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); 

byte[] byteData = null; 
File myInputFile = new File(...); 
try (InputStream inputStream = new FileInputStream(myInputFile)) { 
    byteData = IOUtils.toByteArray(inputStream); 
} 

try (KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props)) { 
    producer.send(new ProducerRecord<String, byte[]>(topic, byteData)); 
} 
Смежные вопросы