2012-05-04 2 views
15

У меня есть несколько потребительских потоков, ожидающих CountDownLatch размера 1, используя await(). У меня есть один поток производителей, который вызывает countDown(), когда он успешно завершит работу.Как мне отменить «CountDownLatch»?

Это отлично работает, когда нет ошибок.

Однако, если производитель обнаруживает ошибку, я хотел бы, чтобы он мог сигнализировать об ошибке для потребительских потоков. В идеале я мог бы назвать продюсером что-то вроде abortCountDown(), и все потребители получат InterruptedException или какое-то другое исключение. Я не хочу называть countDown(), потому что для этого требуются все мои потребительские потоки, чтобы выполнить дополнительную ручную проверку на успех после их вызова await(). Я бы предпочел, чтобы они просто получили исключение, которое они уже знают, как обращаться.

Я знаю, что устройство прерывания недоступно в CountDownLatch. Есть ли другой примитив синхронизации, который я могу легко адаптировать для эффективного создания CountDownLatch, который поддерживает отмену обратного отсчета?

ответ

11

JB Nizet получил отличный ответ. Я взял его и немного отполировал. Результатом является подкласс CountDownLatch с именем AbortableCountDownLatch, который добавляет метод «abort()» к классу, который заставит все потоки, ожидающие на защелке, получить исключение AbortException (подкласс InterruptedException).

Кроме того, в отличие от класса JB, AbortableCountDownLatch будет прервать все блокирующие потоки немедленно при прерывании, вместо того, чтобы ждать обратного отсчета до нуля (для ситуаций, когда вы используете счетчик> 1).

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 

public class AbortableCountDownLatch extends CountDownLatch { 
    protected boolean aborted = false; 

    public AbortableCountDownLatch(int count) { 
     super(count); 
    } 


    /** 
    * Unblocks all threads waiting on this latch and cause them to receive an 
    * AbortedException. If the latch has already counted all the way down, 
    * this method does nothing. 
    */ 
    public void abort() { 
     if(getCount()==0) 
      return; 

     this.aborted = true; 
     while(getCount()>0) 
      countDown(); 
    } 


    @Override 
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
     final boolean rtrn = super.await(timeout,unit); 
     if (aborted) 
      throw new AbortedException(); 
     return rtrn; 
    } 

    @Override 
    public void await() throws InterruptedException { 
     super.await(); 
     if (aborted) 
      throw new AbortedException(); 
    } 


    public static class AbortedException extends InterruptedException { 
     public AbortedException() { 
     } 

     public AbortedException(String detailMessage) { 
      super(detailMessage); 
     } 
    } 
} 
+0

Как использовать этот класс, My Situation У меня есть список, и список динамически обновляется в режиме реального времени. Существует автоматическое событие, входящее в список, на котором я должен ждать 2 минуты, но если между временем ожидания появляется сообщение «Руководство пользователя» в списке, мне приходится прерывать ожидание и действовать немедленно. –

12

Encapsulate это поведение внутри определенного, высокого уровня класса, используя CountDownLatch внутренне:

public class MyLatch { 
    private CountDownLatch latch; 
    private boolean aborted; 
    ... 

    // called by consumers 
    public void await() throws AbortedException { 
     latch.await(); 
     if (aborted) { 
      throw new AbortedException(); 
     } 
    } 

    // called by producer 
    public void abort() { 
     this.aborted = true; 
     latch.countDown(); 
    } 

    // called by producer 
    public void succeed() { 
     latch.countDown(); 
    } 
} 
+1

Чтобы сделать этот поточно-безопасный, не следует прерывать 'volatile'? –

+5

Нет, потому что безопасность потока обеспечивается CountDownLatch: метод countDown обеспечивает наличие барьера между потоками. В javadoc говорится: «Действия в потоке до вызова countDown() происходят перед действиями после успешного возврата из соответствующего ожидания()« –

+0

Да, теперь я помню, что читал (некоторое время назад). Благодарю. –

3

Вы можете создать оболочку вокруг CountDownLatch, которая обеспечивает возможность отмены официантов. Он должен будет отслеживать ожидающие потоки и освобождать их при тайм-ауте, а также помнить, что защелка была отменена, поэтому будущие вызовы await будут прерываться немедленно.

public class CancellableCountDownLatch 
{ 
    final CountDownLatch latch; 
    final List<Thread> waiters; 
    boolean cancelled = false; 

    public CancellableCountDownLatch(int count) { 
     latch = new CountDownLatch(count); 
     waiters = new ArrayList<Thread>(); 
    } 

    public void await() throws InterruptedException { 
     try { 
      addWaiter(); 
      latch.await(); 
     } 
     finally { 
      removeWaiter(); 
     } 
    } 

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
     try { 
      addWaiter(); 
      return latch.await(timeout, unit); 
     } 
     finally { 
      removeWaiter(); 
     } 
    } 

    private synchronized void addWaiter() throws InterruptedException { 
     if (cancelled) { 
      Thread.currentThread().interrupt(); 
      throw new InterruptedException("Latch has already been cancelled"); 
     } 
     waiters.add(Thread.currentThread()); 
    } 

    private synchronized void removeWaiter() { 
     waiters.remove(Thread.currentThread()); 
    } 

    public void countDown() { 
     latch.countDown(); 
    } 

    public synchronized void cancel() { 
     if (!cancelled) { 
      cancelled = true; 
      for (Thread waiter : waiters) { 
       waiter.interrupt(); 
      } 
      waiters.clear(); 
     } 
    } 

    public long getCount() { 
     return latch.getCount(); 
    } 

    @Override 
    public String toString() { 
     return latch.toString(); 
    } 
} 
+0

Не могли бы вы объяснить, как использовать то же самое. Моя ситуация: у меня есть список, и список динамически обновляется в режиме реального времени. Существует автоматическое событие, входящее в список, на котором я должен ждать 2 минуты, но если между временем ожидания появляется сообщение «Руководство пользователя» в списке, мне приходится прерывать ожидание и действовать немедленно. –

0

Вы можете свернуть свой собственный CountDownLatch с использованием ReentrantLock, что позволяет получить доступ к его защищенной getWaitingThreads способом.

Пример:

public class FailableCountDownLatch { 
    private static class ConditionReentrantLock extends ReentrantLock { 
     private static final long serialVersionUID = 2974195457854549498L; 

     @Override 
     public Collection<Thread> getWaitingThreads(Condition c) { 
      return super.getWaitingThreads(c); 
     } 
    } 

    private final ConditionReentrantLock lock = new ConditionReentrantLock(); 
    private final Condition countIsZero = lock.newCondition(); 
    private long count; 

    public FailableCountDownLatch(long count) { 
     this.count = count; 
    } 

    public void await() throws InterruptedException { 
     lock.lock(); 
     try { 
      if (getCount() > 0) { 
       countIsZero.await(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public boolean await(long time, TimeUnit unit) throws InterruptedException { 
     lock.lock(); 
     try { 
      if (getCount() > 0) { 
       return countIsZero.await(time, unit); 
      } 
     } finally { 
      lock.unlock(); 
     } 
     return true; 
    } 

    public long getCount() { 
     lock.lock(); 
     try { 
      return count; 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public void countDown() { 
     lock.lock(); 
     try { 
      if (count > 0) { 
       count--; 

       if (count == 0) { 
        countIsZero.signalAll(); 
       } 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public void abortCountDown() { 
     lock.lock(); 
     try { 
      for (Thread t : lock.getWaitingThreads(countIsZero)) { 
       t.interrupt(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 

Вы можете изменить этот класс, чтобы бросить InterruptedException на новые вызовы в await после его отмены. Вы можете даже расширить этот класс CountDownLatch, если вам нужна эта функциональность.

Смежные вопросы