2016-11-30 7 views
0

Я работаю с Kafka и пытаюсь настроить группу потребителей, обращаясь к этому article. Единственное различие заключается в том, что я создал свой собственный абстрактный класс, обработчик для упрощения дизайна.InstanceAlreadyExistsException, исходящий от потребителя kafka

Ниже мой абстрактный класс:

public abstract class Consumer implements Runnable { 
    private final Properties consumerProps; 
    private final String consumerName; 

    public Consumer(String consumerName, Properties consumerProps) { 
    this.consumerName = consumerName; 
    this.consumerProps = consumerProps; 
    } 

    protected abstract void shutdown(); 

    protected abstract void run(String consumerName, Properties consumerProps); 

    @Override 
    public final void run() { 
    run(consumerName, consumerProps); 
    } 
} 

Ниже мой KafkaConsumerA, который проходит над абстрактным классом:

public class KafkaConsumerA extends Consumer { 
    private KafkaConsumer<byte[], DataHolder> consumer; 

    public KafkaConsumerA(String consumerName, Properties consumerProps) { 
    super(consumerName, consumerProps); 
    } 

    @Override 
    public void shutdown() { 
    consumer.wakeup(); 
    } 

    @Override 
    protected void run(String consumerName, Properties consumerProps) { 
    // exception comes from below line from two of the threads and the remaining one thread works fine. 
    consumer = new KafkaConsumer<>(consumerProps); 
    List<String> topics = getTopicsBasisOnConsumerName(consumerName); 
    try { 
     consumer.subscribe(topics); 
     // Setup the schema config 
     Map<String, Object> config = new HashMap<>(); 
     config.put("urls", "https://abc.qa.host.com"); 

     GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config); 
     while (true) { 
     ConsumerRecords<byte[], DataHolder> records = consumer.poll(200); 
     for (ConsumerRecord<byte[], DataHolder> record : records) { 
      Map<String, Object> data = new HashMap<>(); 
      data.put("partition", record.partition()); 
      data.put("offset", record.offset()); 
      data.put("value", record.value()); 
      System.out 
       .println((Thread.currentThread().getId() % 3) + 1 + ": " + decoder.decode(record.value())); 
     } 
     } 
    } catch (WakeupException ex) { 
     ex.printStackTrace(); 
    } catch (Exception ex) { 
     ex.printStackTrace(); 
    } finally { 
     consumer.close(); 
    } 
    } 
} 

и ниже мой Handler класс:

// looks like something is wrong in this class 
public final class ConsumerHandler { 
    private final ExecutorService executorServiceProcess; 
    private final Consumer consumer; 
    private final List<Consumer> consumers = new ArrayList<>(); 

    public ConsumerHandler(Consumer consumer, int poolSize) { 
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize); 
    this.consumer = consumer; 
    for (int i = 0; i < poolSize; i++) { 
     consumers.add(consumer); 
     executorServiceProcess.submit(consumer); 
    } 
    } 

    public void shutdown() { 
    Runtime.getRuntime().addShutdownHook(new Thread() { 
     @Override 
     public void run() { 
     for (Consumer consumer : consumers) { 
      consumer.shutdown(); 
     } 
     executorServiceProcess.shutdown(); 
     try { 
      executorServiceProcess.awaitTermination(1000, TimeUnit.MILLISECONDS); 
     } catch (InterruptedException ex) { 
      Thread.currentThread().interrupt(); 
     } 
     } 
    }); 
    } 
} 

И здесь Я начинаю всех своих потребителей в группе потребителей из основного класса:

public static void main(String[] args) { 
    ConsumerHandler handlerA = 
     new ConsumerHandler(new KafkaConsumerA("KafkaConsumerA", getConsumerProps()), 3); 
    // run KafkaConsumerB here 

    handlerA.shutdown(); 
    // shutdown KafkaConsumerB here 
    } 

Так с этим - мой план установки потребитель группа с тремя потребителями в KafkaConsumerA и всех трех подписался на те же темы.

Ошибка: -

Всякий раз, когда я запускаю это, похоже, только один потребитель в группе потребителей работ и другие два не работает. И я вижу это исключение на консоли из этих двух:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=a97716e0-0e05-4938-8fa1-6b872cf24e34 
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_79] 
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_79] 
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_79] 
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_79] 
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_79] 
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_79] 
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.10.0.0-SASL.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.0-SASL.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.0-SASL.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.0-SASL.jar:na] 

Что не так, что я здесь делаю? getConsumerProps() метод возвращает свойства объекта, который имеет client.id и group.id в нем с одинаковым значением для всех трех потребителей в этой группе потребителей.

Ниже мои детали конструкции:

  • Мои KafkaConsumerA будет иметь три потребителей в потребительской группе, и каждый потребитель будет работать на topicA.
  • My KafkaConsumerB (аналогично KafkaConsumerA) будет иметь двух потребителей в другой группе потребителей, и каждый из этих потребителей будет работать на topicB.

И эти два потребителя KafkaConsumerA и KafkaConsumerB будут работать на одной коробке с другой группой потребителей независимо друг от друга.

ответ

3

Kafka пытается зарегистрировать MBeans для мониторинга приложений и использует client.id для этого. Как вы сказали, у вас есть свойства, введенные в ваш абстрактный класс, и вводите для каждого потребителя те же client.id и group.id в группе A. Однако у вас разные клиенты, поэтому вы должны дать им свой собственный client.id, но сохраните то же самое group.id. Это позволит зарегистрировать разных клиентов/потребителей в одной и той же группе потребителей и заставить их работать вместе, но не столкнуться с регистрацией MBeans.

+0

Я вижу. Спасибо за информацию. Я попробую. Также вы думаете, что мой дизайн и логика правильны, что я пытаюсь сделать. Я упомянул об этом в конце моего вопроса. Или вы видите какие-либо улучшения в дизайне, которые можно сделать? – user1950349

+0

Это совершенно разумная настройка, но я, конечно, не знаю ваш домен. –

+1

Моя идея работает с несколькими потребителями кафки на одной JVM. И каждый потребитель кафки будет иметь несколько потоков. Например: «KafkaConsumerA» будет иметь трех потребителей в группе потребителей, работающих над «topicA». 'KafkaConsumerB' будет иметь 2 потребителя в другой группе потребителей, работающей над' topicB'. И оба этих 'KafkaConsumerA' и' KafkaConsumerB' будут работать на одной JVM. – user1950349

Смежные вопросы