2017-02-15 5 views
1

У меня возникла ситуация, когда искра может течь и получать сообщения только из одного раздела темы 2-patition kafka.Spark Structured Stream получает сообщения только из одного раздела Kafka

Мои темы: C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test4

Кафка Производитель:

public class KafkaFileProducer { 

// kafka producer 
Producer<String, String> producer; 

public KafkaFileProducer() { 

    // configs 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    //props.put("group.id", "testgroup"); 
    props.put("batch.size", "16384"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("linger.ms", "0"); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("block.on.buffer.full", "true"); 

    // instantiate a producer 
    producer = new KafkaProducer<String, String>(props); 
} 

/** 
* @param filePath 
*/ 
public void sendFile(String filePath) { 
    FileInputStream fis; 
    BufferedReader br = null; 

    try { 
     fis = new FileInputStream(filePath); 

     //Construct BufferedReader from InputStreamReader 
     br = new BufferedReader(new InputStreamReader(fis)); 

     int count = 0; 

     String line = null; 
     while ((line = br.readLine()) != null) { 
      count ++; 
      // dont send the header 
      if (count > 1) { 
       producer.send(new ProducerRecord<String, String>("test4", count + "", line)); 
       Thread.sleep(10); 
      } 
     } 

     System.out.println("Sent " + count + " lines of data"); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    }finally{ 
     try { 
      br.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 

     producer.close(); 
    } 
} 

}

Спарк Структурированные поток:

System.setProperty("hadoop.home.dir", "C:\\bigdata\\winutils"); 

    final SparkSession sparkSession = SparkSession.builder().appName("Spark Data Processing").master("local[2]").getOrCreate(); 

    // create kafka stream to get the lines 
    Dataset<Tuple2<String, String>> stream = sparkSession 
      .readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", "localhost:9092") 
      .option("subscribe", "test4") 
      .option("startingOffsets", "{\"test4\":{\"0\":-1,\"1\":-1}}") 
      .option("failOnDataLoss", "false") 
      .load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as(Encoders.tuple(Encoders.STRING(), Encoders.STRING())); 

    Dataset<String> lines = stream.map((MapFunction<Tuple2<String, String>, String>) (Tuple2<String, String> tuple) -> tuple._2, Encoders.STRING()); 
    Dataset<Row> result = lines.groupBy().count(); 
    // Start running the query that prints the running counts to the console 
    StreamingQuery query = result//.orderBy("callTimeBin") 
      .writeStream() 
      .outputMode("complete") 
      .format("console") 
      .start(); 


    // wait for the query to finish 
    try { 
     query.awaitTermination(); 
    } catch (StreamingQueryException e) { 
     e.printStackTrace(); 
    } 

Когда я бегу производитель отправить 100 строк в файла, запрос возвращает только 51 строку. Я читаю журналы отладки искр и заметил что-то в следующем:

17/02/15 10:52:49 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map(watermark -> 1970-01-01T00:00:00.000Z)) 
17/02/15 10:52:49 DEBUG StreamExecution: Starting Trigger Calculation 
17/02/15 10:52:49 DEBUG KafkaConsumer: Pausing partition test4-1 
17/02/15 10:52:49 DEBUG KafkaConsumer: Pausing partition test4-0 
17/02/15 10:52:49 DEBUG KafkaSource: Partitions assigned to consumer: [test4-1, test4-0]. Seeking to the end. 
17/02/15 10:52:49 DEBUG KafkaConsumer: Seeking to end of partition test4-1 
17/02/15 10:52:49 DEBUG KafkaConsumer: Seeking to end of partition test4-0 
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-1 to latest offset. 
17/02/15 10:52:49 DEBUG Fetcher: **Fetched {timestamp=-1, offset=49} for partition test4-1 
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-1 to earliest offset. 
17/02/15 10:52:49 DEBUG Fetcher: Fetched {timestamp=-1, offset=0} for partition test4-1** 
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-0 to latest offset. 
17/02/15 10:52:49 DEBUG Fetcher: Fetched {timestamp=-1, offset=51} for partition test4-0 
17/02/15 10:52:49 DEBUG KafkaSource: Got latest offsets for partition : Map(test4-1 -> 0, test4-0 -> 51) 
17/02/15 10:52:49 DEBUG KafkaSource: GetOffset: ArrayBuffer((test4-0,51), (test4-1,0)) 
17/02/15 10:52:49 DEBUG StreamExecution: getOffset took 0 ms 
17/02/15 10:52:49 DEBUG StreamExecution: triggerExecution took 0 ms 

Я не знаю, почему test4-1 всегда сбрасывается на ealiest смещение.

Если кто-то знает, как получить все сообщения со всех разделов, я был бы очень признателен. Спасибо,

ответ

4

Существует известная проблема Кафка в 0.10.1 * клиент:. https://issues.apache.org/jira/browse/KAFKA-4547

Прямо сейчас вы можете использовать 0.10.0.1 клиента в качестве обходного пути. Он может разговаривать с кластером Kafka 0.10.1. *.

Для получения более подробной информации см. https://issues.apache.org/jira/browse/SPARK-18779.

+0

Спасибо, сейчас он работает с использованием клиента 0.10.0.1. – taniGroup

+0

Я встретил ту же проблему и решил ее в соответствии с вашим ответом, спасибо. – Mekal