2015-12-14 5 views
0

Производитель Кодекс НижеКафка потребитель не в состоянии десериализации выбрасывает индекс вне границ

static PltPage pltPage; 
public static void main(String[] args) throws IOException { 

    Short itemtype = 1; 
    Properties props = new Properties(); 
    props.put("metadata.broker.list", "localhost:9092"); 
    props.put("partitioner.class", "com.rms.com.SimplePartitioner"); 
    props.put("serializer.class", "com.rms.com.CustomSerializer"); 
    props.put("request.required.acks", "1"); 

    ProducerConfig config = new ProducerConfig(props); 
    Producer<String,PltResultPage> producer = new Producer<String,PltResultPage>(config); 

     String folder = new File(".").getAbsoluteFile().getPath(); 
     String parent = new File(folder).getParentFile().getParent(); 
     String path = parent + "/KafkaProducerSparkConsumer/src/resources/PortfolioPLT.txt"; 
     FileReader fr = new FileReader(path); 
     BufferedReader br = new BufferedReader(fr); 
     String sCurrentLine; 
     List<Integer> periodIds = new ArrayList<Integer>(); 
     List<Integer> sampleIds = new ArrayList<Integer>(); 
     List<Integer> eventIds = new ArrayList<Integer>(); 
     List<Integer> dates = new ArrayList<Integer>(); 
     List<Double> losses = new ArrayList<Double>(); 

     while ((sCurrentLine = br.readLine()) != null) { 
      String [] entries = sCurrentLine.toString().split("~"); 
      if (entries[1].equalsIgnoreCase("GR")) 
      { 
       periodIds.add(Integer.parseInt(entries[2])); 
       sampleIds.add(Integer.parseInt(entries[3])); 
       eventIds.add(Integer.parseInt(entries[4])); 
       dates.add(2040); 
       losses.add(Double.parseDouble(entries[6])); 
      } 
     } 

    pltPage = ExportUtilities.generatepltpage(periodIds,sampleIds,eventIds,dates,losses,periodIds.size(),1000000000002L,Short.parseShort("1")); 
    PltResultPage resultPage = new PltResultPage(); 
    resultPage.setAnalysisId(1); 
    resultPage.setExternalID("1"); 
    resultPage.setItemId(1L); 
    resultPage.setItemType(itemtype); 
    resultPage.setOutputProperty(itemtype); 
    resultPage.setResultType(itemtype); 
    resultPage.setResultPage(pltPage); 
    resultPage.setJobId(1L); 
     KeyedMessage<String, PltResultPage> message = new KeyedMessage<String, PltResultPage>("test", resultPage); 
     producer.send(message); 
    producer.close(); 
} 

потребителей кодекса ниже

Properties props = new Properties(); 
     props.put("zookeeper.connect", a_zookeeper); 
     props.put("group.id", a_groupId); 
     props.put("zookeeper.session.timeout.ms", "20000"); 
     props.put("zookeeper.sync.time.ms", "3000"); 
     props.put("auto.commit.interval.ms", "2000"); 
     props.put("auto.offset.reset", "smallest"); 
     props.put("serializer.class", "com.rms.com.CustomSerializer"); 

Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(a_numThreads)); 
     final StringDecoder decoder = 
       new StringDecoder(new VerifiableProperties(returnProperties(zooKeeper,groupId))); 
     final CustomSerializer decoder2 = new CustomSerializer(new VerifiableProperties(returnProperties(zooKeeper,groupId))); 
     final Map<String, List<KafkaStream<String, PltResultPage>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, decoder, decoder2); 
     final List<KafkaStream<String, PltResultPage>> streams = consumerMap.get(topic); 
     executor = Executors.newFixedThreadPool(a_numThreads); 
     int threadNumber = a_numThreads; 
     for(KafkaStream stream : streams) { 
      executor.submit(new ExecuteConsumerClient(stream, threadNumber)); 
      threadNumber++; 
     } 

System.out.println("calling ExecuteConsumerClient.run()"); 
     ConsumerIterator<String,PltResultPage> it = m_stream.iterator(); 

     while (it.hasNext()) 
     { 
      try { 
       CreateJavaSparkContext(); 
       System.out.println("Converting to ResultPage"); 
       PltResultPage pltResultPage = (PltResultPage)it.next().message(); 
       System.out.println("Before Impl Accept"); 
       sparkExportPLTToFile.accept(pltResultPage.getJobId(), pltResultPage.getItemId(), pltResultPage.getItemType(), pltResultPage.getOutputProperty(), pltResultPage.getResultType(), pltResultPage.getResultPage(), pltResultPage.getAnalysisId(), pltResultPage.getExternalID()); 
      } 
      catch (Exception e) 
      { 
       System.out.println("Exception in it.Run" + e.getStackTrace().toString()); 
      } 
      System.out.println("Executed impl for thread " + m_threadNumber); 
     } 
     System.out.println("Shutting down Thread: " + m_threadNumber); 
    } 

сбиваться при ИЦ я пытаюсь преобразовать сообщение для объекта. Я получил код пользовательского Serializer с одного из сообщений, и он выглядит ниже. Может кто-нибудь указать, что случилось с реализацией. Я попытался использовать FromBytes из пользовательского сериализатора, и это не помогло. Сериализатору возвращает нулевой объект

Выборочная Serializer

public class CustomSerializer implements Encoder<PltResultPage>, Decoder<PltResultPage> { 
    public CustomSerializer(VerifiableProperties verifiableProperties) { 
     /* This constructor must be present for successful compile. */ 
    } 

    public CustomSerializer() { 

    } 


    public byte[] toBytes(PltResultPage o) { 
     try { 
      ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
      ObjectOutputStream oos = new ObjectOutputStream(baos); 
      oos.writeObject(o); 
      oos.close(); 
      byte[] b = baos.toByteArray(); 
      return b; 
     } catch (IOException e) { 
      return new byte[0]; 
     } 
    } 

    @Override 
    public PltResultPage fromBytes(byte[] bytes) { 
     try { 
      return (PltResultPage) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject(); 
     } catch (Exception e) { 
      return null; 
     } 
    } 
} 

PltResultPage ниже.

public class PltResultPage implements Serializable { 
    private Long jobId; 
    private Long itemId; 
    private Short itemType; 
    private Short outputProperty; 
    private Short resultType; 
    private LossPage resultPage; 
    private Integer analysisId; 
    private String externalID; 
private static final long serialVersionUID = 0L; 

    public Long getJobId() 
    {return this.jobId;} 

    public Long getItemId() 
    {return this.itemId;} 

    public String getExternalID() 
    {return this.externalID;} 

    public Short getItemType() 
    {return this.itemType;} 

    public Short getOutputProperty() 
    {return this.outputProperty;} 

    public Short getResultType() 
    { return this.resultType;} 

    public LossPage getResultPage() 
    {return this.resultPage;} 

    public Integer getAnalysisId() 
    {return this.analysisId;} 

    public void setJobId(Long jobid) 
    {this.jobId = jobid;} 

    public void setOutputProperty(Short output) 
    {this.outputProperty = output;} 

    public void setItemId(Long itemId) 
    {this.itemId = itemId;} 

    public void setItemType(Short type) 
    { 
     this.itemType = type; 
    } 

    public void setResultType(Short resultType) 
    { 
     this.resultType = resultType; 
    } 

    public void setResultPage(LossPage page) 
    {this.resultPage = page;} 

    public void setAnalysisId(Integer id) 
    { 
     this.analysisId = id; 
    } 

    public void setExternalID(String externalID) 
    { 
     this.externalID = externalID; 
    } 
} 
+0

Пожалуйста, разместите код для PltResultPage. Также убедитесь, что PltResultPage (1) реализует сериализуемое значение и (2) имеет переменную serialVersionUID, объявленную –

+0

. Спасибо за отзыв. Не могли бы вы посмотреть на код. Я добавил Plt ResultPage, пользовательский сериализатор возвращает null для FromBytes. Я получаю исключение индекса за пределами. – user3897533

ответ

0

Попробуйте добавить

private static final long serialVersionUID = 0L; 

к PltResultPage. Вы не видите этого, но это значение получает сериализован вместе с другими значениями и при десериализации это значение сравнивается со значением в загруженном классе в текущей JVM. Если значения разные, сериализация завершится неудачно, и вы получите нулевой результат, даже если вы используете тот же исходный код для PltResultPage как в JVM-клиента, так и производителя. Если вы не укажете serialVersionUID для класса, JVM будет составлять для вас значение, и можно с уверенностью утверждать, что случайное значение для serialVersionUID в потребительской JVM будет отличаться от случайного значения для serialVersionUID в JVM производителя.

Короче говоря, если вы используете стандартную сериализацию/десериализацию Java в пользовательском сериализаторе, вы должны объявить serialVersionUID в классах сериализуемых объектов.

+0

Крис, я все еще получаю индекс за пределами. Не могли бы вы полностью рассмотреть приведенный выше код. Я передаю String, PltResultPage у производителя и в Consumer, я добавил сериализатор как Custom, у KafkaStream есть у потребителя. Можете ли вы проверить свойства как у потребителя, так и у производителя и проверить, правильно ли указано все? – user3897533

+0

Можете ли вы подтвердить, что вы не читаете старые сообщения, которые были написаны до того, как вы добавили serialVersionUID в свой код? Мне нужно было изменить название темы в таких случаях, чтобы это было просто. –

+0

Я попытался, изменив имя темы и имя группы, это не помогло. Не могли бы вы взглянуть на полный код, указанный выше, и указать на то, что я просто пропал без вести. Я сейчас не в курсе! – user3897533

Смежные вопросы