2011-02-02 3 views
5

Предположим, что у меня есть ExecutorService (который может быть пулом потоков, поэтому есть параллелизм), который выполняет задачу в разное время, периодически или в ответ на какое-то другое условие. Задача, которая должна быть выполнена, следующая:java: executors + tasks + locks

  • Если эта задача уже выполняется, ничего не предпринимайте (и пусть закончившаяся работа заканчивается).
  • Если эта задача еще не выполняется, запустите Алгоритм X, который может занять много времени.

Я пытаюсь придумать способ реализации этого. Это должно быть что-то вроде:

Runnable task = new Runnable() { 
    final SomeObj inProgress = new SomeObj(); 
    @Override public void run() { 
     if (inProgress.acquire()) 
     { 
      try 
      { 
      algorithmX(); 
      } 
      finally 
      { 
      inProgress.release(); 
      } 
     } 
    } 
} 

// re-use this task object whenever scheduling the task with the executor 

где SomeObj является либо ReentrantLock (приобретение = tryLock() и отпустить = unlock()) или AtomicBoolean или что-то, но я не уверен,. Мне нужен ReentrantLock здесь? (Возможно, я хочу блокировку без реентера в случае, если algorithmX() заставляет эту задачу запускать рекурсивно!) Или достаточно было бы AtomicBoolean?


изменить: для блокировки, не являющейся реентерабельным, это подходит?

Runnable task = new Runnable() { 
    boolean inProgress = false; 
    final private Object lock = new Object(); 
    /** try to acquire lock: set inProgress to true, 
    * return whether it was previously false 
    */ 
    private boolean acquire() { 
     synchronized(this.lock) 
     { 
     boolean result = !this.inProgress; 
     this.inProgress = true; 
     return result; 
     } 
    } 
    /** release lock */ 
    private void release() { 
     synchronized(this.lock) 
     { 
     this.inProgress = false; 
     } 
    } 
    @Override public void run() { 
     if (acquire()) 
     { 
      // nobody else is running! let's do algorithmX() 
      try 
      { 
      algorithmX(); 
      } 
      finally 
      { 
      release(); 
      } 
     } 
     /* otherwise, we are already in the process of 
     * running algorithmX(), in this thread or in another, 
     * so don't do anything, just return control to the caller. 
     */ 
    } 
} 
+0

Рекомендация: вместо того, чтобы представлять сложный код и просить кого-то его проверить, попробуйте сами, а затем, если вы столкнулись с поведением, которое вы не понимаете, задайте конкретный вопрос. –

+0

спасибо ... Это не то, что я ищу, чтобы проверить код, это то, что у меня есть ситуация, когда мне нужно запускать задачу поочередно в многопоточной системе, и я не уверен, как правильно ее реализовать перед лицом проблемы параллелизма. Представленный мной код представляет собой просто попытку на примере. Если вместо этого я должен использовать существующий класс, я хотел бы знать это, потому что я возился с каким точным вопросом. –

+1

Мой опыт с проблемами параллелизма заключается в том, что, хотя вы можете попробовать код, чтобы увидеть, есть ли очевидные ошибки, вы не можете просто попробовать его проверить, удастся ли ему, потому что могут быть странные невероятные угловые условия, которые покажут вашу программу неверной , и может быть невозможно воспроизвести эти условия по требованию. –

ответ

2

Реализация блокировки, которую вы предлагаете, является слабым в том смысле, что было бы очень легко использовать ее ненадлежащим образом.

Ниже является гораздо более эффективной реализации с теми же неподходящих слабостей использовать в качестве реализации:

AtomicBoolean inProgress = new AtomicBoolean(false) 
    /* Returns true if we acquired the lock */ 
    private boolean acquire() { 
     return inProgress.compareAndSet(false, true); 
    } 
    /** Always release lock without determining if we in fact hold it */ 
    private void release() { 
     inProgress.set(false); 
    } 
+0

«Когда ваш метод получения возвращает true, это означает, что у кого-то еще есть блокировка, и вы фактически не смогли получить блокировку». nope - снова прочитайте код, я перевернул старое существующее значение. Но у вас гораздо более простая реализация, спасибо! –

+0

@ Джейсон S, да, извините, не видел этого. Тем не менее, операции CAS> синхронизация –

+0

Можете ли вы объяснить немного больше того, что вы подразумеваете под «было бы довольно легко использовать [код] неправильно»? – user359996

0

ReentrantLock кажется прекрасным для меня. Единственная ситуация, когда мне было бы интересно вручную создать блокировку с использованием AtomicInteger, будет, если у вас действительно короткий algorithmX, который не ваш случай.

+0

Проблема с * ReentrantLock * заключается в том, что OP обеспокоен тем, что * algorithmX * может инициировать выполнение когда он уже запущен. – user359996

2

Ваш первый бит кода выглядит довольно хорошо, но если вы беспокоитесь о algorithmX рекурсивно Призывая задача, я бы предложил использовать файл java.util.concurrent.Semaphore как объект синхронизации, а не ReentrantLock. Например:

Runnable task = new Runnable() { 
    final Semaphore lock = new Semaphore(1); 
    @Override public void run() { 
     if (lock.tryAcquire()) 
     { 
      try 
      { 
      algorithmX(); 
      } 
      finally 
      { 
      lock.release(); 
      } 
     } 
    } 
} 

Примечание, в частности, использование попробоватьприобретают. Если получение блокировки не выполняется, algorithmX не запускается.

0

Я думаю, что секрет выбора правильной блокировки заключается в следующем: * если эта задача уже выполняется, ничего не делайте (и пусть закончившаяся работа заканчивается).

Что означает «ничего не делать» в этом контексте? Поток должен блокировать и повторять выполнение после запуска algorithmX закончен ?. Если это так, то semaphore.acquire вместо tryAcquire следует использовать, и решение AtomicBoolean не будет работать должным образом.