Mapper:
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Mapper;
import stock.Stock;
public class MinMaxMapper implements Mapper<String, Stock, String, Double> {
static final String MIN = "min";
static final String MAX = "max";
@Override
public void map(String key, Stock value, Context<String, Double> context) {
context.emit(MIN, value.getPrice());
context.emit(MAX, value.getPrice());
}
}
комбинатор:
import com.hazelcast.mapreduce.Combiner;
import com.hazelcast.mapreduce.CombinerFactory;
public class MinMaxCombinerFactory implements CombinerFactory<String, Double, Double> {
@Override
public Combiner<Double, Double> newCombiner(String key) {
return new MinMaxCombiner(MinMaxMapper.MAX.equals(key) ? true : false);
}
private static class MinMaxCombiner extends Combiner<Double, Double> {
private final boolean maxCombiner;
private double value;
private MinMaxCombiner(boolean maxCombiner) {
this.maxCombiner = maxCombiner;
this.value = maxCombiner ? -Double.MAX_VALUE : Double.MAX_VALUE;
}
@Override
public void combine(Double value) {
if (maxCombiner) {
this.value = Math.max(value, this.value);
} else {
this.value = Math.min(value, this.value);
}
}
@Override
public Double finalizeChunk() {
return value;
}
@Override
public void reset() {
this.value = maxCombiner ? -Double.MAX_VALUE : Double.MAX_VALUE;
}
}
}
Разбавление:
import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;
public class MinMaxReducerFactory implements ReducerFactory<String, Double, Double> {
@Override
public Reducer<Double, Double> newReducer(String key) {
return new MinMaxReducer(MinMaxMapper.MAX.equals(key) ? true : false);
}
private static class MinMaxReducer extends Reducer<Double, Double> {
private final boolean maxReducer;
private volatile double value;
private MinMaxReducer(boolean maxReducer) {
this.maxReducer = maxReducer;
this.value = maxReducer ? -Double.MAX_VALUE : Double.MAX_VALUE;
}
@Override
public void reduce(Double value) {
if (maxReducer) {
this.value = Math.max(value, this.value);
} else {
this.value = Math.min(value, this.value);
}
}
@Override
public Double finalizeReduce() {
return value;
}
}
}
Возвращает два элемента карта с мин и макс:
ICompletableFuture<Map<String, Double>> future =
job.mapper(new MinMaxMapper())
.combiner(new MinMaxCombinerFactory())
.reducer(new MinMaxReducerFactory())
.submit();
Map<String, Double> result = future.get();
map.keySet крайне неэффективен и опасен в использовании. Он будет всасывать весь набор карт в память одного JVM. Из-за этого не происходит никаких дозированных операций, поэтому вы можете легко получить OOME из-за этого. – pveentjer