2015-12-07 2 views
-1

Я написал код для получения твиттер-твитов с использованием kafka, его работа отлично, но он не работает для разделов. Я хочу создать 3 раздела на одной теме .. как передать значения в класс .. Любые управления разделами предложения, где я делаю неправильноKafka создает разделы с использованием java API

public class kafkaSpoutFetchingRealTweets { 


private String consumerKey; 
private String consumerSecret; 
private String accessToken; 
private String accessTokenSecret; 
private TwitterStream twitterStream; 

/** 
* @param contxt 
*/ 
void start(final Context context) { 

    /** Producer properties **/ 
    Properties props = new Properties(); 
    props.put("metadata.broker.list", 
      context.getString(Constant.BROKER_LIST)); 
    props.put("partitioner.class","SimplePartitioner"); 
    props.put("serializer.class", context.getString(Constant.SERIALIZER)); 
    props.put("request.required.acks", 
      context.getString(Constant.REQUIRED_ACKS)); 
    props.put("producer.type", "async"); 
    // props.put("partitioner.class", context.getClass()); 
    ProducerConfig config = new ProducerConfig(props); 

    final Producer<String, String> producer = new Producer<String, String>(
      config); 

    /** Twitter properties **/ 
    consumerKey = context.getString(Constant.CONSUMER_KEY_KEY); 
    consumerSecret = context.getString(Constant.CONSUMER_SECRET_KEY); 
    accessToken = context.getString(Constant.ACCESS_TOKEN_KEY); 
    accessTokenSecret = context.getString(Constant.ACCESS_TOKEN_SECRET_KEY); 

    ConfigurationBuilder cb = new ConfigurationBuilder(); 
    cb.setOAuthConsumerKey(consumerKey); 
    cb.setOAuthConsumerSecret(consumerSecret); 
    cb.setOAuthAccessToken(accessToken); 
    cb.setOAuthAccessTokenSecret(accessTokenSecret); 
    cb.setJSONStoreEnabled(true); 
    cb.setIncludeEntitiesEnabled(true); 

    twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); 

    /** Twitter listener **/ 
    StatusListener listener = new StatusListener() { 
     // The onStatus method is executed every time a new tweet comes 
     // in. 
     public void onStatus(Status status) { 


      if(("en".equals(status.getLang())) && ("en".equals(status.getUser().getLang()))){ 

       KeyedMessage<String, String> data = new KeyedMessage<String, String>(
         context.getString(Constant.data), 
         DataObjectFactory.getRawJSON(status)); 
       producer.send(data); 
       System.out.println(DataObjectFactory.getRawJSON(status)); 

      } 
     } 
     } 


     public void onDeletionNotice(
       StatusDeletionNotice statusDeletionNotice) { 
     } 

     public void onTrackLimitationNotice(int numberOfLimitedStatuses) { 
     } 

     public void onScrubGeo(long userId, long upToStatusId) { 
     } 

     public void onException(Exception ex) { 
      ex.printStackTrace(); 
      logger.info("Shutting down Twitter sample stream..."); 
      twitterStream.shutdown(); 
     } 

     public void onStallWarning(StallWarning warning) { 
      System.out.println("stallWarning"); 
     } 
    }; 


    String[] lang = { "en" }; 
    fq.language(lang); 
    twitterStream.addListener(listener); 
    twitterStream.sample(); 

} 

public static void main(String[] args) { 
    try { 

     Context context = new Context(args[0]); 
     kafkaSpoutFetchingRealTweets tp = new kafkaSpoutFetchingRealTweets(); 
     tp.start(context); 

    } catch (Exception e) { 
     e.printStackTrace(); 
     logger.info(e.getMessage()); 
    } 

} 

}

+0

Где вы читали, что вы можете создать разделы с помощью API Java? Все, что я прочитал, заключается в том, что нет возможности создавать разделы по API. – morganw09dev

+0

@ morganw09dev ... (http://fbi.wf/dir/Books/Tech/Learning%20Apache%20Kafka,%20Second%20Edition%20by%20Nishant%20Garg.pdf) ... вы можете узнать здесь .. Я не могу выполнить его – Anji

+0

Я не буду читать весь PDF, чтобы помочь решить вашу проблему. Просматривая протокол Kafka, https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol, похоже, нет поддержки для создания разделов через Java API. – morganw09dev

ответ

0

Так есть несколько проблем.

  • Ваш вопрос и код не совпадают. Ваши вопросы задают вопрос о создании темы с тремя разделами. Но приведенный вами код и пример объясняют, как определить, какой раздел следует отправить сообщению, если вы уже создали тему с тремя разделами.
  • Если вы действительно хотите создать тему с тремя разделами, вам нужно использовать клиент командной строки. Образец можно найти здесь: http://kafka.apache.org/documentation.html#quickstart
  • Если вы действительно хотите просто определить, какой раздел вам нужен для отправки данных. Вам нужно будет предоставить дополнительную информацию о реальной проблеме, с которой вы сталкиваетесь? Все ли они идут в один раздел? Затем вам нужно посмотреть, как вы вычисляете раздел в своем классе SimplePartitioner, который вы указываете в своей конфигурации. Что находится в классе SimplePartitioner?

    props.put("partitioner.class","SimplePartitioner"); 
    
+0

@Matthias J. Sax ... о .. Кажется, я смутил вас .. На самом деле я хочу создать kafkaSpout, который получает твиттер-твиты с 3 раздела. Так что в шторме я могу определить параллельный параллельный вывод на 3. На самом деле у меня большое количество латентности в штормовом носике. Для лучшей настройки производительности, которую я прочитал в статье, увеличьте параллелизм носика. Поэтому я хочу создать kafkaSpout с 3 разделами, чтобы я мог получать твиты из этих разделов из шторма. – Anji

+0

Что? Я не Маттиас Дж. Сакс ?? Да, вы меня смущаете. – morganw09dev

+0

Я понял ... что это значит ... 1. Создайте выше kafkaSpout 2. Из командной строки создайте имя темы с разделами, которые нам нужны 3. Выполните банку kafkaSpout. – Anji

Смежные вопросы