Я пытаюсь использовать метод invokeAll()
для параллельной загрузки файла с клиента на разные серверы. Это код:Почему future.get() застрял?
List<UploadTask> uploadTasks = new ArrayList<>();
ExecutorService taskExecutor = Executors.newFixedThreadPool(result.size());
for(ClusterElement clusterElement : result)
uploadTasks.add(new UploadTask(localFile, fileName, clusterElement));
//wait at most 10 seconds for the uploads of the file on all the servers
List<Future<ClusterElement>> uploadResult = taskExecutor.invokeAll(uploadTasks, 13, TimeUnit.MINUTES);
System.out.println("Timeout!");
System.out.println("Printing size " + uploadResult.size());
//after timeout abort all FTPClient (if already finish nothing happens)
for(int i=0;i<uploadResult.size();i++)
{
Future<ClusterElement> future = uploadResult.get(i);
ClusterElement clusterElement = result.get(i);
System.out.println("Result of "+clusterElement.getId()+" cancelled="+future.isCancelled());
try {
if(future.isCancelled()) {//if task not finished, then future cancelled (interrupted)
System.out.println("Deleting file on "+clusterElement.getId());
removeFile(clusterElement.getAddress(),clusterElement.getPort(),fileName,clusterElement);
}
else {
System.out.println("Getting result...");
ClusterElement serverUploaded = future.get();//task finished: uploaded complete (or failed)
if(serverUploaded!=null)//successfully uploaded
{
System.out.println("Successfully uploaded on "+serverUploaded.getId());
servers.add(serverUploaded);
}
else
System.out.println("Uploading failed on "+servers.get(i));
}
}catch (ExecutionException e) {
e.printStackTrace();
}
}
Где UploadTask.call()
является:
@Override
public ClusterElement call() throws Exception {
ClusterElement result;//null = uploading failed, server where the file was uploaded otherwise
try (FileInputStream file = new FileInputStream(localFile)) {
FTPClient ftpClient = SingleClientRequest.createClient(clusterElement.getAddress(),clusterElement.getPort());
ftpClient.enterLocalPassiveMode();
System.out.println("Task uploading file on "+clusterElement.getId());
if(ftpClient.storeFile(fileName+".tmp", file)) {//so the Garbage Collector will not delete it during upload
ftpClient.rename(fileName + ".tmp", fileName);//rename the file with its real name
result = clusterElement;
}
else {
System.out.println("File " + fileName + " not uploaded on" + clusterElement.getId());
result = null;
}
} catch (IOException e) {
System.err.println("Error uploading "+localFile+" on "+clusterElement.getId());
result = null;
}
System.out.println("Returning "+result);
return result;
}
Другими словами, если файл не загружен в течение тайм-аута, то он удаляется из FS (к сожалению FTPClient не имеет любая операция прерывания для этой цели, поэтому я должен удалить ее, пока она все еще загружается).
Сейчас проблема в том, что, если я убью один из серверов, то UploadTask.call()
возвращает null
(так как IOException
отбрасывается) и так, что в будущем будет завершена, и поэтому не отмененные ... НО когда я пытаюсь ClusterElement serverUploaded = future.get();
он застрянет там вместо того, чтобы принимать null
!
Почему это происходит?
выбрасывается исключение? –