2016-11-15 2 views
1

Во время работы Kafka ->Apache Apex ->Hbase, он говорит следующее исключение в задачах пряжи:InvocationTargetException в пряжи задачи с Hadoop

com.datatorrent.stram.StreamingAppMasterService: Application master, appId=4, clustertimestamp=1479188884109, attemptId=2 
2016-11-15 11:59:51,068 INFO org.apache.hadoop.service.AbstractService: Service com.datatorrent.stram.StreamingAppMasterService failed in state INITED; cause: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException 
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException 
    at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:130) 
    at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:156) 
    at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:241) 
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:333) 
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:330) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614) 
    at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:330) 
    at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:444) 

И мой журнал DataTorrent показывает следующее исключение. Я запускаю приложение, которое передает приложение Kafka -> Apex -> Hbase.

Connecting to ResourceManager at hduser1/127.0.0.1:8032 
16/11/15 17:47:38 WARN client.EventsAgent: Cannot read events for application_1479208737206_0008: java.io.FileNotFoundException: File does not exist: /user/hduser1/datatorrent/apps/application_1479208737206_0008/events/index.txt 
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66) 
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1893) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1834) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1814) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1786) 
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:552) 
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:362) 
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) 
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) 
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) 
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040) 
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2036) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:422) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656) 
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2034) 

Добавление кода:

public void populateDAG(DAG dag, Configuration conf){ 
KafkaSinglePortInputOperator in 
    = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator()); 

in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); 
LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator()); 

dag.addStream("data", in.outputPort, out.input);} 

LineOutputOperator расширяет AbstractFileOutputOperator

private static final String NL = System.lineSeparator(); 
private static final Charset CS = StandardCharsets.UTF_8; 

@NotNull 
private String baseName; 

@Override 
public byte[] getBytesForTuple(byte[] t) { 
    String result = new String(t, CS) + NL; 
    return result.getBytes(CS); 
} 

@Override 
protected String getFileName(byte[] tuple) { 
return baseName; 
} 

public String getBaseName() { return baseName; } 
public void setBaseName(String v) { baseName = v; } 

Как решить эту проблему?

Спасибо.

+0

Видя журналы, я считаю, что оператор не может найти файл. Это может быть связано либо с неправильными настройками пути (проверьте путь к файлу в HDFS), либо путь к файлу не существует (менее вероятно). Предоставьте некоторые дополнительные сведения, такие как используемые операторы, которые могут быть полезны при определении проблемы. –

+0

Я реализовал свой пользовательский оператор вершины, который расширяет BaseOperator и реализует метод process() & endWindow(). И два входа и выхода переменных и их классы - DafaultInput и Output Operators. – syam

ответ

1

Вы можете поделиться некоторыми сведениями о своей среде, например, какой версией хауопа и вершины? Кроме того, в каком журнале это исключение появляется?

Так же, как простая проверка вменяемости, вы можете запустить простой Maven архетип сгенерирован применение описан в: http://docs.datatorrent.com/beginner/

Если это работает, попробуйте запустить приложения FileIO и Кафки на: https://github.com/DataTorrent/examples/tree/master/tutorials

Если те, работе хорошо, мы можем посмотреть на детали вашего кода.

+0

Привет, я использовал пример Kafka, и он работает неправильно и работает для PiDemo и randomNumbers. Моя среда: ОС: Ubuntu-14,04 Кафка: 2_11.0.10.0.1.0 Apex: 3.5.0 Hbase: 1.2.4 – syam

0

Я получил решение,

Проблема, связанная с истечением моей лицензии, поэтому переустанавливать новый и отлично работает для фактического кода.