2015-10-27 3 views
0

Я пытаюсь отправить несколько заданий и получить результаты по мере их поступления. Однако после окончания цикла я должен обеспечить, чтобы все задачи выполнялись в течение определенного периода времени. Если нет, сделайте ошибку. Изначально все, что у меня было, - invokeAll, shutdown и callerService, и awaitTermination звонки, которые были использованы для обеспечения выполнения всех задач (несмотря на ошибки или нет). Я перенес код, чтобы использовать CompletionService, чтобы отобразить результаты. Где я могу принудительно выполнить предложение waitaitTermination в вызовах CompletionService?Enforce executorService.awaitTermination при использовании CompletionService

CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); 
      logger.info("Submitting all tasks"); 
      for (Callable<String> task : tasks) 
       completionService.submit(task); 
      executor.shutdown(); 
      logger.info("Tasks submitted. Now checking the status."); 
      while (!executor.isTerminated()) 
      { 
       final Future<String> future = completionService.take(); 
       String itemValue; 
       try 
       { 
        itemValue = future.get(); 
        if (!itemValue.equals("Bulk")) 
         logger.info("Backup completed for " + itemValue); 
       } 
       catch (InterruptedException | ExecutionException e) 
       { 
        String message = e.getCause().getMessage(); 
        String objName = "Bulk"; 
        if (message.contains("(") && message.contains(")")) 
         objName = message.substring(message.indexOf("(") + 1, message.indexOf(")")); 
        logger.error("Failed retrieving the task status for " + objName, e); 
       } 
      } 
executor.awaitTermination(24, TimeUnit.HOURS); 

Другими словами, как я могу использовать тайм-аут для CompletionService?

EDIT:

Исходный код, который я имел отображалось ниже. Проблема в том, что я повторяю в будущем списке, а затем печатаю их как завершенные. Однако мое требование состоит в том, чтобы отобразить те, которые были заполнены на основе FCFS.

List<Future<String>> results = executor.invokeAll(tasks); 
     executor.shutdown(); 
     executor.awaitTermination(24, TimeUnit.HOURS); 

     while (results.size() > 0) 
     { 
      for (Iterator<Future<String>> iterator = results.iterator(); iterator.hasNext();) 
      { 
       Future<String> item = iterator.next(); 
       if (item.isDone()) 
       { 
        String itemValue; 
        try 
        { 
         itemValue = item.get(); 
         if (!itemValue.equals("Bulk")) 
          logger.info("Backup completed for " + itemValue); 
        } 
        catch (InterruptedException | ExecutionException e) 
        { 
         String message = e.getCause().getMessage(); 
         String objName = "Bulk"; 
         if (message.contains("(") && message.contains(")")) 
          objName = message.substring(message.indexOf("(") + 1, message.indexOf(")")); 
         logger.error("Failed retrieving the task status for " + objName, e); 
        } 
        finally 
        { 
         iterator.remove(); 
        } 
       } 
      } 
     } 
+0

Но почему? Похоже, что 'CompletionService' не подходит для проблемы, которую вы пытаетесь решить. Что случилось с 'invokeAll'? – kan

+0

Кроме того, непонятно, почему вам нужно 'awaitTermination'. Просто используйте 'invokeAll', как описано здесь: http://stackoverflow.com/a/3269888/438742 – kan

+0

Чтобы отобразить, какие процессы были выполнены сначала в журнале, мне требуется CompletionService. Если я использую invokeAll, он блокируется до тех пор, пока все задачи не будут завершены, и я должен отобразить статус в том порядке, в котором будут представлены задачи. – dmachop

ответ

1

Я предлагаю вам ждать исполнителя прекратить в другом потоке

Таким образом, вы можете добиться предоставления результатов Fcfs, а также следить за соблюдением тайм-аут.

Это может быть легко достигнуто с чем-то, что будет выглядеть следующим

CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); 

// place all the work in a function (an Anonymous Runnable in this case) 
// completionService.submit(() ->{work}); 
// as soon as the work is submitted it is handled by another Thread 

completionService.submit(() ->{ 
    logger.info("Submitting all tasks"); 
    for (Callable<String> task : tasks) 
    completionService.submit(task); 
    logger.info("Tasks submitted. Now checking the status."); 
    int counter = tasks.size(); 
    for(int i = counter; counter >=1; counter--) // Replaced the while loop 
    { 
     final Future<String> future = completionService.take(); 
     String itemValue; 
     try 
     { 
      itemValue = future.get(); 
      if (!itemValue.equals("Bulk")) 
       logger.info("Backup completed for " + itemValue); 
     } 
     catch (InterruptedException | ExecutionException e) 
     { 
      String message = e.getCause().getMessage(); 
      String objName = "Bulk"; 
      if (message.contains("(") && message.contains(")")) 
       objName = message.substring(message.indexOf("(") + 1, message.indexOf(")")); 
      logger.error("Failed retrieving the task status for " + objName, e); 
     } 
    } 
}); 

// After submitting the work to another Thread 
// Wait in your Main Thread, and enforce termination if needed 
shutdownAndAwaitTermination(executor); 

Вы обрабатываете прекращение исполнители & & ждут с помощью этого (взято из ExecutorsService)

void shutdownAndAwaitTermination(ExecutorService pool) { 
    pool.shutdown(); // Disable new tasks from being submitted 
    try { 
    // Wait a while for existing tasks to terminate 
    if (!pool.awaitTermination(24, TimeUnit.HOURS)) { 
     pool.shutdownNow(); // Cancel currently executing tasks 
     // Wait a while for tasks to respond to being cancelled 
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 
      System.err.println("Pool did not terminate"); 
    } 
    } catch (InterruptedException ie) { 
    // (Re-)Cancel if current thread also interrupted 
    pool.shutdownNow(); 
    // Preserve interrupt status 
    Thread.currentThread().interrupt(); 
    } 
} 
+0

Ваша строка: 'while (! Executor.isTerminated())' повторяет до тех пор, пока исполнитель не будет завершен. Если я не ошибаюсь, 'awaitTermination()' избыточно здесь. – dmachop

+0

@dmachop Я просто хотел дать вам представление о том, что я говорю, поэтому я скопировал ваш код, на самом деле не работал. Используйте любое условие остановки, которое вы хотите. Возможно, счетчик поставленных задач. Также сообщите нам, если бы это сработало для вас – Xipo

+0

Таким образом, единственный способ - запустить таймер после отправки задач и вручную проверить условие в цикле while, прошел ли таймер? – dmachop

0

Ok тогда, вы необходимо следить за завершением. Итак, почему yon не используется в соответствии с документацией? https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html Итак, он отправляет n задания на новый экземпляр ExecutorCompletionService и ждет n. Без повторного завершения вы можете просто повторно использовать одного и того же исполнителя (обычно пул потоков, создание нового потока дороже, чем повторное использование из пула). Так что, если я адаптировать код из документации к вашему сценарию это будет что-то вроде:

CompletionService<Result> ecs 
     = new ExecutorCompletionService<String>(executor); 
for (Callable<Result> task : tasks) 
    ecs.submit(task); 
logger.info("Tasks submitted. Now checking the status."); 
int n = tasks.size(); 
for (int i = 0; i < n; ++i) { 
    try { 
     String r = ecs.take().get(); 
     logger.info("Backup completed for " + r); 
    } 
    catch(InterruptedException | ExecutionException e) { 
     ... 
    } 
} 

Кроме того, это плохая идея, чтобы разобрать сообщение об исключении, лучше, если вы создаете свой собственный класс исключений и использовать instanceof.

Если вам нужен тайм-аут для завершения - используйте poll с временными параметрами вместо take.

+0

Спасибо. Я получил понятие об исключении цикла while, использовал цикл for и затем применял таймаут позже, как это делал Xipo. – dmachop