2017-01-12 2 views
0

Exception in thread "main" java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
    at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
    at cmb.SparkStream.kafka.kafkaOffsetTool.getTopicOffsets(kafkaOffsetTool.java:47)
    at cmb.SparkStream.LogClassify.main(LogClassify.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Как получить смещения (offsets) темы и её разделов?

мой код:

public static Map<TopicAndPartition, Long> getTopicOffsets(String zkServers, String topic) { 

    Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>(); 
    for (String zkserver : zkServers.split(",")) { 
    SimpleConsumer simpleConsumer = new SimpleConsumer(zkserver.split(":")[0], 
    Integer.valueOf(zkserver.split(":")[1]), Consts.getKafkaConfigBean().getDuration(), 1024, 
    "consumser"); 
    TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic)); 

    TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest); 

    for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) { 
    for (PartitionMetadata part : metadata.partitionsMetadata()) { 
    Broker leader = part.leader(); 
    if (leader != null) { 
     TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId()); 

     PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(
     kafka.api.OffsetRequest.LatestTime(), 10000); 
     OffsetRequest offsetRequest = new OffsetRequest(
     ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo), 
     kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()); 
     OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest); 
     if (!offsetResponse.hasError()) { 
     long[] offsets = offsetResponse.offsets(topic, part.partitionId()); 
     retVals.put(topicAndPartition, offsets[0]); 
     } 
    } 

    } 

    } 
    simpleConsumer.close(); 
    } 
    return retVals; 
} 

ответ

1

Мне кажется, вы излишне всё усложняете. Используйте org.apache.kafka.clients.consumer.KafkaConsumer (здесь — переменная consumer) и сделайте что-то похожее на:

val partitions = consumer.partitionsFor(topic).map[new TopicPartition(topic,it.partition)] 
    consumer.assign(partitions) 
    consumer.seekToEnd(partitions) 
    val offsets = partitions.map[ it -> consumer.position(it)] 
    println(offsets) 

и вы получите результат вроде:

[topicname-8 -> 1917258, topicname-2 -> 1876810, topicname-5 -> 1857012, topicname-4 -> 3844, topicname-7 -> 4043972, topicname-1 -> 1811078, topicname-9 -> 12217819, topicname-3 -> 3844, topicname-6 -> 1430021, topicname-0 -> 2808969]

Смежные вопросы