2015-02-23 4 views
0

Я пытаюсь использовать Storm Apache для обработки потоков кода GeoHash. Я использую this library и Apache Storm 0.9.3. Детали geohash для python можно найти по адресу enter link description here.Проблема с синхронизацией при использовании Apache Storm

В настоящее время я столкнулся с проблемой синхронизации в методе выполнения одного класса BOLT. Я попытался использовать один жирный шрифт, который дает мне правильный результат. Но в тот момент, когда я перехожу от одной болтовой нити к двум или более. Выход забит.

фрагмент кода для одного из БОЛТ (только это возникают проблемы) является:

public static int PRECISION=6; 
private OutputCollector collector; 
BufferedReader br; 
String lastGeoHash="NONE"; 
HashMap<String,Integer> map; 
HashMap<String,String[]> zcd; 
TreeMap<Integer,String> counts=new TreeMap<Integer,String>(); 
public void prepare(Map conf, TopologyContext context, OutputCollector collector) 
{ 
    String line=""; 
    this.collector = collector; 
    map=new HashMap<String,Integer>(); 
    zcd=new HashMap<String,String[]>(); 
    try { 
     br = new BufferedReader(new FileReader("/tmp/zip_code_database.csv")); 
     int i=0; 
     while ((line = br.readLine()) != null) { 
      if(i==0){ 
       String columns[]=line.split(","); 
       for(int j=0;j<columns.length;j++){ 
        map.put(columns[j],j); 
       } 
      }else{ 
       String []split=line.split(","); 
       zcd.put(split[map.get("\"zip\"")],new String[]{split[map.get("\"state\"")],split[map.get("\"primary_city\"")]}); 
      } 
      i++; 
     } 
     br.close(); 
    // System.out.println(zcd); 
    } catch (FileNotFoundException e) { 
     e.printStackTrace(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
    System.out.println("Initialize"); 
    initializeTreeMapAsPerOurRequirement(counts); 
} 

public void execute(Tuple tuple) 
{ 
    String completeFile = tuple.getStringByField("string");//So, this data is generated by Spout and it contains the complete shape file where each line is separated by a new line character i.e. "\n" 
    String lines[]=completeFile.split("\t"); 
    String geohash=lines[0]; 
    int count=Integer.parseInt(lines[1]); 
    String zip=lines[2]; 
    String best=""; 
    String city=""; 
    String state=""; 

    if(!(geohash.equals(lastGeoHash)) && !(lastGeoHash.equals("NONE"))){ 
     //if(counts.size()!=0){ 
      //System.out.println(counts.firstKey()); 
       best=counts.get(counts.lastKey()); 
       //System.out.println(geohash); 
       if(zcd.containsKey("\""+best+"\"")){ 
        city = zcd.get("\""+best+"\"")[0]; 
        state = zcd.get("\""+best+"\"")[1]; 
        System.out.println(lastGeoHash+","+best+","+state+","+city+","+"US"); 
       }else if(!best.equals("NONE")){ 
        System.out.println(lastGeoHash); 
        city="MISSING"; 
        state="MISSING"; 
       } 
     //  initializeTreeMapAsPerOurRequirement(counts); 
      //}else{ 
       //System.out.println("else"+geohash); 
      //} 

     //} 
    } 
    lastGeoHash=geohash; 
    counts.put(count, zip); 

    collector.ack(tuple); 
} 

private void initializeTreeMapAsPerOurRequirement(TreeMap<Integer,String> counts){ 
    counts.clear(); 
    counts.put(-1,"NONE"); 
} 

public void declareOutputFields(OutputFieldsDeclarer declarer) 
{ 
    System.out.println("here"); 
    declarer.declare(new Fields("number")); 
} 

код Топология:

public static void main(String[] args) 
{ 
    TopologyBuilder builder = new TopologyBuilder(); 

    builder.setSpout("spout", new SendWholeFileDataSpout(),2); 
    builder.setBolt("map", new GeoHashBolt(),2).shuffleGrouping("spout"); 
    builder.setBolt("reduce",new GeoHashReduceBolt(),2).fieldsGrouping("map", new Fields("value")); 

    Config conf = new Config(); 

    LocalCluster cluster = new LocalCluster(); 
    cluster.submitTopology("test", conf, builder.createTopology()); 
    Utils.sleep(10000); 
    cluster.killTopology("test"); 
    cluster.shutdown(); 
} 

Может кто-то посмотреть в код и направлять мне немного ,

+0

что вы установили для 'parallelism_hint' за ваш болт? Также покажите свой код топологии. – Shams

+0

@shizan добавил код топологии. Я не добавил количество задач для каждого исполнителя, учитывая, что это не принесет пользы для моего приложения. –

ответ

0

Вы установили parallelism_hint на 2 для вашего желоба и обоих ваших болтов. Это означает, что на каждого компонента будут выполняться 2 исполнителя, что может испортить ваш результат.
Установив parallelism_hint в 1, вы можете достичь желаемого результата.

Смежные вопросы