13

У меня есть следующий фрагмент кода, который в основном сканирует список задач, который должен быть выполнен, и каждая задача затем предоставляется исполнителю для выполнения.Обработка исключений для ThreadPoolExecutor

В свою очередь, JobExecutor создает другого исполнителя (для выполнения операций db ... чтение и запись данных в очередь) и завершает задачу.

JobExecutor возвращает Future<Boolean> для поставленных задач. Когда одна из задач выходит из строя, я хочу изящно прервать все потоки и завершить работу исполнителя, перехватив все исключения. Какие изменения мне нужно сделать?

public class DataMovingClass { 
    private static final AtomicInteger uniqueId = new AtomicInteger(0); 

    private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator(); 

    ThreadPoolExecutor threadPoolExecutor = null ; 

    private List<Source> sources = new ArrayList<Source>(); 

    private static class IDGenerator extends ThreadLocal<Integer> { 
     @Override 
     public Integer get() { 
      return uniqueId.incrementAndGet(); 
     } 
    } 

    public void init(){ 

    // load sources list 

    } 

    public boolean execute() { 

    boolean succcess = true ; 
    threadPoolExecutor = new ThreadPoolExecutor(10,10, 
       10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
       new ThreadFactory() { 
        public Thread newThread(Runnable r) { 
         Thread t = new Thread(r); 
         t.setName("DataMigration-" + uniqueNumber.get()); 
         return t; 
        }// End method 
       }, new ThreadPoolExecutor.CallerRunsPolicy()); 

    List<Future<Boolean>> result = new ArrayList<Future<Boolean>>(); 

    for (Source source : sources) { 
        result.add(threadPoolExecutor.submit(new JobExecutor(source))); 
    } 

    for (Future<Boolean> jobDone : result) { 
       try { 
        if (!jobDone.get(100000, TimeUnit.SECONDS) && success) { 
         // in case of successful DbWriterClass, we don't need to change 
         // it. 
         success = false; 
        } 
       } catch (Exception ex) { 
        // handle exceptions 
       } 
      } 

    } 

    public class JobExecutor implements Callable<Boolean> { 

     private ThreadPoolExecutor threadPoolExecutor ; 
     Source jobSource ; 
     public SourceJobExecutor(Source source) { 
      this.jobSource = source; 
      threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
        new ThreadFactory() { 
         public Thread newThread(Runnable r) { 
          Thread t = new Thread(r); 
          t.setName("Job Executor-" + uniqueNumber.get()); 
          return t; 
         }// End method 
        }, new ThreadPoolExecutor.CallerRunsPolicy()); 
     } 

     public Boolean call() throws Exception { 
      boolean status = true ; 
      System.out.println("Starting Job = " + jobSource.getName()); 
      try { 

         // do the specified task ; 


      }catch (InterruptedException intrEx) { 
       logger.warn("InterruptedException", intrEx); 
       status = false ; 
      } catch(Exception e) { 
       logger.fatal("Exception occurred while executing task "+jobSource.getName(),e); 
       status = false ; 
      } 
      System.out.println("Ending Job = " + jobSource.getName()); 
      return status ; 
     } 
    } 
} 

ответ

14

Когда вы отправляете задание исполнителю, он возвращает вам FutureTask экземпляр.

FutureTask.get() будет повторно выбрасывать любое исключение, заданное задачей, как ExecutorException.

Итак, когда вы перебираете List<Future> и звоните, набирайте каждый из них, поймайте ExecutorException и вызовите упорядоченное выключение.

+0

ОК ... вы видите какие-либо другие недостатки или места, где мне нужно обрабатывать исключения? – jagamot

1

Подкласс ThreadPoolExecutor и переопределить его метод protected afterExecute (Runnable r, Throwable t).

Если вы создаете пул потоков через класс удобства java.util.concurrent.Executors (которого вы не знаете), взгляните на его источник, чтобы посмотреть, как он вызывает ThreadPoolExecutor.

0

Поскольку вы отправляете задания на ThreadPoolExecutor, исключения проглатываются FutureTask.

Посмотрите на этот code

**Inside FutureTask$Sync** 

void innerRun() { 
    if (!compareAndSetState(READY, RUNNING)) 
     return; 

    runner = Thread.currentThread(); 
    if (getState() == RUNNING) { // recheck after setting thread 
     V result; 
     try { 
      result = callable.call(); 
     } catch (Throwable ex) { 
      setException(ex); 
      return; 
     } 
     set(result); 
    } else { 
     releaseShared(0); // cancel 
    } 

}

protected void setException(Throwable t) { 
    sync.innerSetException(t); 
} 

Из выше кода, то ясно, что setException метод ловли Throwable.По этой причине, FutureTask глотает все исключения, если вы используете «submit()» метод на ThreadPoolExecutor

Как на Java documentation, вы можете продлить afterExecute() метод в ThreadPoolExecutor

protected void afterExecute(Runnable r, 
          Throwable t) 

Пример кода согласно документации:

class ExtendedExecutor extends ThreadPoolExecutor { 
    // ... 
    protected void afterExecute(Runnable r, Throwable t) { 
    super.afterExecute(r, t); 
    if (t == null && r instanceof Future<?>) { 
     try { 
     Object result = ((Future<?>) r).get(); 
     } catch (CancellationException ce) { 
      t = ce; 
     } catch (ExecutionException ee) { 
      t = ee.getCause(); 
     } catch (InterruptedException ie) { 
      Thread.currentThread().interrupt(); // ignore/reset 
     } 
    } 
    if (t != null) 
     System.out.println(t); 
    } 
} 

Вы можете поймать Exceptions в три пути

  1. Future.get() как предложено в принятом ответе
  2. обертывания всего run() или call() метод try{}catch{}Exceptoion{} блоков
  3. переопределяют afterExecute из ThreadPoolExecutor способа, как показано выше

Для того, чтобы корректно прерывать другие потоки, посмотрят на ниже SE вопрос:

How to stop next thread from running in a ScheduledThreadPoolExecutor

How to forcefully shutdown java ExecutorService