Я пытаюсь использовать Storm Apache для обработки потоков кода GeoHash. Я использую this library и Apache Storm 0.9.3. Детали geohash для python можно найти по адресу enter link description here.Проблема с синхронизацией при использовании Apache Storm
В настоящее время я столкнулся с проблемой синхронизации в методе выполнения одного класса BOLT. Я попытался использовать один жирный шрифт, который дает мне правильный результат. Но в тот момент, когда я перехожу от одной болтовой нити к двум или более. Выход забит.
фрагмент кода для одного из БОЛТ (только это возникают проблемы) является:
public static int PRECISION=6;
private OutputCollector collector;
BufferedReader br;
String lastGeoHash="NONE";
HashMap<String,Integer> map;
HashMap<String,String[]> zcd;
TreeMap<Integer,String> counts=new TreeMap<Integer,String>();
public void prepare(Map conf, TopologyContext context, OutputCollector collector)
{
String line="";
this.collector = collector;
map=new HashMap<String,Integer>();
zcd=new HashMap<String,String[]>();
try {
br = new BufferedReader(new FileReader("/tmp/zip_code_database.csv"));
int i=0;
while ((line = br.readLine()) != null) {
if(i==0){
String columns[]=line.split(",");
for(int j=0;j<columns.length;j++){
map.put(columns[j],j);
}
}else{
String []split=line.split(",");
zcd.put(split[map.get("\"zip\"")],new String[]{split[map.get("\"state\"")],split[map.get("\"primary_city\"")]});
}
i++;
}
br.close();
// System.out.println(zcd);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("Initialize");
initializeTreeMapAsPerOurRequirement(counts);
}
public void execute(Tuple tuple)
{
String completeFile = tuple.getStringByField("string");//So, this data is generated by Spout and it contains the complete shape file where each line is separated by a new line character i.e. "\n"
String lines[]=completeFile.split("\t");
String geohash=lines[0];
int count=Integer.parseInt(lines[1]);
String zip=lines[2];
String best="";
String city="";
String state="";
if(!(geohash.equals(lastGeoHash)) && !(lastGeoHash.equals("NONE"))){
//if(counts.size()!=0){
//System.out.println(counts.firstKey());
best=counts.get(counts.lastKey());
//System.out.println(geohash);
if(zcd.containsKey("\""+best+"\"")){
city = zcd.get("\""+best+"\"")[0];
state = zcd.get("\""+best+"\"")[1];
System.out.println(lastGeoHash+","+best+","+state+","+city+","+"US");
}else if(!best.equals("NONE")){
System.out.println(lastGeoHash);
city="MISSING";
state="MISSING";
}
// initializeTreeMapAsPerOurRequirement(counts);
//}else{
//System.out.println("else"+geohash);
//}
//}
}
lastGeoHash=geohash;
counts.put(count, zip);
collector.ack(tuple);
}
private void initializeTreeMapAsPerOurRequirement(TreeMap<Integer,String> counts){
counts.clear();
counts.put(-1,"NONE");
}
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
System.out.println("here");
declarer.declare(new Fields("number"));
}
код Топология:
public static void main(String[] args)
{
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new SendWholeFileDataSpout(),2);
builder.setBolt("map", new GeoHashBolt(),2).shuffleGrouping("spout");
builder.setBolt("reduce",new GeoHashReduceBolt(),2).fieldsGrouping("map", new Fields("value"));
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
Может кто-то посмотреть в код и направлять мне немного ,
что вы установили для 'parallelism_hint' за ваш болт? Также покажите свой код топологии. – Shams
@shizan добавил код топологии. Я не добавил количество задач для каждого исполнителя, учитывая, что это не принесет пользы для моего приложения. –