2016-03-16 1 views
3

Я попытался перенести некоторые простые задачи до версии Flink 1.0.0, но она не со следующим исключением:Apache Flink 1.0.0. Проблемы, связанные с временной миграции событий

java.lang.RuntimeException: Запись имеет Long.MIN_VALUE метку времени (= нет отметка времени). Является ли характеристика времени для «ProcessingTime», или вы забыли назвать «DataStream.assignTimestampsAndWatermarks (...)»?

Код состоит из двух разделенных задач, связанных с темой Kafka, где одна задача - простой генератор сообщений, а другая задача - простой потребитель сообщений, который использует timeWindowAll для вычисления скорости поступления сообщений.

Опять же, аналогичный код работал с версией 0.10.2 без каких-либо проблем, но теперь похоже, что система wronly интерпретирует некоторые временные метки событий, такие как Long.MIN_VALUE, что приводит к сбою задачи.

Вопрос в том, что я сделал что-то неправильно или это ошибка, которая будет исправлена ​​в будущих выпусках?

Основная задача:

public class Test1_0_0 { 
    // Max Time lag between events time to current System time 
    static final Time maxTimeLag = Time.of(3, TimeUnit.SECONDS); 

    public static void main(String[] args) throws Exception { 
     // set up the execution environment 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment 
       .getExecutionEnvironment(); 
     // Setting Event Time usage 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     env.setBufferTimeout(1); 
     // Properties initialization 
     Properties properties = new Properties(); 

     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "test"); 

     // Overwrites the default properties by one provided by command line 
     ParameterTool parameterTool = ParameterTool.fromArgs(args); 
     for(Map.Entry<String, String> e: parameterTool.toMap().entrySet()) { 
      properties.setProperty(e.getKey(),e.getValue()); 
     } 
     //properties.setProperty("auto.offset.reset", "smallest"); 
     System.out.println("Properties: " + properties); 
     DataStream<Message> stream = env 
     .addSource(new FlinkKafkaConsumer09<Message>("test", new MessageSDSchema(), properties)).filter(message -> message != null); 
     // The call to rebalance() causes data to be re-partitioned so that all machines receive messages 
     // (for example, when the number of Kafka partitions is fewer than the number of Flink parallel instances). 
     stream.rebalance() 
     .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag)); 
     // Counts messages 
     stream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() { 
      @Override 
      public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception { 
       Integer count = 0; 
       if (values.iterator().hasNext()) { 
        for (Message value : values) { 
         count++; 
        } 
        collector.collect("Arrived last minute: " + count); 
       } 
      } 
     }).print(); 
     // execute program 
     env.execute("Messages Counting"); 
    } 
} 

Отметка экстрактор:

public class MessageTimestampExtractor implements AssignerWithPeriodicWatermarks<Message>, Serializable { 

    private static final long serialVersionUID = 7526472295622776147L; 
    // Maximum lag between the current processing time and the timestamp of an event 
    long maxTimeLag = 0L; 
    long currentWatermarkTimestamp = 0L; 

    public MessageTimestampExtractor() { 
    } 

    public MessageTimestampExtractor(Time maxTimeLag) { 
     this.maxTimeLag = maxTimeLag.toMilliseconds(); 
    } 


    /** 
    * Assigns a timestamp to an element, in milliseconds since the Epoch. 
    * 
    * <p>The method is passed the previously assigned timestamp of the element. 
    * That previous timestamp may have been assigned from a previous assigner, 
    * by ingestion time. If the element did not carry a timestamp before, this value is 
    * {@code Long.MIN_VALUE}. 
    * 
    * @param message The element that the timestamp is wil be assigned to. 
    * @param previousElementTimestamp The previous internal timestamp of the element, 
    *         or a negative value, if no timestamp has been assigned, yet. 
    * @return The new timestamp. 
    */ 
    @Override 
    public long extractTimestamp(Message message, long previousElementTimestamp) { 
     long timestamp = message.getTimestamp(); 
     currentWatermarkTimestamp = Math.max(timestamp, currentWatermarkTimestamp); 
     return timestamp; 
    } 


    /** 
    * Returns the current watermark. This method is periodically called by the 
    * system to retrieve the current watermark. The method may return null to 
    * indicate that no new Watermark is available. 
    * 
    * <p>The returned watermark will be emitted only if it is non-null and larger 
    * than the previously emitted watermark. If the current watermark is still 
    * identical to the previous one, no progress in event time has happened since 
    * the previous call to this method. 
    * 
    * <p>If this method returns a value that is smaller than the previously returned watermark, 
    * then the implementation does not properly handle the event stream timestamps. 
    * In that case, the returned watermark will not be emitted (to preserve the contract of 
    * ascending watermarks), and the violation will be logged and registered in the metrics. 
    * 
    * <p>The interval in which this method is called and Watermarks are generated 
    * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. 
    * 
    * @see org.apache.flink.streaming.api.watermark.Watermark 
    * @see ExecutionConfig#getAutoWatermarkInterval() 
    */ 
    @Override 
    public Watermark getCurrentWatermark() { 
     if(currentWatermarkTimestamp <= 0) { 
      return new Watermark(Long.MIN_VALUE); 
     } 
     return new Watermark(currentWatermarkTimestamp - maxTimeLag); 
    } 

    public long getMaxTimeLag() { 
     return maxTimeLag; 
    } 

    public void setMaxTimeLag(long maxTimeLag) { 
     this.maxTimeLag = maxTimeLag; 
    } 
} 

ответ

2

Проблема заключается в том, что вызов assignTimestampsAndWatermarks возвращает новый DataStream, который использует метку времени экстрактор. Таким образом, вы должны использовать возвращаемый DataStream для выполнения последующих операций над ним.

DataStream<Message> timestampStream = stream.rebalance() 
     .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag)); 
// Counts Strings 
timestampStream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() { 
    @Override 
    public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception { 
     Integer count = 0; 
     if (values.iterator().hasNext()) { 
      for (Message value : values) { 
       count++; 
      } 
      collector.collect("Arrived last minute: " + count); 
     } 
    } 
}).print(); 
+0

Спасибо, моя вина. Вероятно, это изменилось во время миграции! –

Смежные вопросы