2012-04-10 3 views
8

Есть что-то странное в реализации BoundedExecutor в книге Java Concurrency in Practice.Java Параллельность на практике: состояние гонки в BoundedExecutor?

Предполагается подчинить выполнение задачи Исполнителю, заблокировав подающий поток, если в Исполнителе есть достаточно нити, поставленных в очередь или работающих.

Это реализация (после добавления недостающего Rethrow в пункте улова):

public class BoundedExecutor { 
    private final Executor exec; 
    private final Semaphore semaphore; 

    public BoundedExecutor(Executor exec, int bound) { 
     this.exec = exec; 
     this.semaphore = new Semaphore(bound); 
    } 

    public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException { 
     semaphore.acquire(); 

     try { 
      exec.execute(new Runnable() { 
       @Override public void run() { 
        try { 
         command.run(); 
        } finally { 
         semaphore.release(); 
        } 
       } 
      }); 
     } catch (RejectedExecutionException e) { 
      semaphore.release(); 
      throw e; 
     } 
    } 

Когда я экземпляр BoundedExecutor с Executors.newCachedThreadPool() и граница 4, я бы ожидать, что количество потоков, экземпляры которых по пул кэшированных потоков никогда не будет превышать 4. На практике, однако, это так. Я получил эту маленькую тестовую программу для создания целых 11 резьба:

public static void main(String[] args) throws Exception { 
    class CountingThreadFactory implements ThreadFactory { 
     int count; 

     @Override public Thread newThread(Runnable r) { 
      ++count; 
      return new Thread(r); 
     }   
    } 

    List<Integer> counts = new ArrayList<Integer>(); 

    for (int n = 0; n < 100; ++n) { 
     CountingThreadFactory countingThreadFactory = new CountingThreadFactory(); 
     ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory); 

     try { 
      BoundedExecutor be = new BoundedExecutor(exec, 4); 

      for (int i = 0; i < 20000; ++i) { 
       be.submitTask(new Runnable() { 
        @Override public void run() {} 
       }); 
      } 
     } finally { 
      exec.shutdown(); 
     } 

     counts.add(countingThreadFactory.count); 
    } 

    System.out.println(Collections.max(counts)); 
} 

Я думаю, что есть крошечные временные рамки между выпуском семафора и окончанием задачи, где другой поток может aquire разрешения и отправьте задачу, пока поток выделения еще не закончен. Другими словами, он имеет состояние гонки.

Может кто-нибудь подтвердить это?

+1

Я добавил 1ms Thread.sleep сразу после semaphore.release(), чтобы узнать, насколько это будет хуже: я создал более 300 потоков. – toto2

ответ

2

Я вижу целых 9 потоков, созданных сразу. Я подозреваю, что есть состояние гонки, из-за чего количество потоков больше, чем требуется.

Это может быть связано с тем, что перед выполнением заданий выполняется до и после. Это означает, что хотя в вашем блоке есть только 4 потока, существует несколько потоков, останавливающих предыдущую задачу или готовящихся к запуску новой задачи.

т. Е. Поток выполняет освобождение(), пока он все еще работает. Несмотря на то, что последнее, что вы делаете, это не последнее, что он делает, прежде чем приобретать новую задачу.

+0

Я согласен с вашим заявлением, которое также в основном связано с тем, что подозревала Босси. Но я не уверен, что я назвал бы это условие гонки, потому что это подразумевает некоторую ошибку программирования: я не думаю, что у программы должно быть максимальное количество потоков, поскольку она написана. – toto2

+0

@ toto2 Его не является регулярной ошибкой программирования, но между выпуском Семафора есть состояние гонки, и поток получает новую задачу. Если задача, в которой вы жили дольше, вы можете видеть это поведение редко. –

+1

Я понимаю о длине задачи (см. Мой комментарий к вопросу Босси). Я просто имею в виду, что это не ошибка, потому что нет ничего, указывающего, что должно быть 4 потока. «Это не ошибка, это особенность!» В этом случае я думаю, что кто-то действительно может это сделать. Извините ... Я просто философствую здесь. – toto2

5

Вы в своем анализе о состоянии гоночной травы. Гарантийных гарантий синхронизации между ExectorService & Семафор нет.

Однако я не знаю, регулирует ли количество потоков то, для чего используется BoundedExecutor. Я думаю, что это больше для регулирования количества заданий, переданных службе. Представьте, если у вас есть 5 миллионов задач, которые необходимо отправить, и если вы отправляете более 10 000 из них, у вас заканчивается память.

У вас будет только 4 потока, работающих в любой момент времени, почему вы хотите попробовать и поставить в очередь все 5 миллионов задач? Вы можете использовать подобную конструкцию, чтобы уменьшить количество задач, стоящих в очереди в любой момент времени. Из этого вы должны выйти, что в любой момент времени выполняется всего 4 задачи.

Очевидно, что разрешение на это - использовать Executors.newFixedThreadPool(4).

+1

Точно. Задача, представленная базовому исполнителю, который запускает 'command', освобождает семафор до его завершения, но это происходит в потоке исполнителя, выполняющем задачу. Это позволяет отправить другую задачу исполнителю и передать ее новому потоку, теоретически разрешив в два раза больше потоков, чем 'bound', при условии правильных условий. –

+1

@John: он дросселирует количество поставленных задач, но непредсказуемым и ненадежным способом. Если потоки запланированы неудачно, это может означать, что многие подающие потоки могут проникнуть внутрь. Кроме того, в случае с 'newFixedThreadPool()' эти задачи все еще могут накапливаться в неограниченной очереди «Executor» , все еще рискуя нехваткой памяти. @David: Я получил целых 11 потоков с обязательностью 4. Кроме того, мне кажется забавным, что в справочнике по параллелизму Java все еще есть условия гонки. –

+0

@Bossie Вы правы, вы все еще подозреваете OOM в newFixedThreadPool, но это предотвратит создание лишних потоков. Вы используете обязательство, чтобы предотвратить, как я уже упоминал, миллионы заданий, которые будут представлены одновременно. –

10

BoundedExecutor действительно был предназначен как иллюстрация того, как подавлять подчинение задания, а не как способ привязки к размеру пула потоков. Есть более прямые способы достижения последнего, как отметил хотя бы один комментарий.

Но другие ответы не упоминают текст в книге, которая говорит, чтобы использовать неограниченную очередь и

set the bound on the semaphore to be equal to the pool size plus the number of queued tasks you want to allow, since the semaphore is bounding the number of tasks both currently executing and awaiting execution. [JCiP, end of section 8.3.3]

Упоминая неограниченные очереди и размер пула, мы подразумевающие (по-видимому, не очень ясно) использование пула потоков ограниченного размера.

Однако, что всегда беспокоило меня о BoundedExecutor, это то, что он не реализует интерфейс ExecutorService. Современный способ достижения аналогичной функциональности и реализации стандартных интерфейсов должен был бы использовать метод Guava listeningDecorator и класс ForwardingListeningExecutorService.

+0

Ответ от одного из экспертов, это потрясающе. Спасибо, что сделал кое-что еще более ясным Тимом. Эти «ListenableFuture's» выглядят интересными. Поэтому я должен представить несколько задач, и всякий раз, когда вы завершаете, я отправляю новый код в обратном вызове, верно? –

+0

+1 от источника –

+0

@Bossie - Вы могли бы это сделать, конечно, но я имел в виду, что вы могли бы продолжать использовать метод Семафор BoundedExecutorService, украсив исполняемый файл приобретением и обратным вызовом с выпуском. –

Смежные вопросы