2013-12-11 2 views
0

Я пытаюсь выполнить несколько задач параллельно с CompletionService. Проблемы возникают, когда я пытаюсь осуществить отмену.Future.cancel (true) не надежно отменяет/прерывает поток

Вот набросок кода я использую:

void startTasks(int numberOfTasks) throws Exception { 

    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); 
    CompletionService<TaskResultType> completionService = new ExecutorCompletionService<TaskResultType>(executor); 
    ConcurrentLinkedQueue<TaskResultType> results = new ConcurrentLinkedQueue<BenchmarkResult>(); 
    ArrayList<Future> futures = new ArrayList<Future>(); 

    for (int i = 0; i < numberOfTasks ; i++) { 
     TypeOfTask task = ... ; 
     Future future = completionService.submit(task); 
     futures.add(future); 
    } 

    boolean failed = false; 
    Throwable cause = null; 
    for (int i = 0; i < numberOfThreads; i++) { 
     try { 
      Future<TaskResultType> resultFuture = completionService.take(); 
      TaskResultType result = resultFuture.get(); 
      results.add(result); 
     } catch (ExecutionException e) { 
      failed = true; 
      cause = e.getCause(); 
      /* cancel all other running tasks in case of failure in one task */ 
      for (Future future : futures) { 
       future.cancel(true); 
      } 
     } catch (CancellationException e) { 
      // consume (planned cancellation from calling future.cancel()) 
     } 

    } 
    executor.shutdown(); 
    // code to throw an exception using cause 
} 

Задачи реализации Callable.

Когда я сейчас делаю исключение в одной из задач в большинстве случаев, это отлично работает, то есть я сразу получаю исключения от других задач и задач, которые заканчиваются немедленно (позволяет вызвать этот случай A). Но иногда (позволяет называть этот случай B), некоторые из задач заканчиваются первым, а затем бросают исключение CancellationException. Функция Future.cancel (true) вернулась в обоих случаях в обоих случаях для всех задач (кроме тех, которые были с исходным ExecutionException, потому что этот уже был отменен).

Я проверяю прерванный флаг с помощью Thread.currentThread.isInterrupted() в завершаемых задачах (т. Е. В тех случаях, когда аннулирование завершается неудачно), прерванный флаг имеет значение false.

Все это кажется очень странным поведением в моих глазах. Кто-нибудь может понять, в чем проблема?

Update

Лучшая идея до сих пор у меня есть, что где-то глубоко внутри кода, включающего задачу (только часть кода высокого уровня от себя) прерванное состояние потребляемый, например, с помощью уловленного InterruptedException, который не вызывает Thread.interrupt(), чтобы восстановить статус. Точное время, когда прерванный флаг задается функцией Future.cancel(), может незначительно отличаться из-за планирования потоков, что объясняет непоследовательное поведение. Будет ли потребленный прерванный статус объяснять поведение случая B?

+1

Как вы можете проверить прерванный флаг в задании, которое уже выполнено? Если он уже завершен, код не работает, правильно? – Gray

+0

Не видя кода в своей Задаче, мы не можем помочь вам – dkatzel

+1

Я подозреваю, что ваша проблема в том, что одна из задач выполняет IO или иным образом заблокирована и не проверяет 'Thread.currentThread.isInterrupted()'. Поэтому, даже если вы отменили «Будущее», и поток прерывается, это не обнаруживается. Ваша программа зависает или заканчивается? Если он зависает, вы можете сделать дамп потока, чтобы увидеть, какой из потоков зависает? – Gray

ответ

0

Но иногда некоторые задачи заканчиваются первым, а затем бросают исключение CancellationException. Функция Future.cancel (true) вернулась в обоих случаях в обоих случаях для всех задач (кроме тех, которые были с исходным ExecutionException, потому что этот уже был отменен).

Если ваша программа не заканчивали (или занимает много времени, чтобы закончить после того, как исключение), то я подозреваю, что ваша проблема в том, что одна из задач делают IO или иным образом блокируются и не проверяя Thread.currentThread().isInterrupted(). Поэтому, несмотря на то, что вы отменили будущее, и поток прерывается, это не обнаруживается.

Однако, похоже, что программа заканчивается. Поэтому я не уверен, что это за ошибка. Если вы поймаете исключение, вы вызываете future.cancel(true) на все фьючерсы в списке. Тот, который бросил, и те, которые уже закончили, должны вернуть false, так как они не могут быть отменены. Те, которые возвратили true от отмены, должны были быть прерваны.

Например. если второй-последний поток генерирует исключение, то future.cancel(true) должен возвращать только true для последнего потока.

Одна вещь, которую нужно сделать, - удалить фьючерсы по мере их завершения, чтобы вы не отменяли уже выполненные задания.Но все, что могли бы сделать это маскировать проблему вы видите сейчас:

Future<TaskResultType> resultFuture = completionService.take(); 
futures.remove(resultFuture); 

Update:

Вполне возможно, что какой-то код глотает прерывание. Это случается все время, к сожалению. Если некоторые потоки не заканчиваются сразу же после их отмены и заканчиваются, то это, вероятно, происходит.

+0

Так что я до сих пор не понимаю, в чем проблема @jvf. – Gray

+0

Все задачи выполняются долго. В одной из задач я бросаю исключение сразу после его запуска. Это приводит к ExecutionException, что приводит к вызову Future.cancel(). В тех случаях, когда все работает нормально, это немедленно приводит к отмене других задач, поэтому из службы завершения становится доступно больше фьючерсов, и все эти фьючерсы вызывают исключение CancellationException on Future.get(). Я бы ожидал, что вызов функции cancel() для выполняемых задач всегда должен немедленно отменить его ... – jvf

+0

Итак, когда это не работает, что происходит @jvf? – Gray

0

Но иногда некоторые из задач заканчиваются первым, а затем бросают 10 Исключение CancellationException.

Может быть случай, когда вы отменить задание, это обычно прерывается (в этом состоянии, вы можете думать, что она возвращает результат, но для CompletionService это аннулированного), будущее возвращается на take(), вы звоните future.get() и есть CancellationException.

Вы также можете посмотреть на гуавы-х Futures.allAsList, который, кажется, делает очень похожую вещь:

Создает новый ListenableFuture, значение которого представляет собой список, содержащий значения всех входных фьючерсов, если все добиться успеха. Если какой-либо вход завершается с ошибкой, возвращаемое будущее терпит неудачу.

Смежные вопросы