2009-09-08 6 views
13

Обнаружена ситуация, когда ThreadPoolExecutor припаркован в execute(Runnable) функции, а все токи ThreadPool ждут в getTask func, workQueue пуст.Тупик в ThreadPoolExecutor

Есть ли у кого-нибудь идеи?

ThreadPoolExecutor создается с ArrayBlockingQueue и corePoolSize == maximumPoolSize = 4

[Редактировать] Чтобы быть более точным, то поток блокируется в ThreadPoolExecutor.exec(Runnable command) FUNC. У этого есть задача выполнить, но не делает этого.

[Edit2] Исполнитель заблокирован где-то внутри рабочей очереди (ArrayBlockingQueue).

[Edit3] стеком вызовов:

thread = front_end(224) 
at sun.misc.Unsafe.park(Native methord) 
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1114) 
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) 
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) 
at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224) 
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653) 
at net.listenThread.WorkersPool.execute(WorkersPool.java:45) 

в то же время workQueue пусто (проверено с помощью удаленной отладки)

[Edit4] Код работы с ThreadPoolExecutor:

public WorkersPool(int size) { 
    pool = new ThreadPoolExecutor(size, size, IDLE_WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY), 
     new ThreadFactory() { 
     @NotNull 
     private final AtomicInteger threadsCount = new AtomicInteger(0); 

     @NotNull 
     public Thread newThread(@NotNull Runnable r) { 
      final Thread thread = new Thread(r); 
      thread.setName("net_worker_" + threadsCount.incrementAndGet()); 
      return thread; 
     } 
     }, 

     new RejectedExecutionHandler() { 
     public void rejectedExecution(@Nullable Runnable r, @Nullable ThreadPoolExecutor executor) { 
      Verify.warning("new task " + r + " is discarded"); 
     } 
     }); 
    } 

    public void execute(@NotNull Runnable task) { 
    pool.execute(task); 
    } 

    public void stopWorkers() throws WorkersTerminationFailedException { 
    pool.shutdownNow(); 
    try { 
     pool.awaitTermination(THREAD_TERMINATION_WAIT_TIME, TimeUnit.SECONDS); 
    } catch (InterruptedException e) { 
     throw new WorkersTerminationFailedException("Workers-pool termination failed", e); 
    } 
    } 
} 
+0

Какова природа задачи, передаваемой функции TPE.execute()? Если задача имеет доступ к TPE, это может быть вашей проблемой. – artemv

+1

Я думаю, что это сопоставимая проблема на 1.7.0_13.Процесс начинается и работает без проблем ... и затем в какой-то момент у меня есть ~~ 200 задач, но моя очередь блокировки пуста. Размер основного пула - 3 ... Я также использую ArrayBlockingQueue .... – cljk

ответ

2

Я не вижу блокировки в коде ThreadPoolExecutorexecute(Runnable). Единственная переменная - workQueue. Какой вид BlockingQueue вы предоставили вашему ThreadPoolExecutor?

На тему тупиков:

Вы можете подтвердить, что это тупиковый путем изучения полного дампа темы, как это предусмотрено <ctrl><break> на Windows, или kill -QUIT на системах UNIX.

После того, как у вас есть эти данные, вы можете изучить потоки. Вот Уместно отрывок из Sun's article on examining thread dumps (suggested reading):

Для подвешивания, зашли в тупик или замороженные программы: Если вы думаете, что ваша программа висит, генерировать трассировки стека и исследовать потоки в состояниях MW или CW. Если программа зашла в тупик, то некоторые из системных потоков, вероятно, будут отображаться как текущие потоки, потому что для JVM больше ничего не нужно делать.

Замечание: если вы работаете в среде IDE, можете ли вы обеспечить, чтобы в этих методах не было точек останова.

+0

Как я писал в своем вопросе, используется ArrayBlockingQueue. И он пуст. Да, поток блокируется где-то в рабочей очереди. – Vitaly

+0

Я использовал удаленную отладку. Отредактирован вопрос - добавлен столик вызова. – Vitaly

+0

Вы также можете проверить блокировки с помощью JConsole – pjp

0

Как уже упоминалось, это звучит как нормальное поведение, ThreadPoolExecutor просто ждет выполнения какой-либо работы. Если вы хотите, чтобы остановить его, вам необходимо позвонить:

executor.shutdown()

, чтобы заставить его прекратить, как правило, с последующим исполнителем.awaitTermination

+0

Отредактировал вопрос. – Vitaly

0

Библиотека исходного кода ниже (это фактически класс от http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip),
- немного сложнее - дополнительная защита от повторных вызовов FutureTask, если я не ошибаюсь - но не кажется тупиковой склонной - очень простое использование ThreadPool:

package net.spy.memcached.transcoders; 

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.Future; 
import java.util.concurrent.FutureTask; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 
import java.util.concurrent.atomic.AtomicBoolean; 

import net.spy.memcached.CachedData; 
import net.spy.memcached.compat.SpyObject; 

/** 
* Asynchronous transcoder. 
*/ 
public class TranscodeService extends SpyObject { 

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L, 
      TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), 
      new ThreadPoolExecutor.DiscardPolicy()); 

    /** 
    * Perform a decode. 
    */ 
    public <T> Future<T> decode(final Transcoder<T> tc, 
      final CachedData cachedData) { 

     assert !pool.isShutdown() : "Pool has already shut down."; 

     TranscodeService.Task<T> task = new TranscodeService.Task<T>(
       new Callable<T>() { 
        public T call() { 
         return tc.decode(cachedData); 
        } 
       }); 

     if (tc.asyncDecode(cachedData)) { 
      this.pool.execute(task); 
     } 
     return task; 
    } 

    /** 
    * Shut down the pool. 
    */ 
    public void shutdown() { 
     pool.shutdown(); 
    } 

    /** 
    * Ask whether this service has been shut down. 
    */ 
    public boolean isShutdown() { 
     return pool.isShutdown(); 
    } 

    private static class Task<T> extends FutureTask<T> { 
     private final AtomicBoolean isRunning = new AtomicBoolean(false); 

     public Task(Callable<T> callable) { 
      super(callable); 
     } 

     @Override 
     public T get() throws InterruptedException, ExecutionException { 
      this.run(); 
      return super.get(); 
     } 

     @Override 
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
       ExecutionException, TimeoutException { 
      this.run(); 
      return super.get(timeout, unit); 
     } 

     @Override 
     public void run() { 
      if (this.isRunning.compareAndSet(false, true)) { 
       super.run(); 
      } 
     } 
    } 

} 
0

Определенно странно.

Но прежде чем писать свой собственный ТРЕ попробовать:.

  • другой BlockingQueue осущ, например, LinkedBlockingQueue

  • указать справедливость = истина в ArrayBlockingQueue, то есть использовать new ArrayBlockingQueue(n, true)

Из этих двух ОПТС я бы выбрал второе, потому что это очень странно, что offer() блокируется; одна причина, которая приходит в голову - политика планирования потоков на вашем Linux. Как предположение.

7

Похоже, что это ошибка с JVM старше 6u21. В скомпилированном исходном коде возникла проблема для некоторых (возможно, всех) ОС.

Из ссылке:

Эта ошибка вызвана отсутствием барьеров памяти в различных Паркера :: парк() пути, которые могут привести к потере пробуждениями и зависаний. (Обратите внимание, что PlatformEvent :: парк, используемый встроенной синхронизацией, не уязвим ). -XX: + UseMembar создает обход, потому что барьер membar в логике перехода состояния скрывает проблему в Parker ::. (т. е. нет ничего плохого в использовании механизма -UseMembar , но + UseMembar скрывает ошибку Parker: :). Это ошибка дня с добавлением java.util.concurrent в JDK 5.0. Я разработал простой режим C отказа, и, похоже, более вероятно, что проявит себя на современных платформах AMD и Nehalem, вероятно, из-за более глубокихбуферов для хранения, которые занимают больше времени для слива. Я предусмотрел предварительное исправление в Doug Lea для Parker :: park, который, кажется, устраняет ошибку. Я буду доставить это исправление во время выполнения. (Я также увеличу CR с дополнительными тестовыми примерами и более длинным объяснением). Вероятно, это хороший кандидат на задние порты .

Ссылка: JVM Bug

Обходные доступны, но вы, вероятно, будет лучше от просто получить самую последнюю копию Java.

+1

Я обновился до 'build 1.6.0_27-b07' (работает на Solaris 10 SPARC), но все еще не решает проблему. Мой Jboss ESB по-прежнему создает тысячи потоков и не закрывает их. –

1

Этот тупик, вероятно, потому, что вы запускаете задачу у самого исполнителя. Например, вы отправляете одну задачу, и она запускает еще 4 задачи. Если у вас размер пула равен 4, то вы просто полностью переполняете его, а последняя задача будет ждать, пока кто-то из задач вернет значение. Но первая задача ждет завершения всех разветвленных задач.

Смежные вопросы