2016-10-10 6 views
0

Я пытаюсь использовать ниже API для Spark streaming с Kafka. Я должен передавать данные, связанные с сериализацией avro с помощью искры, данные находятся в Кафке.Что такое "Класс <R> recordClass" в JavaInputDStream?

static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> 
    JavaInputDStream<R> createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler) 
    :: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver. 

Могу ли я знать Что мне нужно для параметра Class recordClass в API? Я использовал API, как показано ниже, но он дает ошибку компиляции.

Все, что я хочу, - это получить данные о потоках в искровом потоке от кафки.

JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class, 
     StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap, 
     (Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message); 

Exception in thread "main" java.lang.Error: Unresolved compilation problems: The method createDirectStream(JavaStreamingContext, Class, Class, Class, Class, Class, Map, Map, Function,R>) in the type KafkaUtils is not applicable for the arguments (JavaStreamingContext, Class, Class, Class, Class, Class, Map, Map, Function,String>)

ответ

4

Попробуйте это.

JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, byte[].class, 
       StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, fromOffset, 
       (Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message); 

Here статья для Кафки, Avro и Спарк.