2017-01-17 1 views
0

При экспериментировании с потоковой передачей Flink вместе с Cassandra у меня возникла интересная проблема при попытке генерировать инструкции INSERT в MapFunction. Если бы я использовал DataStream<Insert>, я получил бы запутанный RuntimeException, брошенный на меня. Однако, используя DataStream<Statement>, все работало так, как я ожидал, хотя я все еще использую экземпляр Insert в коде, который выполняется.RuntimeException при использовании MapFunction Flink с Cassandra «Insert», но не с «Statement»

Я нашел решение (используя DataStream<Statement>) методом проб и ошибок, но я все еще смущен тем, что вызывает это. Это намеренно или ошибка? Я не смог найти никаких объяснений по поиску в Google, так что спросите, если кто-нибудь знает, что происходит.

Ожидаемые результаты (с использованием DataStream<Statement>):

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor). 
log4j:WARN Please initialize the log4j system properly. 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-638132790] 
01/17/2017 15:57:42 Job execution switched to status RUNNING. 
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to RUNNING 
INSERT INTO tablename (name,age) VALUES ('Test Nameson',27); 
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to FINISHED 
01/17/2017 15:57:42 Job execution switched to status FINISHED. 

вывод ошибок (с использованием DataStream<Insert>):

Exception in thread "main" java.lang.RuntimeException: The field private java.util.List com.datastax.driver.core.querybuilder.BuiltStatement.values is already contained in the hierarchy of the class com.datastax.driver.core.querybuilder.BuiltStatement.Please use unique field names through your classes hierarchy 
    at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1762) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1683) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1580) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1479) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:737) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:565) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:366) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:305) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:120) 
    at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:506) 
    at se.hiq.bjornper.testenv.cassandra.SOCassandraQueryTest.main(SOCassandraQueryTest.java:51) 

пример кода (переключатель комментируемой код для двух различных случаев):

import java.util.HashMap; 
import java.util.Map; 
import java.util.Map.Entry; 

import org.apache.flink.api.common.functions.MapFunction; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; 

import com.datastax.driver.core.Statement; 
import com.datastax.driver.core.querybuilder.Insert; 
import com.datastax.driver.core.querybuilder.QueryBuilder; 

public class SOCassandraQueryTest { 

    public static void main(String[] args) throws Exception { 

     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setParallelism(1); 

     DataStream<Map<String, Object>> myDataStream = env.addSource(new RichSourceFunction<Map<String, Object>>() { 

      @Override 
      public void run(SourceContext<Map<String, Object>> ctx) throws Exception { 
       Map<String, Object> map = new HashMap<String, Object>(); 
       map.put("name", "Test Nameson"); 
       map.put("age", 27); 
       ctx.collect(map); 
      } 

      @Override 
      public void cancel() { 
      } 
     }); 

     /* Works just fine */ 
     DataStream<Statement> debugDatastream = myDataStream.map(new MapFunction<Map<String, Object>, Statement>() { 

      @Override 
      public Statement map(Map<String, Object> datarow) throws Exception { 
       Insert insert = QueryBuilder.insertInto("tablename"); 

       for (Entry<String, Object> e : datarow.entrySet()) { 
        insert.value(e.getKey(), e.getValue()); 
       } 
       return insert; 
      } 
     }); 

     /* Throws RuntimeException if using "Insert" instead of "Statement" */ 
//  DataStream<Insert> debugDatastream = myDataStream.map(new MapFunction<Map<String, Object>, Insert>() { 
// 
//   @Override 
//   public Insert map(Map<String, Object> datarow) throws Exception { 
//    Insert insert = QueryBuilder.insertInto("tablename"); 
// 
//    for (Entry<String, Object> e : datarow.entrySet()) { 
//     insert.value(e.getKey(), e.getValue()); 
//    } 
//    return insert; 
//   } 
//  }); 

     debugDatastream.print(); 

     env.execute("CassandraQueryTest"); 
    } 
} 

Окружающая среда:

  • Java 8
  • Flink 1.1.3 (драйвер Cassabdra из этого пакета Maven)
  • Eclipse IDE

ответ

0

Flink анализирует типы вы отправляете по проводам для создания быстрых сериализаторов, и для доступа к вашим ключам при построении окон или при перетасовке данных по сети.

Проблема здесь, вероятно, следующее: - При использовании Insert как тип пользователя, Flink пытается сгенерировать PojoSerializer для типа, но не может с этим RuntimeException. Я считаю, что поведение неверно. Я зарегистрировал bug report в проекте Flink для проблемы. - Для Statement Флинк видит, что он не может сериализовать тип как POJO, поэтому он возвращается к универсальному сериализатору (в случае Flink Kryo).

Я думаю, что эта документация страница ближе мы описываем, как стек сериализацию FLiNK работает: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html

+1

Я открыл PR для этой ошибки: https://github.com/apache/flink/pull/3154 – twalthr

Смежные вопросы