2016-03-22 2 views
0

Я получаю поток сообщений 3MB из темы kafka, но значение по умолчанию - 1 МБ. Теперь я изменил свойства kafka с 1 МБ на 3 МБ, добавив ниже строки в файле kafa consumer.properties и server.properties.Как изменить значение по умолчанию kafka Класс SpoutConfig

fetch.message.max.bytes=2048576 (consumer.properties) 
filemessage.max.bytes=2048576 (server.properties) 
replica.fetch.max.bytes=2048576 (server.properties) 

Теперь после добавления вышеуказанных строк в Кафке, данные сообщений 3MB происходит в журналы Кафка данных. Но STORM не может обработать данные 3 МБ и может читать только размер по умолчанию, то есть 1 МБ данных.

Итак, как изменить эти конфигурации для обработки/чтения данных 3 МБ. Вот мой класс топологии.

String argument = args[0]; 
    Config conf = new Config(); 
    conf.put(JDBC_CONF, map); 
    conf.setDebug(true); 
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); 
    //set the number of workers 
    conf.setNumWorkers(3); 

    TopologyBuilder builder = new TopologyBuilder(); 

    //Setup Kafka spout 
    BrokerHosts hosts = new ZkHosts("localhost:2181"); 
    String topic = "year1234"; 
    String zkRoot = ""; 
    String consumerGroupId = "group1"; 
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId); 

     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 
    builder.setSpout("KafkaSpout", kafkaSpout,1); 


    builder.setBolt("user_details", new Parserspout(),1).shuffleGrouping("KafkaSpout"); 

    builder.setBolt("bolts_user", new bolts_user(cp),1).shuffleGrouping("user_details"); 
+0

http://stackoverflow.com/questions/36159768/how-to-set-spoutconfig-from-default-setting дубликата – LiozM

ответ

0

Добавить следующие строки ниже

SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId); 

spoutConfig.fetchSizeBytes = 3048576; 
spoutConfig.bufferSizeBytes = 3048576; 
Смежные вопросы