2015-11-06 4 views
2

Я тестировал удаленную отправку топологий Storm с использованием IDE (Eclipse). И мне удалось загрузить простую штормовую топологию в удаленный кластер Storm, но странно, когда я проверил интерфейс Storm, чтобы убедиться, что топология, которая была отправлена ​​удаленно, работает без проблем, я видел только болтовку в пользовательском интерфейсе, но другие болтов и желоба нет. После этого я отправил топологию вручную из командной строки и снова проверил Storm UI, и он работает так, как будто он должен работать без проблем. Я искал причину, но не смог найти. Я прикрепил как топологию и удаленный класс податель ниже и соответствующие Грозовые фотографии интерфейса:Apache Storm Remote Topology Submission

Это выход из консоли Eclipse (после удаленного представления)

225 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar  T:\STORM_TOPOLOGIES\Benchmark.jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar 
234 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar 

Вот топология:

public class StormBenchmark { 


// ****************************************************************************************** 
public static class GenSpout extends BaseRichSpout { 

    //private static final Logger logger = Logger.getLogger(StormBenchmark.class.getName()); 

    private Long count = 1L; 
    private Object msgID; 
    private static final long serialVersionUID = 1L; 
    private static final Character[] CHARS = new Character[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'}; 
    private static final String[] newsagencies = {"bbc", "cnn", "reuters", "aljazeera", "nytimes", "nbc news", "fox news", "interfax"}; 

    SpoutOutputCollector _collector; 
    int _size; 
    Random _rand; 
    String _id; 
    String _val; 
    // Constructor 
    public GenSpout(int size) { 
     _size = size; 
    } 

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     _collector = collector; 
     _rand = new Random(); 
     _id = randString(5); 
     _val = randString2(_size); 
    } 
    //Business logic 
    public void nextTuple() { 

     count++; 
     msgID = count; 
     _collector.emit(new Values(_id, _val), msgID); 
    } 

    public void ack(Object msgID) { 
     this.msgID = msgID; 
    } 

    private String randString(int size) { 

     StringBuffer buf = new StringBuffer(); 
     for(int i=0; i<size; i++) { 
      buf.append(CHARS[_rand.nextInt(CHARS.length)]); 
     } 
     return buf.toString(); 
    } 

    private String randString2(int size) { 

     StringBuffer buf = new StringBuffer(); 
     for(int i=0; i<size; i++) { 
      buf.append(newsagencies[_rand.nextInt(newsagencies.length)]); 
     } 
     return buf.toString(); 
    } 

    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("id", "item")); 
    }   
} 
// ======================================================================================================= 
// =================================== B O L T =========================================================== 
public static class IdentityBolt extends BaseBasicBolt {  

    private static final long serialVersionUID = 1L; 

    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("id", "item")); 
    } 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 

     String character = tuple.getString(0); 
     String agency = tuple.getString(1); 
     List<String> box = new ArrayList<String>(); 
     box.add(character); 
     box.add(agency); 
     try { 
      fileWriter(box); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
     box.clear(); 
    } 

    public void fileWriter(List<String> listjon) throws IOException { 

     String pathname = "/home/hduser/logOfStormTops/logs.txt"; 
     File file = new File(pathname); 
     if (!file.exists()){ 
      file.createNewFile(); 
     } 
     BufferedWriter writer = new BufferedWriter(new FileWriter(file, true)); 

     writer.write(listjon.get(0) + " : " + listjon.get(1)); 
     writer.newLine(); 
     writer.flush(); 
     writer.close();   
    } 
} 


//storm jar storm-benchmark-0.0.1-SNAPSHOT-standalone.jar storm.benchmark.ThroughputTest demo 100 8 8 8 10000 
public static void main(String[] args) throws Exception { 


    TopologyBuilder builder = new TopologyBuilder(); 

    builder.setSpout("spout", new GenSpout(8), 2).setNumTasks(4); 

    builder.setBolt("bolt", new IdentityBolt(), 4).setNumTasks(8) 
      .shuffleGrouping("spout"); 


    Config conf = new Config(); 
    conf.setMaxSpoutPending(200); 
    conf.setStatsSampleRate(0.0001); 
    //topology.executor.receive.buffer.size: 8192 #batched 
    //topology.executor.send.buffer.size: 8192 #individual messages 
    //topology.transfer.buffer.size: 1024 # batched 

    conf.put("topology.executor.send.buffer.size", 1024); 
    conf.put("topology.transfer.buffer.size", 8); 
    conf.put("topology.receiver.buffer.size", 8); 
    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xdebug -Xrunjdwp:transport=dt_socket,address=1%ID%,server=y,suspend=n"); 

    StormSubmitter.submitTopology("SampleTop", conf, builder.createTopology()); 
} 
} 

И вот The RemoteSubmitter класс:

public class RemoteSubmissionTopo { 

@SuppressWarnings({ "unchecked", "rawtypes", "unused" }) 
public static void main(String... args) { 


    Config conf = new Config(); 
    TopologyBuilder topoBuilder = new TopologyBuilder(); 
    conf.put(Config.NIMBUS_HOST, "117.16.142.49"); 
    conf.setDebug(true); 
    Map stormConf = Utils.readStormConfig(); 
    stormConf.put("nimbus.host", "117.16.142.49"); 
    String jar_path = "T:\\STORM_TOPOLOGIES\\Benchmark.jar"; 


    Client client = NimbusClient.getConfiguredClient(stormConf).getClient(); 

    try { 
     NimbusClient nimbus = new NimbusClient(stormConf, "117.16.142.49", 6627); 
     String uploadedJarLocation = StormSubmitter.submitJar(stormConf, jar_path); 
     String jsonConf = JSONValue.toJSONString(stormConf); 

     nimbus.getClient().submitTopology("benchmark-tp", uploadedJarLocation, jsonConf, topoBuilder.createTopology()); 



    } catch (TTransportException e) { 
     e.printStackTrace(); 
    } catch (AlreadyAliveException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } catch (InvalidTopologyException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } catch (TException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 

    try { 
     Thread.sleep(6000); 
    } catch (InterruptedException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
} 
} 

И вот пикт Сторм UI (в случае удаленного представления)

enter image description here

А вот другой шторм UI пикт (в случае ручной подачи)

enter image description here

ответ

2

В RemoteSubmissionTopo вы используете TopologyBuilder topoBuilder = new TopologyBuilder();, но не звоните setSpout(...)/setBolt(...). Таким образом, вы отправляете топологию без операторов ...

КПП: RemoteSubmissionTopo на самом деле вообще не требуется. Вы можете использовать StormBenchmark для отправки удаленно. Просто добавьте conf.put(Config.NIMBUS_HOST, "117.16.142.49"); в main и установите опцию JVM -Dstorm.jar=/path/to/topology.jar, и вы готовы к запуску.

+0

спасибо, я думал, что носик и болт определены в классе топологии Bechmark.jar было достаточно – Humoyun

+0

Если вы используете 'RemoteSubmissionTopo', основной метод' StormBenchmark' никогда не выполняется, а 'RemoteSubmissionTopo' использует собственный экземпляр' TopologyBuilder ' '. Оба класса завершены независимо друг от друга. Таким образом, ясно, что он не может работать. –

Смежные вопросы