2014-10-03 3 views
2

У меня есть исполнитель threadpool, выполняющий ту же операцию для списка ключей, входящих в партии. поэтому я использую метод invokeall() для обработки списка ключей в пакете. usecase такова, что если какая-либо из задач в пакете возвращает ошибку, нет смысла продолжать обработку других ключей. Итак,ThreadPoolExecutor: отменить задачи из invokeAll(), когда одна задача возвращает ошибку

  1. Как я могу отменить задачи пакетного выполнения после того, как задача перенастроила ошибку.
  2. , но не влияет на выполнение другой партии ключей. т.е. отмена должна быть выделена на каждую партию.

Благодарим за помощь.

ответ

1

Я не вижу, как это можно сделать без какой-либо настройки. Простейшая реализация я мог придумать требует:

  • специализированная реализация Будущего в основном подкласс FutureTask, который переопределяет метод setException(), чтобы отменить все другие задачи, когда задача бросает исключение
  • специализированных ThreadPoolExecutor реализация которых перекрывает invokeAll(), чтобы использовать пользовательский будущее

это выглядит следующим образом:

для пользовательского будущего:

import java.util.Collection; 
import java.util.concurrent.*; 

public class MyFutureTask<V> extends FutureTask<V> { 
    private Callable<V> task; 
    private Collection<Future<V>> allFutures; 

    public MyFutureTask(Callable<V> task, Collection<Future<V>> allFutures) { 
    super(task); 
    this.task = task; 
    this.allFutures = allFutures; 
    } 

    @Override 
    protected void setException(Throwable t) { 
    super.setException(t); 
    synchronized(allFutures) { 
     for (Future<V> future: allFutures) { 
     if ((future != this) && !future.isDone()) { 
      future.cancel(true); 
     } 
     } 
    } 
    } 
} 

для пользовательского пула потоков: например

import java.util.*; 
import java.util.concurrent.*; 

public class MyThreadPool extends ThreadPoolExecutor { 
    public MyThreadPool(int size) { 
    super(size, size, 1L, TimeUnit.MILLISECONDS, 
     new LinkedBlockingQueue<Runnable>()); 
    } 

    @Override 
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException { 
    List<Future<T>> futures = new ArrayList<>(tasks.size()); 
    for (Callable<T> callable: tasks) { 
     futures.add(new MyFutureTask<>(callable, futures)); 
    } 
    for (Future<T> future: futures) { 
     execute((MyFutureTask<T>) future); 
    } 
    for (Future<T> future: futures) { 
     try { 
     future.get(); 
     } catch (ExecutionException|CancellationException e) { 
     // ignore this exception 
     } 
    } 
    return futures; 
    } 
} 

код, чтобы проверить это:

import java.util.*; 
import java.util.concurrent.*; 

public class TestThreadPool { 
    public static void main(final String[] args) { 
    ExecutorService executor = null; 
    try { 
     int size = 10; 
     executor = new MyThreadPool(size); 
     List<Callable<String>> tasks = new ArrayList<>(); 
     int count=1; 
     tasks.add(new MyCallable(count++, false)); 
     tasks.add(new MyCallable(count++, true)); 
     List<Future<String>> futures = executor.invokeAll(tasks); 
     System.out.println("results:"); 
     for (int i=0; i<futures.size(); i++) { 
     Future<String> f = futures.get(i); 
     try { 
      System.out.println(f.get()); 
     } catch (CancellationException e) { 
      System.out.println("CancellationException for task " + (i+1) + 
      ": " + e.getMessage()); 
     } catch (ExecutionException e) { 
      System.out.println("ExecutionException for task " + (i+1) + 
      ": " + e.getMessage()); 
     } 
     } 
    } catch(Exception e) { 
     e.printStackTrace(); 
    } finally { 
     if (executor != null) executor.shutdownNow(); 
    } 
    } 

    public static class MyCallable implements Callable<String> { 
    private final int index; 
    private final boolean simulateFailure; 

    public MyCallable(int index, boolean simulateFailure) { 
     this.index = index; 
     this.simulateFailure = simulateFailure; 
    } 

    @Override 
    public String call() throws Exception { 
     if (simulateFailure) { 
     throw new Exception("task " + index + " simulated failure"); 
     } 
     Thread.sleep(2000L); 
     return "task " + index + " succesful"; 
    } 
    } 
} 

и, наконец, результат выполнения теста, как показано на консоли вывода:

results: 
CancellationException for task 1: null 
ExecutionException for task 2: java.lang.Exception: task 2 simulated failure 
0

  1. Передайте ссылку в ExecutorService для каждой задачи, как ниже:

    ExecutorService eServ = Executors.newFixedThreadPool(10); 
    
    Set<Callable<ReaderThread>> tasks = new HashSet<Callable<ReaderThread>>(); 
    
    for (int i = 0; i < 10 ; i++) 
    { 
        tasks.add(new ReaderThread(eServ)); 
    } 
    
    List<Future<ReaderThread>> lt = eServ.invokeAll(tasks); 
    
  2. Если задача ошибка, то вызовите shutdownNow() то он остановит все задачи

    public ReaderThread call() throws Exception 
    { 
        try  
        { 
         for (int i = 1; i < 50; i++) 
         { 
          System.out.println("i="+i+"::"+Thread.currentThread()); 
          Thread.sleep(1000); 
    
          if (i == 10 && Thread.currentThread().toString().equals("Thread[pool-1-thread-7,5,main]")) 
          { 
           throw new Exception(); 
          }   
         } 
        } 
        catch (Exception exc) 
        { 
         ex.shutdownNow();  
        } 
    
        return this; 
    } 
    
Смежные вопросы