У меня есть класс, который отвечает за получение некоторых собранных данных из источника данных и запись сериализованного содержимого этих данных в файл (всегда один и тот же файл). Для этого первое, что я делаю, когда создаю экземпляр, это проверить, существует ли этот файл и создать его, если нет. Кажется, что файл создается без проблем, но проблема возникает, когда я пытаюсь добавить сериализованный объект в файл, используя метод onOperationsBatchSynchronization
.Не удалось добавить к HDFS
Это код указанного класса:
public class HDFSSpaceSynchronizationEndpoint extends SpaceSynchronizationEndpoint {
private final static Logger LOG = LoggerFactory.getLogger(HDFSSpaceSynchronizationEndpoint.class);
private final String uriToFileToWrite;
private final HDFSFileUtil hdfsFileUtil;
public HDFSSpaceSynchronizationEndpoint(HDFSFileUtil hdfsFileUtil) {
Validate.notNull(hdfsFileUtil);
this.hdfsFileUtil = hdfsFileUtil;
uriToFileToWrite = hdfsFileUtil.getUriToHdfs() + "/object-container";
createFileIfNeeded();
}
private void createFileIfNeeded() {
final String methodName = "createFileIfNeeded";
synchronized (this) {
try {
if (!hdfsFileUtil.fileExistsInCluster(uriToFileToWrite)) {
hdfsFileUtil.createFileInCluster(uriToFileToWrite);
}
} catch (IOException e) {
LOG.error(methodName, "", "Error creating the file in the cluster: {}", e);
}
}
}
@Override
public void onOperationsBatchSynchronization(OperationsBatchData batchData) {
final String methodName = "onOperationsBatchSynchronization";
LOG.error(methodName, "", "Batch operation received: {}", batchData.getSourceDetails().getName());
DataSyncOperation[] operations = batchData.getBatchDataItems();
synchronized (this) {
for (DataSyncOperation operation : operations) {
try {
hdfsFileUtil.writeObjectToAFile((Serializable) operation.getDataAsObject(), uriToFileToWrite);
} catch (IOException e) {
LOG.error(methodName, "", "Error writing the object to a file in the cluster: {}", e);
}
}
}
}
}
И это код класса, отвечающий за взаимодействия с пространством:
public class HDFSFileUtilImpl implements HDFSFileUtil {
private final static Logger LOG = LoggerFactory.getLogger(HDFSFileUtilImpl.class);
private final static boolean DELETE_RECURSIVELY = true;
private final String uriToHdfs;
private final FileSystem fileSystem;
public HDFSFileUtilImpl(HDFSConfiguration config, String uriToHdfs, String user) {
Validate.notNull(config);
Validate.notEmpty(uriToHdfs);
Validate.notEmpty(user);
this.uriToHdfs = uriToHdfs;
try {
fileSystem = FileSystem.get(new URI(uriToHdfs), config.getConfiguration(), user);
} catch (IOException | URISyntaxException | InterruptedException e) {
LOG.error("constructor", "", "HDFSFileUtilImpl constructor failed: {}", e);
throw new IllegalStateException(e);
}
}
@Override
public String getUriToHdfs() {
return uriToHdfs;
}
@Override
public void writeObjectToAFile(Serializable obj, String fileUri) throws IOException {
Validate.notNull(obj);
Validate.notEmpty(fileUri);
FSDataOutputStream out;
if (!fileExistsInCluster(fileUri)) {
throw new IllegalArgumentException("File with URI: " + fileUri + " does not exist in the cluster");
}
out = fileSystem.append(new Path(fileUri));
byte[] objByteArray = getBytesFromObject(obj);
out.write(objByteArray);
out.close();
}
private byte[] getBytesFromObject(Object obj) throws IOException {
byte[] retByteArray = null;
// try/catch used only to be able to use "try with resources" feature
try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutput out = new ObjectOutputStream(bos);) {
out.writeObject(obj);
retByteArray = bos.toByteArray();
} catch (IOException e) {
throw new IOException(e);
}
return retByteArray;
}
@Override
public void createFileInCluster(String uriOfFile) throws IOException {
Validate.notEmpty(uriOfFile);
fileSystem.create(new Path(uriOfFile));
}
@Override
public boolean fileExistsInCluster(String uri) throws IOException {
Validate.notEmpty(uri);
boolean result = false;
result = fileSystem.exists(new Path(uri));
return result;
}
...
}
Источник данных имеет три соединения, установленные с моим компонентом, и метод onOperationsBatchSynchronization
получает вызов одновременно. Вот почему используются блоки синхронизации, но даже с ними я получаю следующее исключение из журналов:
10:09:23.727 ERROR - onOperationsBatchSynchronization
org.apache.hadoop.ipc.RemoteException: Failed to create file [/object-container] for [DFSClient_NONMAPREDUCE_1587728611_73] for client [127.0.0.1], because this file is already being created by [DFSClient_NONMAPREDUCE_1972611521_106] on [127.0.0.1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2636)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2462)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2700)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2663)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:559)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:388)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
Так что может быть проблема? У меня есть некоторые модульные тесты (больше похоже на интеграцию, поскольку они полагаются на текущую настройку Hadoop), и все методы на HDFSFileUtilImpl
работают правильно и дают ожидаемые результаты.
EDIT: Я просто попытался записать файлы в кластере, а не добавлять к одному файлу, и он отлично работает. Поэтому я откажусь от любой проблемы с разрешением.
Я думаю, что есть некоторые разрешения проблемы из-за которой его не удалось создать файл – Mr37037
Но он способен создайте файл. Выполняя некоторые тесты, я пытался создавать файлы, а не просто добавлять их, и он создает новые файлы без каких-либо проблем. Я отредактирую сообщение, чтобы сообщить об этом – jbarren