2015-12-20 2 views
1

Ошибки при создании DataStream использования fromElements функцииОшибки при создании DataStream в Apache Флинка

Ниже expeption -

Вызванный: java.io.IOException: Не удалась десериализация элемент от источник. Если вы используете пользовательскую сериализацию (Value и Writable types), проверьте функции сериализации. Сериализатор - [email protected]599fcdda at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run (FromElementsFunction.java:121) at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:58) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:55) at org. apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:218) на org.apache.flink.runtime.taskmanager.Task.run (Task.java:584) на java.lang.Thread. (Thread.java:745)

+1

Каков тип, который вы предоставляете методу 'fromElements'? –

+0

это InputStreamReader –

+0

Ниже приведен пример - частный статический DataStream getStream (StreamExecutionEnvironment env) { InputStreamReader isr = null; try { URL url = новый URL ("http://www.ex.in/res"); HttpURLConnection httpconn = (HttpURLConnection) url.openConnection(); if (httpconn.getResponseCode()! = 200) throw new RuntimeException («Ошибка: код ошибки HTTP:» + httpconn.getResponseCode()); isr = new InputStreamReader ((httpconn.getInputStream())); } catch (исключение e) {} return env.fromElements (isr); } –

ответ

0

Вы не можете создать DataStream<InputStreamReader> с fromElements, так как InputStreamReader не является сериализуемым. Это требуется методом fromElements. Кроме того, вероятно, не имеет большого смысла работать на InputStreamReaders. Я думаю, было бы лучше просто прочитать данные из HttpURLConnection, а затем продолжить работу над этими данными.

1

Почему вы хотите обработать InputStreamReader кортежи? Наверное, здесь есть понимание промахов. Тип общего типа указывает тип данных, которые вы хотите обработать. Например

DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5); 

Генерировать конечный поток данных с 5 Integer кортежей. Я предполагаю, что вы действительно хотите использовать InputStreamReader для генерации фактических кортежей.

Если вы хотите прочитать с помощью HttpURLConnection вы можете реализовать свой собственный SourceFunction (или RichSourceFunction) следующим образом (заменить OUT с вами фактические данные типа вы хотите использовать - также рассмотреть Flinks Tuple0 к Tuple25 типов):

env.addSource(new SourceFunction<OUT> { 
    private volatile boolean isRunning = true; 

    @Override 
    public void run(SourceContext<OUT> ctx) { 
     InputStreamReader isr = null; 
     try { 
      URL url = new URL("ex.in/res"); 
      HttpURLConnection httpconn = (HttpURLConnection) url.openConnection(); 
      if (httpconn.getResponseCode() != 200) 
       throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode()); 
      isr = new InputStreamReader((httpconn.getInputStream())); 
     } catch (Exception e) { 
      // clean up; log error 
      return; 
     } 

     while(isRunning) { 
      OUT tuple = ... // get data from isr 
      ctx.collect(tuple); 
     } 
    } 

    @Override 
    public void cancel() { 
     this.isRunning = false; 
    } 
}); 
+0

Я очень знаком с Flink. Я пытаюсь прочитать веб-журнал, доступный через URL-адрес HTTP. Поэтому я так пытался. Какие методы/объекты предоставляет Flink, чтобы пользователи могли читать HTTP-потоки? –

+0

Возможно, было бы разумно реализовать пользовательский источник. Просто взгляните на интерфейс SourceFunction.«SourceContext» из метода 'run()' позволяет испускать элементы. –

+0

Просто протяните мой ответ. –