Я написал программу искры для приема данных от textSocketStream
, и я вычисляю среднее значение температуры. Когда я прекращаю отправку данных в свой Spark-кластер после ~1 min
, среднее значение не должно меняться на время окна, то есть 1h
, поэтому есть около 59 min
, где ничего не должно меняться!Spark Streaming: разные средние значения, возвращаемые PairDStream.print
Теперь к проблеме я нашел: для меня это правильное количество данных: 100 записей в оконном DStream
, но рассчитанная сумма значений (а также расчетное среднее значение путем принятия avg = sum/count
) колеблются между несколькими разные средние значения.
Здесь выход консоли фрагмент (после того, как перестали посылать данные для windowedTempJoinPairDStream.print()
(сумма & счета) и windowedTempAvg.print()
(в среднем), каждый, как PairDStream<deviceId, [value]>
:
-------------------------------------------
Time: 1472801338000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))
-------------------------------------------
Time: 1472801338000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)
-------------------------------------------
Time: 1472801339000 ms
-------------------------------------------
(1-2-a-b-c,(49.159016,100))
-------------------------------------------
Time: 1472801339000 ms
-------------------------------------------
(1-2-a-b-c,0.49159014)
-------------------------------------------
Time: 1472801340000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))
-------------------------------------------
Time: 1472801340000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)
-------------------------------------------
Time: 1472801341000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))
-------------------------------------------
Time: 1472801341000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)
-------------------------------------------
Time: 1472801342000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))
-------------------------------------------
Time: 1472801342000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)
-------------------------------------------
Time: 1472801343000 ms
-------------------------------------------
(1-2-a-b-c,(49.159008,100))
-------------------------------------------
Time: 1472801343000 ms
-------------------------------------------
(1-2-a-b-c,0.49159008)
-------------------------------------------
Time: 1472801344000 ms
-------------------------------------------
(1-2-a-b-c,(49.15901,100))
-------------------------------------------
Time: 1472801344000 ms
-------------------------------------------
(1-2-a-b-c,0.4915901)
Здесь различные средние значения сверху, короче:
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159014)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.49159008)
(1-2-a-b-c,0.4915901)
для меня, это, кажется, округления проблемы, так как мои значения температуры имеют типа Float
. Если это может быть возможным, как решить эту проблему?
При значениях температуры типа Integer
все работало нормально, не флуктуирующей ...
Если полезно, здесь соответствующий фрагмент кода программы:
JavaReceiverInputDStream<String> ingoingStream = streamingContext.socketTextStream(serverIp, 11833);
// 2. Map the DStream<String> to a DStream<SensorData> by deserializing JSON objects
JavaDStream<SensorData> sensorDStream = ingoingStream.map(new Function<String, SensorData>() {
public SensorData call(String json) throws Exception {
ObjectMapper om = new ObjectMapper();
return (SensorData)om.readValue(json, SensorData.class);
}
}).cache();
/************************************************ MOVIING AVERAGE OF TEMPERATURE *******************************************************************/
// Collect the data to a window of time (this is the time period for average calculation, older data is removed from stream!)
JavaDStream<SensorData> windowMovingAverageSensorDataTemp = sensorDStream.window(windowSizeMovingAverageTemperature);
windowMovingAverageSensorDataTemp.print();
// Map this SensorData stream to a new PairDStream, with key = deviceId (so we can make calculations by grouping by the id)
// .cache the Stream, because we re-use it more than 1 time!
JavaPairDStream<String, SensorData> windowMovingAverageSensorDataTempPairDStream = windowMovingAverageSensorDataTemp.mapToPair(new PairFunction<SensorData, String, SensorData>() {
public Tuple2<String, SensorData> call(SensorData data) throws Exception {
return new Tuple2<String, SensorData>(data.getIdSensor(), data);
}
}).cache();
// a) Map the PairDStream from above to a new PairDStream of form <deviceID, temperature>
// b) Sum up the values to the total sum, grouped also by key (= device id)
// => combined these two transactions, could also be called separately (like above)
JavaPairDStream<String, Float> windowMovingAverageSensorDataTempPairDStreamSum = windowMovingAverageSensorDataTempPairDStream.mapToPair(new PairFunction<Tuple2<String,SensorData>, String, Float>() {
public Tuple2<String, Float> call(Tuple2<String, SensorData> sensorDataPair) throws Exception {
String key = sensorDataPair._1();
Float value = sensorDataPair._2().getValTemp();
return new Tuple2<String, Float>(key, value);
}
}).reduceByKey(new Function2<Float, Float, Float>() {
public Float call(Float sumA, Float sumB) throws Exception {
return sumA + sumB;
}
});
// a) Map the PairDStream from above to a new PairDStream of form <deviceID, 1L> to prepare the counting (1 = 1 entry)
// b) Sum up the values to the total count of entries, grouped by key (= device id)
// => also combined both calls
JavaPairDStream<String, Long> windowMovingAverageSensorDataTempPairDStreamCount = windowMovingAverageSensorDataTempPairDStream.mapToPair(new PairFunction<Tuple2<String,SensorData>, String, Long>() {
public Tuple2<String, Long> call(Tuple2<String, SensorData> sensorDataPair) throws Exception {
String key = sensorDataPair._1();
Long value = 1L;
return new Tuple2<String, Long>(key, value);
}
}).reduceByKey(new Function2<Long, Long, Long>() {
public Long call(Long countA, Long countB) throws Exception {
return countA + countB;
}
});
// Make a join of the sum and count Streams, so this puts together data with same keys (device id)
// This results in a new PairDStream of <deviceID, <sumOfTemp, countOfEntries>>
JavaPairDStream<String, Tuple2<Float, Long>> windowedTempJoinPairDStream = windowMovingAverageSensorDataTempPairDStreamSum.join(windowMovingAverageSensorDataTempPairDStreamCount).cache();
// Calculate the average temperature by avg = sumOfTemp/countOfEntries, do this for each key (device id)
JavaPairDStream<String, Float> windowedTempAvg = windowedTempJoinPairDStream.mapToPair(new PairFunction<Tuple2<String,Tuple2<Float,Long>>, String, Float>() {
public Tuple2<String, Float> call(Tuple2<String, Tuple2<Float, Long>> joinedData) throws Exception {
String key = joinedData._1();
float tempSum = joinedData._2()._1();
long count = joinedData._2()._2();
float avg = tempSum/(float)count;
return new Tuple2<String, Float>(key, avg);
}
});
// print the joined PairDStream from above to check sum & count visually
windowedTempJoinPairDStream.print();
// print the final, calculated average values for each device id in form (deviceId, avgTemperature)
windowedTempAvg.print();
// ========================================================= START THE STREAM ============================================================
// Start streaming & listen until stream is closed
streamingContext.start();
streamingContext.awaitTermination();
EDIT: Спарк App с помощью StatCounter
для средних расчетов:
Только что изменил мой код для работы с StatCounter
для th е средний расчет, но по-прежнему получать разные средние значения:
-------------------------------------------
Time: 1473077627000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435302)
-------------------------------------------
Time: 1473077628000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435303)
-------------------------------------------
Time: 1473077629000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435301)
-------------------------------------------
Time: 1473077630000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435302)
-------------------------------------------
Time: 1473077631000 ms
-------------------------------------------
(1-2-a-b-c,0.4779797872435301)
-------------------------------------------
Time: 1473077632000 ms
-------------------------------------------
(1-2-a-b-c,0.47797978724353024)
-------------------------------------------
Time: 1473077633000 ms
-------------------------------------------
(1-2-a-b-c,0.47797978724353013)
Здесь новый фрагмент кода:
/************************************************ MOVIING AVERAGE OF TEMPERATURE *******************************************************************/
JavaDStream<SensorData> windowMovingAverageSensorDataTemp = sensorDStream.window(windowSizeMovingAverageTemperature);
JavaPairDStream<String, SensorData> windowMovingAverageSensorDataTempPairDStream = windowMovingAverageSensorDataTemp.mapToPair(new PairFunction<SensorData, String, SensorData>() {
public Tuple2<String, SensorData> call(SensorData data) throws Exception {
return new Tuple2<String, SensorData>(data.getIdSensor(), data);
}
}).cache();
JavaPairDStream<String, StatCounter> preparedAvgPairStream = windowMovingAverageSensorDataTempPairDStream.combineByKey(new Function<SensorData, StatCounter>() {
public StatCounter call(SensorData data) throws Exception {
return new StatCounter().merge(data.getValTemp());
}
}, new Function2<StatCounter, SensorData, StatCounter>() {
public StatCounter call(StatCounter sc, SensorData sensorData) throws Exception {
return sc.merge(sensorData.getValTemp());
}
}, new Function2<StatCounter, StatCounter, StatCounter>() {
public StatCounter call(StatCounter sc1, StatCounter sc2) throws Exception {
return sc1.merge(sc2);
}
}, new HashPartitioner(60));
JavaPairDStream<String, Double> avgPairStream = preparedAvgPairStream.mapToPair(new PairFunction<Tuple2<String,StatCounter>, String, Double>() {
public Tuple2<String, Double> call(Tuple2<String, StatCounter> statCounterByKey) throws Exception {
String key = statCounterByKey._1();
double value = statCounterByKey._2().mean();
return new Tuple2<String, Double> (key, value);
}
});
avgPairStream.print();
Спасибо, тогда моя догадка была права ...Возможно ли, что «StatCounter» капирует статистику во все времена? Поскольку я просто хочу рассчитать среднее значение для оконных данных, это будет неправильным способом ... Как насчет округления моих значений, например. две позиции после десятичной точки? –
Счетчик Stat может использоваться в любом контексте, он не является специфичным для потоковой передачи или RDD. – zero323
Я изменил свой код, чтобы использовать StatCounter, см. Редактирование моего вопроса выше. Но я все еще получаю разные значения. –