1

Я написал программу искры для приема данных от textSocketStream, и я вычисляю среднее значение температуры. Когда я прекращаю отправку данных в свой Spark-кластер после ~1 min, среднее значение не должно меняться на время окна, то есть 1h, поэтому есть около 59 min, где ничего не должно меняться!Spark Streaming: разные средние значения, возвращаемые PairDStream.print

Теперь к проблеме я нашел: для меня это правильное количество данных: 100 записей в оконном DStream, но рассчитанная сумма значений (а также расчетное среднее значение путем принятия avg = sum/count) колеблются между несколькими разные средние значения.

Здесь выход консоли фрагмент (после того, как перестали посылать данные для windowedTempJoinPairDStream.print() (сумма & счета) и windowedTempAvg.print() (в среднем), каждый, как PairDStream<deviceId, [value]>:

------------------------------------------- 
Time: 1472801338000 ms 
------------------------------------------- 
(1-2-a-b-c,(49.159008,100)) 

------------------------------------------- 
Time: 1472801338000 ms 
------------------------------------------- 
(1-2-a-b-c,0.49159008) 

------------------------------------------- 
Time: 1472801339000 ms 
------------------------------------------- 
(1-2-a-b-c,(49.159016,100)) 

------------------------------------------- 
Time: 1472801339000 ms 
------------------------------------------- 
(1-2-a-b-c,0.49159014) 

------------------------------------------- 
Time: 1472801340000 ms 
------------------------------------------- 
(1-2-a-b-c,(49.159008,100)) 

------------------------------------------- 
Time: 1472801340000 ms 
------------------------------------------- 
(1-2-a-b-c,0.49159008) 

------------------------------------------- 
Time: 1472801341000 ms 
------------------------------------------- 
(1-2-a-b-c,(49.159008,100)) 

------------------------------------------- 
Time: 1472801341000 ms 
------------------------------------------- 
(1-2-a-b-c,0.49159008) 

------------------------------------------- 
Time: 1472801342000 ms 
------------------------------------------- 
(1-2-a-b-c,(49.159008,100)) 

------------------------------------------- 
Time: 1472801342000 ms 
------------------------------------------- 
(1-2-a-b-c,0.49159008) 

------------------------------------------- 
Time: 1472801343000 ms 
------------------------------------------- 
(1-2-a-b-c,(49.159008,100)) 

------------------------------------------- 
Time: 1472801343000 ms 
------------------------------------------- 
(1-2-a-b-c,0.49159008) 

------------------------------------------- 
Time: 1472801344000 ms 
------------------------------------------- 
(1-2-a-b-c,(49.15901,100)) 

------------------------------------------- 
Time: 1472801344000 ms 
------------------------------------------- 
(1-2-a-b-c,0.4915901) 

Здесь различные средние значения сверху, короче:

(1-2-a-b-c,0.49159008) 
(1-2-a-b-c,0.49159014) 
(1-2-a-b-c,0.49159008) 
(1-2-a-b-c,0.49159008) 
(1-2-a-b-c,0.49159008) 
(1-2-a-b-c,0.49159008) 
(1-2-a-b-c,0.4915901) 

для меня, это, кажется, округления проблемы, так как мои значения температуры имеют типа Float. Если это может быть возможным, как решить эту проблему?

При значениях температуры типа Integer все работало нормально, не флуктуирующей ...

Если полезно, здесь соответствующий фрагмент кода программы:

 JavaReceiverInputDStream<String> ingoingStream = streamingContext.socketTextStream(serverIp, 11833); 

     // 2. Map the DStream<String> to a DStream<SensorData> by deserializing JSON objects 
     JavaDStream<SensorData> sensorDStream = ingoingStream.map(new Function<String, SensorData>() { 
      public SensorData call(String json) throws Exception { 
       ObjectMapper om = new ObjectMapper(); 
       return (SensorData)om.readValue(json, SensorData.class); 
      } 
     }).cache(); 


     /************************************************ MOVIING AVERAGE OF TEMPERATURE *******************************************************************/ 

     // Collect the data to a window of time (this is the time period for average calculation, older data is removed from stream!) 
     JavaDStream<SensorData> windowMovingAverageSensorDataTemp = sensorDStream.window(windowSizeMovingAverageTemperature); 

     windowMovingAverageSensorDataTemp.print(); 

     // Map this SensorData stream to a new PairDStream, with key = deviceId (so we can make calculations by grouping by the id) 
     // .cache the Stream, because we re-use it more than 1 time! 
     JavaPairDStream<String, SensorData> windowMovingAverageSensorDataTempPairDStream = windowMovingAverageSensorDataTemp.mapToPair(new PairFunction<SensorData, String, SensorData>() { 
      public Tuple2<String, SensorData> call(SensorData data) throws Exception { 
       return new Tuple2<String, SensorData>(data.getIdSensor(), data); 
      } 
     }).cache(); 

     // a) Map the PairDStream from above to a new PairDStream of form <deviceID, temperature> 
     // b) Sum up the values to the total sum, grouped also by key (= device id) 
     // => combined these two transactions, could also be called separately (like above) 
     JavaPairDStream<String, Float> windowMovingAverageSensorDataTempPairDStreamSum = windowMovingAverageSensorDataTempPairDStream.mapToPair(new PairFunction<Tuple2<String,SensorData>, String, Float>() { 
      public Tuple2<String, Float> call(Tuple2<String, SensorData> sensorDataPair) throws Exception { 
       String key = sensorDataPair._1(); 
       Float value = sensorDataPair._2().getValTemp(); 
       return new Tuple2<String, Float>(key, value); 
      } 
     }).reduceByKey(new Function2<Float, Float, Float>() { 
      public Float call(Float sumA, Float sumB) throws Exception { 
       return sumA + sumB; 
      } 
     }); 

     // a) Map the PairDStream from above to a new PairDStream of form <deviceID, 1L> to prepare the counting (1 = 1 entry) 
     // b) Sum up the values to the total count of entries, grouped by key (= device id) 
     // => also combined both calls 
     JavaPairDStream<String, Long> windowMovingAverageSensorDataTempPairDStreamCount = windowMovingAverageSensorDataTempPairDStream.mapToPair(new PairFunction<Tuple2<String,SensorData>, String, Long>() { 
      public Tuple2<String, Long> call(Tuple2<String, SensorData> sensorDataPair) throws Exception { 
       String key = sensorDataPair._1(); 
       Long value = 1L; 
       return new Tuple2<String, Long>(key, value); 
      } 
     }).reduceByKey(new Function2<Long, Long, Long>() { 
      public Long call(Long countA, Long countB) throws Exception { 
       return countA + countB; 
      } 
     }); 

     // Make a join of the sum and count Streams, so this puts together data with same keys (device id) 
     // This results in a new PairDStream of <deviceID, <sumOfTemp, countOfEntries>> 
     JavaPairDStream<String, Tuple2<Float, Long>> windowedTempJoinPairDStream = windowMovingAverageSensorDataTempPairDStreamSum.join(windowMovingAverageSensorDataTempPairDStreamCount).cache(); 

     // Calculate the average temperature by avg = sumOfTemp/countOfEntries, do this for each key (device id) 
     JavaPairDStream<String, Float> windowedTempAvg = windowedTempJoinPairDStream.mapToPair(new PairFunction<Tuple2<String,Tuple2<Float,Long>>, String, Float>() { 
      public Tuple2<String, Float> call(Tuple2<String, Tuple2<Float, Long>> joinedData) throws Exception { 
       String key = joinedData._1(); 
       float tempSum = joinedData._2()._1(); 
       long count = joinedData._2()._2(); 

       float avg = tempSum/(float)count; 
       return new Tuple2<String, Float>(key, avg); 
      } 
     }); 

     // print the joined PairDStream from above to check sum & count visually 
     windowedTempJoinPairDStream.print(); 

     // print the final, calculated average values for each device id in form (deviceId, avgTemperature) 
     windowedTempAvg.print(); 

     // ========================================================= START THE STREAM ============================================================ 

     // Start streaming & listen until stream is closed 
     streamingContext.start(); 
     streamingContext.awaitTermination(); 

EDIT: Спарк App с помощью StatCounter для средних расчетов:

Только что изменил мой код для работы с StatCounter для th е средний расчет, но по-прежнему получать разные средние значения:

------------------------------------------- 
Time: 1473077627000 ms 
------------------------------------------- 
(1-2-a-b-c,0.4779797872435302) 

------------------------------------------- 
Time: 1473077628000 ms 
------------------------------------------- 
(1-2-a-b-c,0.4779797872435303) 

------------------------------------------- 
Time: 1473077629000 ms 
------------------------------------------- 
(1-2-a-b-c,0.4779797872435301) 

------------------------------------------- 
Time: 1473077630000 ms 
------------------------------------------- 
(1-2-a-b-c,0.4779797872435302) 

------------------------------------------- 
Time: 1473077631000 ms 
------------------------------------------- 
(1-2-a-b-c,0.4779797872435301) 

------------------------------------------- 
Time: 1473077632000 ms 
------------------------------------------- 
(1-2-a-b-c,0.47797978724353024) 

------------------------------------------- 
Time: 1473077633000 ms 
------------------------------------------- 
(1-2-a-b-c,0.47797978724353013) 

Здесь новый фрагмент кода:

/************************************************ MOVIING AVERAGE OF TEMPERATURE *******************************************************************/ 

JavaDStream<SensorData> windowMovingAverageSensorDataTemp = sensorDStream.window(windowSizeMovingAverageTemperature); 

JavaPairDStream<String, SensorData> windowMovingAverageSensorDataTempPairDStream = windowMovingAverageSensorDataTemp.mapToPair(new PairFunction<SensorData, String, SensorData>() { 
    public Tuple2<String, SensorData> call(SensorData data) throws Exception { 
     return new Tuple2<String, SensorData>(data.getIdSensor(), data); 
    } 
}).cache(); 

JavaPairDStream<String, StatCounter> preparedAvgPairStream = windowMovingAverageSensorDataTempPairDStream.combineByKey(new Function<SensorData, StatCounter>() { 
    public StatCounter call(SensorData data) throws Exception { 
     return new StatCounter().merge(data.getValTemp()); 
    } 
}, new Function2<StatCounter, SensorData, StatCounter>() { 
    public StatCounter call(StatCounter sc, SensorData sensorData) throws Exception { 
     return sc.merge(sensorData.getValTemp()); 
    } 
}, new Function2<StatCounter, StatCounter, StatCounter>() { 
    public StatCounter call(StatCounter sc1, StatCounter sc2) throws Exception { 
     return sc1.merge(sc2); 
    } 
}, new HashPartitioner(60)); 

JavaPairDStream<String, Double> avgPairStream = preparedAvgPairStream.mapToPair(new PairFunction<Tuple2<String,StatCounter>, String, Double>() { 
    public Tuple2<String, Double> call(Tuple2<String, StatCounter> statCounterByKey) throws Exception { 
     String key = statCounterByKey._1(); 
     double value = statCounterByKey._2().mean(); 
     return new Tuple2<String, Double> (key, value); 
    } 
}); 

avgPairStream.print(); 

ответ

1

По крайней мере, на первый взгляд, это не особенно странно. Как вы уже сказали, это, скорее всего, связано с ошибками округления. Поскольку арифметика FP не является ни associative, ни коммутативными, а искажения Spark не детерминированы, мы можем ожидать, что результаты будут колебаться от запуска до запуска.

Сколько можно делать сильно зависит от ограничений:

  • Для начала вычислительного среднего непосредственно не является численно стабильным. Лучше использовать o.a.s.util.StatCounter, который реализует вариант the Online algorithm, который имеет намного лучшие числовые свойства.
  • Если вы можете позволить себе использовать произвольные номера точности, такие как BigDecimal.
  • Наконец, принудительный порядок суммирования с небольшим количеством перераспределения и магии вторичного сорта может обеспечить согласованные (хотя и не необходимые точные) результаты.
+0

Спасибо, тогда моя догадка была права ...Возможно ли, что «StatCounter» капирует статистику во все времена? Поскольку я просто хочу рассчитать среднее значение для оконных данных, это будет неправильным способом ... Как насчет округления моих значений, например. две позиции после десятичной точки? –

+0

Счетчик Stat может использоваться в любом контексте, он не является специфичным для потоковой передачи или RDD. – zero323

+0

Я изменил свой код, чтобы использовать StatCounter, см. Редактирование моего вопроса выше. Но я все еще получаю разные значения. –