В настоящее время я работаю над проектом, чтобы получить значения датчика от TI-SensorTag CC2650, отправьте эти значения с помощью скрипта python через HTTP (через Apache-TomCat-Servlet) в Apache- Kafka и подключить Kafka с Apache-Storm для обработки данных.Возможно ли, чтобы топология в кластере записывала в txt-файл в локальной файловой системе? (запуск кластера в той же системе)
Эти данные будут записаны в .txt-файл в моей локальной системе (каталог Apache-Storm-Cluster-Folder) с помощью болта в топологии.
Я только начал с Бури и Кафки несколько дней назад, и я был смущен о следующем:
Если я запускаю топологию на локальном кластере, все работает отлично. Но если я отправлю его в «обычный» кластер, работающий на localhost: 8888, то он просто ничего не делает.
The Storm-UI показывает топологию, но, похоже, на входящие сообщения от kafka нет реакции.
Не должен ли быть тест на локальном кластере и функции на реальном кластере? Или у кластера просто нет разрешения на запись/изменение файлов в моей локальной системе?
Дополнительная информация:
Обзор (соединения между "системами"):
Как это должно работать?
Когда я пишу сообщение к теме в Кафке, то Кафка-носик топологии должен захватить это сообщение и записать его в .txt-файл на моей локальной файловой системе.
Мой код (банка-с зависимостями) расположен в здании:
«/home/tobias/storm/apache-storm-0.9.2-incubating/mycode/StormKafkaTopology/target /»
и им пытаются писать в output.txt расположенного по адресу:
"/home/tobias/storm/apache-storm-0.9.2-incubating/mycode/StromKafkaTopology"/tmp/"
Код топологии:
public class StormKafkaTopology {
public static void main(String[] args) throws Exception {
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "mytopic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" +topic, UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
//kafkaSpoutConfig.forceFromStart = true;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
builder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("kafka-spout");
if (args != null && args.length >0) {
config.setNumWorkers(6);
config.setNumAckers(6);
//config.setMaxSpoutPending(100);
//config.setMessageTimeoutSecs(20);
StormSubmitter.submitTopology("StormKafkaTopology", config, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("StormKafkaTopology", config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("StormKafkaTopology");
cluster.shutdown();
}
}}
Код для PrinterBolt:
public class PrinterBolt extends BaseBasicBolt {
/*
* execute-method will be opened if tuples are processed
*/
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String msg = tuple.getString(0);
System.out.println("======before write file======");
try {
// set file directory:
File file = new File("/home/tobias/storm/apache-storm-0.9.2-incubating/mycode/StormKafkaTopology/tmp/output.txt");
if(!file.exists()) {
file.createNewFile();
}
//create a FileWriter
FileWriter fw = new FileWriter(file.getAbsoluteFile(), true);
//create a BufferedWriter
BufferedWriter bw = new BufferedWriter(fw);
//write into the file
bw.write(msg + "\n");
//close the BufferedWriter (IMPORTANT)
bw.close();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("======after write file======");
//you could emit some Date here for further processing:
//collector.emit(new Values(msg));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("output"));
}}
Я был бы признателен, если кто-то смог бы указать на мои ошибки, и может дать совет.
вы проверили журналы рабочих? – mrnakumar
Я искал в 3 моих журналах worker-xxxx.log, но ничего не нашел об этом проекте. Извините за вопрос, но как определить, какой рабочий создал журнал? (Я знаю, что порты рабочих похожи на номера в имени журнала, но как они связаны?) –