2014-12-05 3 views
2

У меня есть класс, который отвечает за получение некоторых собранных данных из источника данных и запись сериализованного содержимого этих данных в файл (всегда один и тот же файл). Для этого первое, что я делаю, когда создаю экземпляр, это проверить, существует ли этот файл и создать его, если нет. Кажется, что файл создается без проблем, но проблема возникает, когда я пытаюсь добавить сериализованный объект в файл, используя метод onOperationsBatchSynchronization.Не удалось добавить к HDFS

Это код указанного класса:

public class HDFSSpaceSynchronizationEndpoint extends SpaceSynchronizationEndpoint { 

    private final static Logger LOG = LoggerFactory.getLogger(HDFSSpaceSynchronizationEndpoint.class); 
    private final String uriToFileToWrite; 
    private final HDFSFileUtil hdfsFileUtil; 

    public HDFSSpaceSynchronizationEndpoint(HDFSFileUtil hdfsFileUtil) { 
     Validate.notNull(hdfsFileUtil); 
     this.hdfsFileUtil = hdfsFileUtil; 
     uriToFileToWrite = hdfsFileUtil.getUriToHdfs() + "/object-container"; 
     createFileIfNeeded(); 
    } 

    private void createFileIfNeeded() { 
     final String methodName = "createFileIfNeeded"; 
     synchronized (this) { 
      try { 
       if (!hdfsFileUtil.fileExistsInCluster(uriToFileToWrite)) { 
        hdfsFileUtil.createFileInCluster(uriToFileToWrite); 
       } 
      } catch (IOException e) { 
       LOG.error(methodName, "", "Error creating the file in the cluster: {}", e); 
      } 
     } 
    } 

    @Override 
    public void onOperationsBatchSynchronization(OperationsBatchData batchData) { 
     final String methodName = "onOperationsBatchSynchronization"; 
     LOG.error(methodName, "", "Batch operation received: {}", batchData.getSourceDetails().getName()); 
     DataSyncOperation[] operations = batchData.getBatchDataItems(); 
     synchronized (this) { 
      for (DataSyncOperation operation : operations) { 
       try { 
        hdfsFileUtil.writeObjectToAFile((Serializable) operation.getDataAsObject(), uriToFileToWrite); 
       } catch (IOException e) { 
        LOG.error(methodName, "", "Error writing the object to a file in the cluster: {}", e); 
       } 
      } 
     } 
    } 
} 

И это код класса, отвечающий за взаимодействия с пространством:

public class HDFSFileUtilImpl implements HDFSFileUtil { 

    private final static Logger LOG = LoggerFactory.getLogger(HDFSFileUtilImpl.class); 
    private final static boolean DELETE_RECURSIVELY = true; 
    private final String uriToHdfs; 
    private final FileSystem fileSystem; 

    public HDFSFileUtilImpl(HDFSConfiguration config, String uriToHdfs, String user) { 
     Validate.notNull(config); 
     Validate.notEmpty(uriToHdfs); 
     Validate.notEmpty(user); 
     this.uriToHdfs = uriToHdfs; 
     try { 
      fileSystem = FileSystem.get(new URI(uriToHdfs), config.getConfiguration(), user); 
     } catch (IOException | URISyntaxException | InterruptedException e) { 
      LOG.error("constructor", "", "HDFSFileUtilImpl constructor failed: {}", e); 
      throw new IllegalStateException(e); 
     } 
    } 

    @Override 
    public String getUriToHdfs() { 
     return uriToHdfs; 
    } 

    @Override 
    public void writeObjectToAFile(Serializable obj, String fileUri) throws IOException { 
     Validate.notNull(obj); 
     Validate.notEmpty(fileUri); 
     FSDataOutputStream out; 
     if (!fileExistsInCluster(fileUri)) { 
      throw new IllegalArgumentException("File with URI: " + fileUri + " does not exist in the cluster"); 
     } 
     out = fileSystem.append(new Path(fileUri)); 
     byte[] objByteArray = getBytesFromObject(obj); 
     out.write(objByteArray); 
     out.close(); 
    } 

    private byte[] getBytesFromObject(Object obj) throws IOException { 
     byte[] retByteArray = null; 
     // try/catch used only to be able to use "try with resources" feature 
     try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutput out = new ObjectOutputStream(bos);) { 
      out.writeObject(obj); 
      retByteArray = bos.toByteArray(); 
     } catch (IOException e) { 
      throw new IOException(e); 
     } 
     return retByteArray; 
    } 

    @Override 
    public void createFileInCluster(String uriOfFile) throws IOException { 
      Validate.notEmpty(uriOfFile); 
      fileSystem.create(new Path(uriOfFile)); 
    } 

    @Override 
    public boolean fileExistsInCluster(String uri) throws IOException { 
     Validate.notEmpty(uri); 
     boolean result = false; 
     result = fileSystem.exists(new Path(uri)); 
     return result; 
    } 

    ... 
} 

Источник данных имеет три соединения, установленные с моим компонентом, и метод onOperationsBatchSynchronization получает вызов одновременно. Вот почему используются блоки синхронизации, но даже с ними я получаю следующее исключение из журналов:

10:09:23.727 ERROR - onOperationsBatchSynchronization 
org.apache.hadoop.ipc.RemoteException: Failed to create file [/object-container] for [DFSClient_NONMAPREDUCE_1587728611_73] for client [127.0.0.1], because this file is already being created by [DFSClient_NONMAPREDUCE_1972611521_106] on [127.0.0.1] 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2636) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2462) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2700) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2663) 
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:559) 
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:388) 
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) 
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) 
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) 
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013) 
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:415) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614) 
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007) 

Так что может быть проблема? У меня есть некоторые модульные тесты (больше похоже на интеграцию, поскольку они полагаются на текущую настройку Hadoop), и все методы на HDFSFileUtilImpl работают правильно и дают ожидаемые результаты.

EDIT: Я просто попытался записать файлы в кластере, а не добавлять к одному файлу, и он отлично работает. Поэтому я откажусь от любой проблемы с разрешением.

+0

Я думаю, что есть некоторые разрешения проблемы из-за которой его не удалось создать файл – Mr37037

+0

Но он способен создайте файл. Выполняя некоторые тесты, я пытался создавать файлы, а не просто добавлять их, и он создает новые файлы без каких-либо проблем. Я отредактирую сообщение, чтобы сообщить об этом – jbarren

ответ

1

Наконец-то избавился от этой ошибки. По-видимому, необходимо закрыть FSDataOutputStream, который возвращается, когда вы звоните create от filesystem.

Это, как говорится, это как метод createFileInCluster от HDFSFileUtilImpl реализуется в настоящее время:

@Override 
public void createFileInCluster(String uriOfFile) throws IOException { 
     Validate.notEmpty(uriOfFile); 
     FSDataOutputStream out = fileSystem.create(new Path(uriOfFile)); 
     out.close(); 
} 
+0

действительно ли это приложение к существующему файлу? Я нашел противоречивую информацию о том, было ли добавлено дополнение к файлу в hadoop после того, как оно было вытеснено около 2009 года. – matanster

+1

Нет, этот код указывает только на то, что не так в коде, представленном сверху. Проблема заключалась в том, что файл не был закрыт после создания, но код, добавленный к файлу, все тот же, что и в исходном коде. Посмотрите на метод, называемый 'writeObjectToAFile' – jbarren

+0

На самом деле я ничего не нашел о методе writeObjectToAFile hdfsFileUtil, в Google. – matanster

Смежные вопросы