2013-09-26 6 views
5

Я использую бета-версию Kafka 0.8, и я просто пытаюсь запутаться с отправкой разных объектов, сериализации их с помощью собственного кодера и отправки их в существующую конфигурацию брокера. Пока я пытаюсь работать с DefaultEncoder.Apache Kafka Default Encoder Not Working

У меня есть брокер и все, что настроено и работает для StringEncoder, но я не могу получить какой-либо другой тип данных, включая только чистый байт [], который будет отправлен и получен брокером.

Мой код производитель:

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

import java.util.Date; 
import java.util.Properties; 
import java.util.Random; 


public class ProducerTest { 
    public static void main(String[] args) { 
     long events = 5; 
     Random rnd = new Random(); 
     rnd.setSeed(new Date().getTime()); 
     Properties props = new Properties(); 
     props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094"); 
     props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); 
     props.setProperty("partitioner.class", "example.producer.SimplePartitioner"); 
     props.setProperty("request.required.acks", "1"); 
     props.setProperty("producer.type", "async"); 
     props.setProperty("batch.num.messages", "4"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config); 
     for (long nEvents = 0; nEvents < events; nEvents++) { 
      byte[] a = "Hello".getBytes(); 
      byte[] b = "There".getBytes(); 

      KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b); 
      producer.send(data); 
     } 
     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     producer.close(); 
    } 
} 

Я использовал тот же SimplePartitioner, как в примере, приведенном here, и заменяя все байтовые массивы с помощью строк и изменения сериалайзер к kafka.serializer.StringEncoder отлично работает ,

Для справки, SimplePartitioner:

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner<String> { 
    public SimplePartitioner (VerifiableProperties props) { 

    } 

    public int partition(String key, int a_numPartitions) { 
     int partition = 0; 
     int offset = key.lastIndexOf('.'); 
     if (offset > 0) { 
      partition = Integer.parseInt(key.substring(offset+1)) % a_numPartitions; 
     } 
     return partition; 
    } 

} 

Что я делаю неправильно?

ответ

6

Ответ заключается в том, что класс секционирования SimplePartitioner применим только для строк. Когда я пытаюсь запустить Producer асинхронно, он создает отдельный поток, который обрабатывает кодировку и разделение перед отправкой брокеру. Этот поток попадает в блокпост, когда он понимает, что SimplePartitioner работает только для строк, но поскольку это отдельный поток, никаких исключений не выбрасывается, и поэтому поток просто выходит без каких-либо указаний на проступки.

Если мы изменим SimplePartitioner принять байт [], например:

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner<byte[]> { 
    public SimplePartitioner (VerifiableProperties props) { 

    } 

    public int partition(byte[] key, int a_numPartitions) { 
     int partition = 0; 
     return partition; 
    } 

} 

Это отлично работает в настоящее время.

+0

Возможно, вы должны просто придерживаться значения по умолчанию kafka.producer.DefaultPartitioner для вашего свойства partitioner.class вместо hardcoding для определенного возвращаемого значения для разделителя. – gazarsgo

+0

Это было предназначено как тест-драйв. Но вот сценарий, в котором разделитель по умолчанию не работал: предположим, что вы хотите, чтобы определенная подпоследовательность сообщений потреблялась строго в том порядке, в котором они создаются. Это будет неудачно, если вы используете разделитель по умолчанию, потому что по умолчанию будет использоваться хэш ключа, который непредсказуем. Вместо этого, если вы пишете свой собственный пользовательский разделитель, и есть какой-то способ обнаружить подпоследовательность, мы можем назначить их одному и тому же разделу. Этот конкретный случай использования произошел в моей заявке. –