2016-06-07 6 views
0

Каждая операция создаст основной поток в нашем приложении, и этот поток создаст задачи и отправит ExecutorService.Как заставить потоки производителей подождать, пока потребительские потоки закончатся

В настоящее время эти потоки производителей добавят задачи к ArrayList, из которых потоки от threadPool будут выполнять задачи.

Во время высокой нагрузки (когда есть много операций), я получаю «Jvm out of memory» ошибок.

Как я могу избежать этой ситуации, создав основные потоки, чтобы подождать некоторое время на основе некоторого ограничения?

+0

Почему бы Вы просто ограничиваете количество потоков? – Kayaman

+0

Подсказка: вы хотите запустить профилировщик java/включить GC-протоколирование, чтобы ** действительно понять, что происходит с потреблением памяти. Вы не можете исправить проблему, которую вы не понимаете. – GhostCat

+1

Re: «Каждая операция создаст основной поток ...» Когда большинство разработчиков говорят «основной поток», они говорят о первом потоке в JVM для выполнения кода вашего приложения. В этом использовании может быть только один «основной поток», и он не может быть создан _by_ вашим приложением, потому что он был создан для вашего приложения. –

ответ

2

Из памяти еще одна проблема, но вы хотите producer/consumer with blocking queue.

Если потребляемые потоки в пуле вашего исполнителя не могут обрабатывать запросы достаточно быстро, очередь блокировки будет заполняться. Как только он станет полным, производители будут заблокированы от добавления новых задач, пока потребители не смогут обрабатывать запросы и освобождать место.

Это может не решить проблему с OOM. Вам нужно прокомментировать код, чтобы узнать, где потребляется память. Возможно, ваши потребительские потоки не очищаются, когда они закончены, или есть статические данные, которые не были исправлены GC.

Возможно, это такая ситуация, как Sorcerer's Apprentice. Вы получите ошибку OOM, если производители создают запросы так быстро, не останавливаясь, что ArrayList расширяется без конца, пока не будет потреблена вся ваша память. Если это основная причина, то блокировка deque исправит ее.

+1

Почему дека? Очередь будет, верно? – bowmore

+0

№ Deque требуется, чтобы производители добавляли с одного конца, а потребители брали у другого. Блокирует производителей, когда они полны. – duffymo

+1

Euhm, вот что делает очередь, статья, которую вы связали, даже не говорит о deques ... Deque просто позволяет удалять и добавлять на обоих концах.Это функциональность, которая вам не нужна для шаблона Producer Consumer. – bowmore

0

Если вы используете ExecutorService для выполнения задач, то я предлагаю вам вместо стандартной реализации пулов потоков вы можете определить свой собственный пул потоков с настраиваемой конфигурацией, создав экземпляр ThreadPoolExecutor с необходимой конфигурацией. Смотрите ниже:

/** 
    * Creates a new {@code ThreadPoolExecutor} with the given initial 
    * parameters and default thread factory and rejected execution handler. 
    * It may be more convenient to use one of the {@link Executors} factory 
    * methods instead of this general purpose constructor. 
    * 
    * @param corePoolSize the number of threads to keep in the pool, even 
    *  if they are idle, unless {@code allowCoreThreadTimeOut} is set 
    * @param maximumPoolSize the maximum number of threads to allow in the 
    *  pool 
    * @param keepAliveTime when the number of threads is greater than 
    *  the core, this is the maximum time that excess idle threads 
    *  will wait for new tasks before terminating. 
    * @param unit the time unit for the {@code keepAliveTime} argument 
    * @param workQueue the queue to use for holding tasks before they are 
    *  executed. This queue will hold only the {@code Runnable} 
    *  tasks submitted by the {@code execute} method. 
    * @throws IllegalArgumentException if one of the following holds:<br> 
    *   {@code corePoolSize < 0}<br> 
    *   {@code keepAliveTime < 0}<br> 
    *   {@code maximumPoolSize <= 0}<br> 
    *   {@code maximumPoolSize < corePoolSize} 
    * @throws NullPointerException if {@code workQueue} is null 
    */ 
    public ThreadPoolExecutor(int corePoolSize, 
           int maximumPoolSize, 
           long keepAliveTime, 
           TimeUnit unit, 
           BlockingQueue<Runnable> workQueue) { 
    } 

Здесь вы можете предоставить свой собственный BlockingQueue с размером, определенной вами, так что теперь в этом случае, когда очередь заполнена, то в дальнейшем новые задачи будут ждать до тех пор, пока очередь не имеет места, чтобы принять новые задачи ,

С помощью этой реализации вы можете контролировать и настраивать пул потоков в соответствии с вашими требованиями.

Вы можете нажать эту link для получения более подробной информации, Вы можете найти его скучный: P, но поверьте мне, вы найдете это очень полезно, когда вы читаете это так просто прочитать его терпеливо .. :)

Смежные вопросы