2016-12-05 2 views
1

В настоящее время я работаю над проектом, чтобы получить значения датчика от TI-SensorTag CC2650, отправьте эти значения с помощью скрипта python через HTTP (через Apache-TomCat-Servlet) в Apache- Kafka и подключить Kafka с Apache-Storm для обработки данных.Возможно ли, чтобы топология в кластере записывала в txt-файл в локальной файловой системе? (запуск кластера в той же системе)

Эти данные будут записаны в .txt-файл в моей локальной системе (каталог Apache-Storm-Cluster-Folder) с помощью болта в топологии.

Я только начал с Бури и Кафки несколько дней назад, и я был смущен о следующем:

Если я запускаю топологию на локальном кластере, все работает отлично. Но если я отправлю его в «обычный» кластер, работающий на localhost: 8888, то он просто ничего не делает.

The Storm-UI показывает топологию, но, похоже, на входящие сообщения от kafka нет реакции.

Не должен ли быть тест на локальном кластере и функции на реальном кластере? Или у кластера просто нет разрешения на запись/изменение файлов в моей локальной системе?


Дополнительная информация:


Обзор (соединения между "системами"): Overview of the connections of this project

Как это должно работать?

Когда я пишу сообщение к теме в Кафке, то Кафка-носик топологии должен захватить это сообщение и записать его в .txt-файл на моей локальной файловой системе.

Мой код (банка-с зависимостями) расположен в здании:

«/home/tobias/storm/apache-storm-0.9.2-incubating/mycode/StormKafkaTopology/target /»

и им пытаются писать в output.txt расположенного по адресу:

"/home/tobias/storm/apache-storm-0.9.2-incubating/mycode/StromKafkaTopology"/tmp/"

Код топологии:

public class StormKafkaTopology { 

public static void main(String[] args) throws Exception { 

    Config config = new Config(); 
    config.setDebug(true); 
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); 
    String zkConnString = "localhost:2181"; 
    String topic = "mytopic"; 
    BrokerHosts hosts = new ZkHosts(zkConnString); 

    SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" +topic, UUID.randomUUID().toString()); 
    kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4; 
    kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4; 
    //kafkaSpoutConfig.forceFromStart = true; 
    kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

    TopologyBuilder builder = new TopologyBuilder(); 
    builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig)); 
    builder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("kafka-spout"); 

    if (args != null && args.length >0) { 
     config.setNumWorkers(6); 
     config.setNumAckers(6); 
     //config.setMaxSpoutPending(100); 
     //config.setMessageTimeoutSecs(20); 
     StormSubmitter.submitTopology("StormKafkaTopology", config, builder.createTopology()); 
    } else { 
     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("StormKafkaTopology", config, builder.createTopology()); 
     Utils.sleep(10000); 
     cluster.killTopology("StormKafkaTopology"); 
     cluster.shutdown(); 
    } 
}} 

Код для PrinterBolt:

public class PrinterBolt extends BaseBasicBolt { 
/* 
* execute-method will be opened if tuples are processed 
*/ 
@Override 
public void execute(Tuple tuple, BasicOutputCollector collector) { 
    String msg = tuple.getString(0); 
    System.out.println("======before write file======"); 
    try { 
     // set file directory: 
     File file = new File("/home/tobias/storm/apache-storm-0.9.2-incubating/mycode/StormKafkaTopology/tmp/output.txt"); 
     if(!file.exists()) { 
      file.createNewFile(); 
     } 
    //create a FileWriter 
    FileWriter fw = new FileWriter(file.getAbsoluteFile(), true); 
    //create a BufferedWriter 
    BufferedWriter bw = new BufferedWriter(fw); 
    //write into the file 
    bw.write(msg + "\n"); 
    //close the BufferedWriter (IMPORTANT) 
    bw.close(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
    System.out.println("======after write file======"); 
    //you could emit some Date here for further processing: 
    //collector.emit(new Values(msg)); 
} 

@Override 
public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    declarer.declare(new Fields("output")); 
}} 

Я был бы признателен, если кто-то смог бы указать на мои ошибки, и может дать совет.

+0

вы проверили журналы рабочих? – mrnakumar

+0

Я искал в 3 моих журналах worker-xxxx.log, но ничего не нашел об этом проекте. Извините за вопрос, но как определить, какой рабочий создал журнал? (Я знаю, что порты рабочих похожи на номера в имени журнала, но как они связаны?) –

ответ

1

Если вы можете запустить его «локально», то это хороший первый шаг. Также звучит так, как будто вы можете добраться до пользовательского интерфейса Storm, что хорошо. После того, как вы отправите топологию, она должна появиться в пользовательском интерфейсе Storm, а затем вы можете щелкнуть по ней, чтобы увидеть втулки и болты в топологии. Нажмите каждый из отверстий/болтов, а затем щелкните по портам (по одному для каждого рабочего), чтобы просмотреть журналы в пользовательском интерфейсе.

Я предполагаю, что где-то есть ошибка. Время, чтобы начать рыть через штормовые/кафковые журналы, чтобы узнать, что это.

В: Как определить, какой рабочий создал журнал? Каждому работнику назначается порт. Каждый рабочий журнал представляет собой комбинацию топологического имени + порт. Для вас просто найдите самый последний журнал и посмотрите, что в нем.

Пара вещей:

  • Начать с 1 работника, то проще
  • Update the logging in your PrinterBolt to use SLF4J, так что вы можете увидеть сообщения в журналах шторма и в пользовательском интерфейсе
  • Добавить TRY/поймать и в событие исключения использует collector.reportError(e);, чтобы сообщить об ошибке. Тогда он будет красным в буре ui!
  • Небольшое уточнение на диаграмме, брокер порт Кафка 9092 не 2181 ... 2181 для зоопарка только
+0

Большое спасибо за ваш ответ. Я посмотрю на ваш список «вещей» и уверен, что там есть действительно полезные качества жизни. Я просто признал, что топология не отображает ни исполнителей, ни работников (оба 0) в интерфейсе шторма. Я что-то упускаю? –

+0

Хорошо, это покажет исполнителям и работникам, если я снова заведующий супервизором ... но когда я использую команду «bin/storm supervisor», тогда супервизор появляется только в течение короткого периода времени (<3 сек) в шторм-уй. Это нормальный случай? –

+0

Звучит так, как будто ваш диспетчер рушится. Вы пропустили настройку шторма и отредактировали свой storm.yaml? http://storm.apache.org/releases/1.0.1/Setting-up-a-Storm-cluster.html –

Смежные вопросы