2016-07-13 4 views
1

Привет, я новичок в Storm and Kafka. Я использую шторм 1.0.1 и kafka 0.10.0 у нас есть kafkaspout, который получит java bean из темы kafka. Я провел несколько часов, чтобы найти правильный подход для этого. Нашли несколько статей, которые полезны, но ни один из подходов не работал для меня до сих пор.Storm Kafkaspout KryoSerialization issue for java bean from kafka topic

Ниже мои коды:

StormTopology:

public class StormTopology { 

public static void main(String[] args) throws Exception { 
    //Topo test /zkroot test 
    if (args.length == 4) { 
     System.out.println("started"); 
     BrokerHosts hosts = new ZkHosts("localhost:2181"); 

     SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2], 
       args[3]); 

     kafkaConf1.zkRoot = args[2]; 
     kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true; 
     kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); 
     kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme()); 
     KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1); 

     System.out.println("started"); 

     ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]); 
     AnalysisBolt analysisBolt = new AnalysisBolt(args[1]); 
     TopologyBuilder topologyBuilder = new TopologyBuilder(); 
     topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1); 
     //builder.setBolt("counterbolt2", countbolt2, 3).shuffleGrouping("kafkaspout"); 
     //This is for field grouping in bolt we need two bolt for field grouping or it wont work 
     topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout"); 
     topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip")); 
     Config config = new Config(); 
     config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class); 
     config.setDebug(true); 
     config.setNumWorkers(1); 

     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology(args[0], config, topologyBuilder.createTopology()); 

     // StormSubmitter.submitTopology(args[0], config, 
     // builder.createTopology()); 

    } else { 
     System.out 
       .println("Insufficent Arguements - topologyName kafkaTopic ZKRoot ID"); 
    } 
} 

}

Я сериализации данные в Кафки, используя Kryo

KafkaProducer:

public class StreamKafkaProducer { 

private static Producer producer; 
private final Properties props = new Properties(); 
private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer(); 

private StreamKafkaProducer(){ 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "com.abc.serializer.MySerializer"); 
    producer = new org.apache.kafka.clients.producer.KafkaProducer(props); 
} 

public static StreamKafkaProducer getStreamKafkaProducer(){ 
    return KAFKA_PRODUCER; 
} 

public void produce(String topic, VehicleTrip vehicleTrip){ 
    ProducerRecord<String,VehicleTrip> producerRecord = new ProducerRecord<>(topic,vehicleTrip); 
    producer.send(producerRecord); 
    //producer.close(); 
} 

public static void closeProducer(){ 
    producer.close(); 
} 

}

Kyro Serializer:

public class DataKyroSerializer extends Serializer<Data> implements Serializable { 
@Override 
public void write(Kryo kryo, Output output, VehicleTrip vehicleTrip) { 
    output.writeLong(data.getStartedOn().getTime()); 
    output.writeLong(data.getEndedOn().getTime()); 
} 

@Override 
public Data read(Kryo kryo, Input input, Class<VehicleTrip> aClass) { 
    Data data = new Data(); 
    data.setStartedOn(new Date(input.readLong())); 
    data.setEndedOn(new Date(input.readLong())); 
    return data; 
} 

Мне нужно, чтобы получить данные обратно в бобе данных.

Согласно несколько статей необходимо предоставить с пользовательской схемы и сделать его частью топологии, но до сих пор я не везёт

Код для болта и схемы

Схема:

public class KryoScheme implements Scheme { 

    private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() { 
     protected Kryo initialValue() { 
      Kryo kryo = new Kryo(); 
      kryo.addDefaultSerializer(Data.class, new DataKyroSerializer()); 
      return kryo; 
     }; 
    }; 

    @Override 
    public List<Object> deserialize(ByteBuffer ser) { 
     return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class)); 
    } 

    @Override 
    public Fields getOutputFields() { 
     return new Fields("data"); 
    } 

} 

и болт:

public class AnalysisBolt implements IBasicBolt { 
/** 
* 
*/ 
private static final long serialVersionUID = 1L; 
private String topicname = null; 

public AnalysisBolt(String topicname) { 
    this.topicname = topicname; 
} 

public void prepare(Map stormConf, TopologyContext topologyContext) { 
    System.out.println("prepare"); 
} 

public void execute(Tuple input, BasicOutputCollector collector) { 
    System.out.println("execute"); 

    Fields fields = input.getFields(); 
    try { 

     JSONObject eventJson = (JSONObject) JSONSerializer.toJSON((String) input 
       .getValueByField(fields.get(1))); 
     String StartTime = (String) eventJson.get("startedOn"); 
     String EndTime = (String) eventJson.get("endedOn"); 
     String Oid = (String) eventJson.get("_id"); 
     int V_id = (Integer) eventJson.get("vehicleId"); 
     //call method getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime) 

     System.out.println("==========="+Oid+"| "+V_id+"| "+StartTime+"| "+EndTime); 

} catch (Exception e) { 
    e.printStackTrace(); 

} 

} 

но если представить топологию штормовой я получаю ошибка:

java.lang.IllegalStateException: Spout 'kafkaspout' contains a 
non-serializable field of type com.abc.topology.KryoScheme$1, which 
was instantiated prior to topology creation. 
com.minda.iconnect.topology.KryoScheme$1 should be instantiated within 
the prepare method of 'kafkaspout at the earliest. 

Цените помощь для отладки проблемы и указания по правильному пути.

Благодаря

ответ

1

Ваш ThreadLocal не Сериализуемый. Предпочтительным решением было бы сделать ваш сериализатор как Serializable, так и потокобезопасным. Если это невозможно, я вижу 2 альтернативы, так как нет способа подготовки, так как вы попадаете в болт.

  1. Заявить об этом как статическом, который по своей сути является временным.
  2. Заявить об этом переходный процесс и получить доступ к нему через частный метод получения. Затем вы можете инициализировать переменную при первом доступе.
0

В жизненном цикле шторма, топология экземпляра, а затем в последовательной форме в формат байт для хранения в Zookeeper, до топологии выполняются. В течение этого шага, если носик или болт в топологии имеет инициализированное свойство неэриализуемого значения, сериализация завершится неудачей.

Если есть необходимость в несериализуемом поле, инициализируйте его в методе подготовки болта или желоба, который запускается после того, как топология доставляется работнику.

Источник: Best Practices for implementing Apache Storm

Смежные вопросы