2014-11-14 4 views
0

У меня есть приложение Grails, которое ежедневно работает в полночь. В моем примере приложения у меня есть 10000 Person записи и сделать следующее в кварцевой работы:Обработка огромного количества данных с помощью grails и gpars

package threading 

import static grails.async.Promises.task 
import static groovyx.gpars.GParsExecutorsPool.withPool 

class ComplexJob { 
    static triggers = { 
     simple repeatInterval: 30 * 1000l 
    } 

    def execute() { 
     if (Person.count == 5000) { 
      println "Executing job"     
      withPool 10000, { 
       Person.listOrderByAge(order: "asc").each { p -> 
        task { 
         log.info "Started ${p}" 
         Thread.sleep(15000l - (-1 * p.age)) 
        }.onComplete { 
         log.info "Completed ${p}" 
        } 
       } 
      }     
     } 
    } 
} 

игнорировать repeatInterval, как это только для целей тестирования. Когда задание запускается на выполнение, я получаю следующее исключение:

2014-11-14 16:11:51,880 quartzScheduler_Worker-3 grails.plugins.quartz.listeners.ExceptionPrinterJobListener - Exception occurred in job: Grails Job 
org.quartz.JobExecutionException: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000 [See nested exception: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000] 
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:111) 
    at org.quartz.core.JobRunShell.run(JobRunShell.java:202) 
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) 
Caused by: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000 
    at org.grails.async.factory.gpars.LoggingPoolFactory$3.rejectedExecution(LoggingPoolFactory.groovy:100) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821) 
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372) 
    at groovyx.gpars.scheduler.DefaultPool.execute(DefaultPool.java:155) 
    at groovyx.gpars.group.PGroup.task(PGroup.java:305) 
    at groovyx.gpars.group.PGroup.task(PGroup.java:286) 
    at groovyx.gpars.dataflow.Dataflow.task(Dataflow.java:93) 
    at org.grails.async.factory.gpars.GparsPromise.<init>(GparsPromise.groovy:41) 
    at org.grails.async.factory.gpars.GparsPromiseFactory.createPromise(GparsPromiseFactory.groovy:68) 
    at grails.async.Promises.task(Promises.java:123) 
    at threading.ComplexJob$_execute_closure1_closure3.doCall(ComplexJob.groovy:20) 
    at threading.ComplexJob$_execute_closure1.doCall(ComplexJob.groovy:19) 
    at groovyx.gpars.GParsExecutorsPool$_withExistingPool_closure2.doCall(GParsExecutorsPool.groovy:192) 
    at groovyx.gpars.GParsExecutorsPool.withExistingPool(GParsExecutorsPool.groovy:191) 
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:162) 
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:136) 
    at threading.ComplexJob.execute(ComplexJob.groovy:18) 
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:104) 
    ... 2 more 
2014-11-14 16:12:06,756 Actor Thread 20 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368) 
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
2014-11-14 16:12:06,756 Actor Thread 5 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368) 
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Кажется, если пул потоков не был установлен на 10000 в то время как я использую withPool(10000) Могу ли я, возможно, сделать это вычисление (теперь только печатает войти заявления) в кусках? Если да, то как я могу узнать, что последний элемент был обработан (например, где продолжить)?

+5

Почему бы просто не использовать меньший пул (1000, как говорится в исключении)? Создание 10000 потоков для выполнения работы вряд ли будет быстрее, чем каждый раз подряд. –

+3

Почему бы не использовать что-то, что на самом деле предназначено для дозирования, например Spring Batch внутри вашего приложения Grails? Это то, что я делаю, и это работает очень хорошо. –

+0

Это не тот случай, что чем больше пул, тем быстрее выполняется обработка. Некоторое время назад я использовал 100 потоков, и я фактически вызвал большие проблемы с эффективностью JVM. После проб и ошибок выяснилось, что 15 потоков было достаточно. – Opal

ответ

0

Попытка обернуть обработку каждого элемента в задачу кажется не оптимальной. Стандартный способ выполнения параллельной обработки - это разделение всей задачи на соответствующее количество подзадач. Вы начинаете с выбора этого номера. Для задачи, связанной с процессором, вы можете создать задачи N = число процессоров. Затем вы разбиваете задачу на N подзадач. Как это:

persons = Person.listOrderByAge(order: "asc") 
nThreads = Runtime.getRuntime().availableProcessors() 
size = persons.size()/nThreads 
withPool nThreads, { 
    persons.collate(size).each { subList => 
     task { 
      subList.each { p => 
       ...  
      } 
     }   
    } 
} 
1

Я подозреваю, что метод withPool() не имеет никакого эффекта, так как задача, скорее всего, с помощью резьбы по умолчанию пул, а не один созданный в withPool. Попробуйте удалить вызов withPool() и проверьте, все ли выполняются задачи.

Пул groovyx.gpars.scheduler.DefaultPool (по умолчанию для задач) в GPars изменяет размер с задачами и имеет ограничение до 1000 одновременных потоков.

Я бы предложил создать пул фиксированного размера вместо этого, например:

def group = new DefaultPGroup(numberOfThreads) 
group.task {...} 

Примечание: Я не знаком с этой задачей grails.async, только те основные GPars, так что вещи могут немного отличаться вокруг PGroups в grails.async.

Смежные вопросы