2016-07-25 4 views
0

Я попытался запустить этот код, но он не работает из-за того, что производитель.send() не принимает тип KeyedMessage.производитель.send не принимает тип KeyedMessage

Я попытался импортировать kafka.javaapi.producer.Producer вместо kafka.producer.Producer; но до сих пор не работает

Код:

package sources; 
import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.Properties; 

//import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
import kafka.javaapi.producer.Producer; 
//import kafka.producer.Producer; 


public class ProducerCode { 


private static Producer<Integer, String> producer; 
private static final String topic= "mytopic"; 

public void initialize() { 
    Properties producerProps = new Properties(); 
    producerProps.put("metadata.broker.list", "localhost:9092"); 
    producerProps.put("serializer.class", "kafka.serializer.StringEncoder"); 
    producerProps.put("request.required.acks", "1"); 
    // ProducerConfig producerConfig = new ProducerConfig(producerProps); 
    // have a change here ** 
    producer = new Producer<Integer, String>(new ProducerConfig(producerProps)); 

}

public void publishMesssage() throws Exception{    
    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));    
while (true){ 
    System.out.print("Enter message to send to kafka broker (Press 'Y' to close producer): "); 
    String msg = null; 
    msg = reader.readLine(); // Read message from console 
    //Define topic name and message 
    KeyedMessage<Integer, String> keyedMsg = new KeyedMessage<Integer, String>(topic, msg); 

    producer.send(keyedMsg); 
    // producer.send(keyedMsg); // This publishes message on given topic 

    if("Y".equals(msg)){ break; } 
    System.out.println("--> Message [" + msg + "] sent.Check message on Consumer's program console"); 
} 
return; 
} 




public static void main(String[] args) throws Exception { 

    KafkaProducer kafkaProducer = new KafkaProducer(); 
    // Initialize producer 
    kafkaProducer.initialize();    
    // Publish message 
    kafkaProducer.publishMesssage(); 
    //Close the producer 
    producer.close(); 

} 

} 
+0

Какую версию Kafka вы используете? –

+0

Я использую версию 10 (kafka-0.10.0.0) – Shaimaa

ответ