Я использую Kafka 0.8.0 и пытаюсь выполнить описанный ниже сценарий.Задержка в сообщениях потребителей в Apache Kafka
JCA API (Действует как производитель и посылают данные) -----> Потребительские ------> HBase
Я посылаю каждое сообщение к потребителю, как только я принести данных с использованием JCA Client. Например, как только производитель отправляет сообщение №1, я хочу получить его от пользователя и «положить» в HBase. Но мой потребитель начинает получать сообщения после некоторых случайных n сообщений. Я хочу синхронизировать производителя и потребителя, чтобы они оба начали работать вместе.
Я использовал:
1 брокер
1 сингл тему
1 одного производителя и потребителя высокий уровень
Может кто-нибудь предложить, что мне нужно сделать сделать для достижения того же?
EDITED:
Добавление некоторых соответствующий фрагмент кода.
Consumer.java
public class Consumer extends Thread {
private final ConsumerConnector consumer;
private final String topic;
PrintWriter pw = null;
int t = 0;
StringDecoder kd = new StringDecoder(null);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
Map<String, List<KafkaStream<String, Signal>>> consumerMap;
KafkaStream<String, Signal> stream;
ConsumerIterator<String, Signal> it;
public Consumer(String topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
topicCountMap.put(topic, new Integer(1));
consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
new VerifiableProperties()));
stream = consumerMap.get(topic).get(0);
it = stream.iterator();
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("fetch.size", "1024");
return new ConsumerConfig(props);
}
synchronized public void run() {
while (it.hasNext()) {
t = (it.next().message()).getChannelid();
System.out.println("In Consumer received msg" + t);
}
}
}
producer.java
public class Producer {
public final kafka.javaapi.producer.Producer<String, Signal> producer;
private final String topic;
private final Properties props = new Properties();
public Producer(String topic)
{
props.put("serializer.class", "org.bigdata.kafka.Serializer");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type userdefined Object .
producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props));
this.topic = topic;
}
}
KafkaProperties.java
public interface KafkaProperties {
final static String zkConnect = "127.0.0.1:2181";
final static String groupId = "group1";
final static String topic = "test00";
final static String kafkaServerURL = "localhost";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 100000;
final static int reconnectInterval = 10000;
final static String clientId = "SimpleConsumerDemoClient";
}
Это то, как потребитель ведет себя в течение первых 10 сообщений, которые он не отправляет этому сообщению, полученному потребителем, но с 11-го сообщения он начинает работать правильно.
producer sending msg1
producer sending msg2
producer sending msg3
producer sending msg4
producer sending msg5
producer sending msg6
producer sending msg7
producer sending msg8
producer sending msg9
producer sending msg10
producer sending msg11
producer sending msg12
In Consumer received msg12
producer sending msg13
In Consumer received msg13
producer sending msg14
In Consumer received msg14
producer sending msg15
In Consumer received msg15
producer sending msg16
In Consumer received msg16
producer sending msg17
In Consumer received msg17
producer sending msg18
In Consumer received msg18
producer sending msg19
In Consumer received msg19
producer sending msg20
In Consumer received msg20
producer sending msg21
In Consumer received msg21
Редакция: добавление функции слушателя, где производитель отправляют сообщения потребителя. И я использую по умолчанию производителем конфигурационном не перезаписать его
public synchronized void onValueChanged(final MonitorEvent event_) {
// Get the value from the DBR
try {
final DBR dbr = event_.getDBR();
final String[] val = (String[]) dbr.getValue();
producer1.producer.send(new KeyedMessage<String, Signal>
(KafkaProperties.topic,new Signal(messageNo)));
System.out.println("producer sending msg"+messageNo);
messageNo++;
} catch (Exception ex) {
ex.printStackTrace();
}
}
Можете ли вы показать свой производитель и потребительский код/конфигурацию? Похоже, что некоторые из них работают с партиями (что хорошо, на самом деле). – Dmitry
@Dmitry добавил фрагмент кода. – Ankita
Потребитель, кажется, в порядке (кроме свойства fetch.size = 1K - это означает, что потребитель не может получать большие сообщения, но, вероятно, это не проблема, которую мы ищем). Можете ли вы поделиться кодами методов newProducerConfig() и run() для производителя? – Dmitry