2013-10-03 4 views
1

У меня есть задача, которая запускается каждые 1 минуту. Эта задача содержит пару методов, которые обрабатывают некоторые данные. Process1 считывает данные (один или несколько) в статусе 1 и в конце этого процесса обновляет статус до 2. Process2 считывает данные в состоянии 2 и т. Д. Так что теперь я хочу, чтобы улучшить эту обработку с помощью taskExecutor (org.springframework.core.task.SimpleAsyncTaskExecutor), чтобы запустить его параллельно:Как улучшить работу асинхронной задачи

public void process1() { 

    List<Object> objects = someDao.readDataWithStatus("1"); 
    if (objects == null || objects.isEmpty()) { 
     return; 
    } 

    for (final Object object : objects) { 
      if (BooleanUtils.isTrue(isParalelProcess())) { 
       taskExecutor.execute(new Runnable() { 

        @Override 
        public void run() { 
         process(object); 
        } 
       }); 
      } else { 
       process(object); 
      }   
    } 
} 

Допустим, у нас есть один объект с STATUS1 и мы хотим, параллельную обработку. Задача, которая запускается каждую минуту. Process1 читает данные со статусом 1, помещает его в taskExecutor и переходит к следующему методу. Этот метод не нашел объект, статус 2, поэтому здесь нечего делать. Через минуту процесс2 считывает объект со статусом 2 и так далее. Объект обработки занимает от 1 секунды до пары минут. Как вы можете видеть, происходит значительное замедление процесса, которое занимает всего пару секунд. Есть ли способ улучшить эту обработку?

+0

Вы хотите запустить задачу 2 сразу после завершения задачи 1? – smajlo

+0

есть только одна задача. – hudi

+0

так что я не понимаю проблему – smajlo

ответ

0

Я потратил немного времени на это. Это может помочь вам.

public class SampleTest 
{ 

    public static void main(String[] args) 
    { 

     int noOfThreads = 100; 
     int maxThreadinQue = 100; 

     LinkedBlockingQueue<Runnable> processOneworkQueue = new LinkedBlockingQueue<Runnable>(); 

     ThreadPoolExecutor processOneThreadPoolExecutor = new ThreadPoolExecutor(noOfThreads, noOfThreads, 0L, 
       TimeUnit.SECONDS, 

       processOneworkQueue, new ThreadFactory() 
       { 
        private AtomicInteger itsCounter = new AtomicInteger(); 

        public Thread newThread(Runnable theRunnable) 
        { 
         Thread aThread = new Thread(theRunnable, "theThreadName" + "#" + itsCounter.getAndIncrement()); 
         aThread.setDaemon(true); 
         return aThread; 
        } 
       }); 

     LinkedBlockingQueue<Runnable> processTwoworkQueue = new LinkedBlockingQueue<Runnable>(); 

     ThreadPoolExecutor processTwoThreadPoolExecutor = new ThreadPoolExecutor(noOfThreads, noOfThreads, 0L, 
       TimeUnit.SECONDS, 

       processTwoworkQueue, new ThreadFactory() 
       { 
        private AtomicInteger itsCounter = new AtomicInteger(); 

        public Thread newThread(Runnable theRunnable) 
        { 
         Thread aThread = new Thread(theRunnable, "theThreadName" + "#" + itsCounter.getAndIncrement()); 
         aThread.setDaemon(true); 
         return aThread; 
        } 
       }); 

     ArrayList<SampleObject> sampleObjects = new ArrayList<SampleObject>(); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 
     sampleObjects.add(new SampleObject()); 

     while (true) 
     { 
      Iterator<SampleObject> it = sampleObjects.iterator(); 
      while (it.hasNext()) 
      { 

       final SampleObject sampleObject = it.next(); 

       if (sampleObject.getStatus() == 0) 
       { 
        if (processOneworkQueue.size() < maxThreadinQue) 
        { 
         processOneThreadPoolExecutor.submit(new Runnable() 
         { 
          @Override 
          public void run() 
          { 
           process1(sampleObject); 

          } 
         }); 
        } 
       } 
       else if (sampleObject.getStatus() == 1) 
       { 
        if (processTwoworkQueue.size() < maxThreadinQue) 
        { 
         processTwoThreadPoolExecutor.submit(new Runnable() 
         { 
          @Override 
          public void run() 
          { 
           process2(sampleObject); 

          } 
         }); 
        } 
       } 
       else if (sampleObject.getStatus() == 2) 
       { 

        it.remove(); 

       } 
      } 
     } 
    } 

    static void process1(SampleObject sampleObject) 
    { 
     System.out.println("process1"); 
     sampleObject.setStatus(1); 

    } 

    static void process2(SampleObject sampleObject) 
    { 
     System.out.println("process2"); 
     sampleObject.setStatus(2); 
    } 

} 

class SampleObject 
{ 
    int status=0; 

    public int getStatus() 
    { 
     return status; 
    } 

    public void setStatus(int status) 
    { 
     this.status = status; 
    } 
} 
Смежные вопросы