2014-10-22 3 views
1

У меня есть распределенная карта, и я хочу найти самый низкий или самый высокий ключ (объект, реализующий сравнимый). Каков наиболее эффективный способ получить эти ключи? Я имею в виду, что что-то вроде каждого узла обеспечивает его самый низкий ключ, и в конце самый низкий ключ является самым низким из каждого узла.Найти самый низкий/самый высокий ключ для распределенной карты

Так что я думаю:

MyObj max = Collections.max(map.keySet()); 

является не самым эффективным способом. И если я хочу использовать

new DistributedTask<>(new Max(input), key); 

Мне нужно будет теперь ключ и, следовательно, получить все ключи по проводам. Я думаю, что в этом случае я мог бы делать Collections.max (map.keySet()); также.

Хм ... любые идеи?

+0

map.keySet крайне неэффективен и опасен в использовании. Он будет всасывать весь набор карт в память одного JVM. Из-за этого не происходит никаких дозированных операций, поэтому вы можете легко получить OOME из-за этого. – pveentjer

ответ

0

Это решение для уменьшения карты, похоже, имеет много накладных расходов, но это лучший способ получить работу. Любые лучшие идеи по-прежнему приветствуются.

public static void main(String[] args) throws ExecutionException, InterruptedException { 
    IMap<String, Integer> map = instance.getMap("test"); 
    JobTracker jobTracker = instance.getJobTracker("default"); 
    KeyValueSource<String, Integer> source = KeyValueSource.fromMap(map); 
    Job<String, Integer> job = jobTracker.newJob(source); 

    JobCompletableFuture<Map<String, String>> future = job 
      .mapper(new MaxMapper()) 
      .reducer(new MaxReducerFactory()) 
      .submit(); 

    System.out.println("mr max: " + future.get()); 
} 


public static class MaxMapper implements Mapper<String, Integer, String, String> { 
    private volatile String max = null; 

    @Override 
    public void map(String s, Integer integer, Context<String, String> ctx) { 
     if (max == null || s.compareTo(max)>0) { 
      max = s; 
      ctx.emit("max", max); 
     } 
    } 
} 

public static class MaxReducerFactory implements ReducerFactory<String,String,String> { 

    @Override 
    public Reducer<String, String> newReducer(String s) { 
     return new MaxReducer(); 
    } 

    private class MaxReducer extends Reducer<String, String> { 
     private volatile String max = null; 

     @Override 
     public void reduce(String s) { 
      if (max == null || s.compareTo(max)>0) max = s; 
     } 

     @Override 
     public String finalizeReduce() { 
      return max; // == null ? "" : max; 
     } 
    } 
} 
+0

Вы также должны добавить Combiner, чтобы значительно снизить трафик, поскольку для каждого фрагмента данных, испускаемых Combiners, вы просто должны указывать текущие значения min и max. Я приведу полный пример ниже. – noctarius

0

Вы можете использовать EntryProcessor.executeOnEntries - с состоянием EntryProcessor - и затем пусть он сделает всю работу за вас; иметь каждую ключевую карту для отправителя MIN и MAX перечисление, если они являются min и max.

Если у вас есть представление о границах, вы можете прикрепить фильтр Predicate, а также ускорить его таким образом.

+0

«Hazelcast отправляет процессор записи каждому члену кластера, и эти члены применяют его для записи в карту. Таким образом, если вы добавляете больше членов, ваша обработка будет выполнена быстрее». Вы уверены, что тот же самый объект передан? В противном случае я просто получу самый низкий ключ локального узла. – KIC

+0

Я не могу легко проверить это. Однако я бы попробовал, и если вы получите несколько результатов, просто вычислите min и max результатов, возвращаемых из локальных кластеров. Он должен быть быстрее, чем 'Collections.max', так как вам не нужно тянуть ВСЕ элементы. – durron597

+0

Хм, похоже, это не работает очень хорошо, и чтобы получить все ключи от узлов, мне понадобится очередь или другая распределенная карта. Я думаю, что это немного накладные расходы, но я буду использовать Mapper и Reducer. – KIC

0

Mapper:

import com.hazelcast.mapreduce.Context; 
import com.hazelcast.mapreduce.Mapper; 
import stock.Stock; 

public class MinMaxMapper implements Mapper<String, Stock, String, Double> { 

    static final String MIN = "min"; 
    static final String MAX = "max"; 

    @Override 
    public void map(String key, Stock value, Context<String, Double> context) { 
     context.emit(MIN, value.getPrice()); 
     context.emit(MAX, value.getPrice()); 
    } 
} 

комбинатор:

import com.hazelcast.mapreduce.Combiner; 
import com.hazelcast.mapreduce.CombinerFactory; 

public class MinMaxCombinerFactory implements CombinerFactory<String, Double, Double> { 

    @Override 
    public Combiner<Double, Double> newCombiner(String key) { 
     return new MinMaxCombiner(MinMaxMapper.MAX.equals(key) ? true : false); 
    } 

    private static class MinMaxCombiner extends Combiner<Double, Double> { 

     private final boolean maxCombiner; 

     private double value; 

     private MinMaxCombiner(boolean maxCombiner) { 
      this.maxCombiner = maxCombiner; 
      this.value = maxCombiner ? -Double.MAX_VALUE : Double.MAX_VALUE; 
     } 

     @Override 
     public void combine(Double value) { 
      if (maxCombiner) { 
       this.value = Math.max(value, this.value); 
      } else { 
       this.value = Math.min(value, this.value); 
      } 
     } 

     @Override 
     public Double finalizeChunk() { 
      return value; 
     } 

     @Override 
     public void reset() { 
      this.value = maxCombiner ? -Double.MAX_VALUE : Double.MAX_VALUE; 
     } 
    } 
} 

Разбавление:

import com.hazelcast.mapreduce.Reducer; 
import com.hazelcast.mapreduce.ReducerFactory; 

public class MinMaxReducerFactory implements ReducerFactory<String, Double, Double> { 

    @Override 
    public Reducer<Double, Double> newReducer(String key) { 
     return new MinMaxReducer(MinMaxMapper.MAX.equals(key) ? true : false); 
    } 

    private static class MinMaxReducer extends Reducer<Double, Double> { 

     private final boolean maxReducer; 

     private volatile double value; 

     private MinMaxReducer(boolean maxReducer) { 
      this.maxReducer = maxReducer; 
      this.value = maxReducer ? -Double.MAX_VALUE : Double.MAX_VALUE; 
     } 

     @Override 
     public void reduce(Double value) { 
      if (maxReducer) { 
       this.value = Math.max(value, this.value); 
      } else { 
       this.value = Math.min(value, this.value); 
      } 
     } 

     @Override 
     public Double finalizeReduce() { 
      return value; 
     } 
    } 
} 

Возвращает два элемента карта с мин и макс:

ICompletableFuture<Map<String, Double>> future = 
     job.mapper(new MinMaxMapper()) 
     .combiner(new MinMaxCombinerFactory()) 
     .reducer(new MinMaxReducerFactory()) 
     .submit(); 

Map<String, Double> result = future.get(); 
+0

это дает мне ключ от максимального значения на карте, мне действительно нужен максимальный ключ.Но, спасибо, я думаю, что могу изменить это на свои нужды. – KIC

0

Почему бы вам не создать упорядоченный индекс? Хотя я не совсем уверен, возможно ли в настоящее время найти максимальное значение с использованием предиката и после его обнаружения, прервите оценку предиката.

Смежные вопросы