Я использую бета-версию Kafka 0.8, и я просто пытаюсь запутаться с отправкой разных объектов, сериализации их с помощью собственного кодера и отправки их в существующую конфигурацию брокера. Пока я пытаюсь работать с DefaultEncoder.Apache Kafka Default Encoder Not Working
У меня есть брокер и все, что настроено и работает для StringEncoder, но я не могу получить какой-либо другой тип данных, включая только чистый байт [], который будет отправлен и получен брокером.
Мой код производитель:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
public class ProducerTest {
public static void main(String[] args) {
long events = 5;
Random rnd = new Random();
rnd.setSeed(new Date().getTime());
Properties props = new Properties();
props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094");
props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
props.setProperty("partitioner.class", "example.producer.SimplePartitioner");
props.setProperty("request.required.acks", "1");
props.setProperty("producer.type", "async");
props.setProperty("batch.num.messages", "4");
ProducerConfig config = new ProducerConfig(props);
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
byte[] a = "Hello".getBytes();
byte[] b = "There".getBytes();
KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b);
producer.send(data);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.close();
}
}
Я использовал тот же SimplePartitioner, как в примере, приведенном here, и заменяя все байтовые массивы с помощью строк и изменения сериалайзер к kafka.serializer.StringEncoder отлично работает ,
Для справки, SimplePartitioner:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner<String> {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(String key, int a_numPartitions) {
int partition = 0;
int offset = key.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(key.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
Что я делаю неправильно?
Возможно, вы должны просто придерживаться значения по умолчанию kafka.producer.DefaultPartitioner для вашего свойства partitioner.class вместо hardcoding для определенного возвращаемого значения для разделителя. – gazarsgo
Это было предназначено как тест-драйв. Но вот сценарий, в котором разделитель по умолчанию не работал: предположим, что вы хотите, чтобы определенная подпоследовательность сообщений потреблялась строго в том порядке, в котором они создаются. Это будет неудачно, если вы используете разделитель по умолчанию, потому что по умолчанию будет использоваться хэш ключа, который непредсказуем. Вместо этого, если вы пишете свой собственный пользовательский разделитель, и есть какой-то способ обнаружить подпоследовательность, мы можем назначить их одному и тому же разделу. Этот конкретный случай использования произошел в моей заявке. –