2015-07-22 5 views
6

У меня есть производитель и потребитель, связанный с BlockingQueue.Подвесной потребитель в схеме производителя/потребителя

Потребительские ожидания записи из очереди и обрабатывать его:

Record r = mQueue.take(); 
process(r); 

мне нужно приостановить этот процесс на некоторое время из другого потока. Как его реализовать?

Теперь я думаю, что реализовать такие, но это выглядит как плохое решение:

private Object mLock = new Object(); 
private boolean mLocked = false; 

public void lock() { 
    mLocked = true; 
} 

public void unlock() { 
    mLocked = false; 
    mLock.notify(); 

} 

public void run() { 
    .... 
      Record r = mQueue.take(); 
      if (mLocked) { 
       mLock.wait(); 
      } 
      process(r); 
} 
+1

Вы не возражаете обеспечивая немного больше контекста на ваших конкретных потребностей? Причина, по которой я спрашиваю, заключается в том, что если мне действительно нужно приостановить кого-то в цепочке, я бы скорее остановил продюсера, а не потребитель. Он в основном достигает аналогичного эффекта, не увеличивая производительность очереди, пока пауза действует. – sstan

ответ

2

Я думаю, что ваше решение является простым и элегантным, и думаю, что вы должны держать его с некоторыми изменениями. Модификации, которые я предлагаю, - synchronization.

Без него могут возникать (и очень часто) помехи потоков и ошибки согласования памяти. Кроме того, вы не можете wait или notify на блокировке, которой вы не являетесь (и владеете ею, если у вас ее есть в блоке ..). Исправить легко, просто добавьте блок синхронизации mLock, где вы ждете/уведомляете об этом. Кроме того, по мере того, как вы меняете mLocked из другого потока, вы хотите отметить его volatile.

private Object mLock = new Object(); 
private volatile boolean mLocked = false; 

public void lock() { 
    mLocked = true; 
} 

public void unlock() { 
    synchronized(mlock) { 
     mLocked = false; 
     mLock.notify(); 
    } 

} 

public void run() { 
    .... 
      Record r = mQueue.take(); 
      synchronized(mLock) { 
       while (mLocked) { 
        mLock.wait(); 
       } 
      } 
      process(r); 
} 
+0

Я написал тот же код. Но потом я подумал, что если вы зайдете в блок синхронизации с ожиданием, я не буду входить в блок синхронизации с уведомлением. Но я ошибся: http://stackoverflow.com/questions/13249835/java-does-wait-release-lock-from-synchronized-block – HotIceCream

+2

Может быть лучше заменить, если (замедленное) на (замедленном). http://stackoverflow.com/a/2779565/1173794 – HotIceCream

+0

@HotIceCream Да, не заметил, что - отредактирован. – ddmps

2

Вы можете использовать java.util.concurrent.locks.ConditionJava docs, чтобы приостановить на некоторое время, основанного на том же состоянии.

Этот подход выглядит чистым для меня и Механизм ReentrantLock имеет лучшую пропускную способность, чем синхронизированный. Читайте ниже отрывок из IBM article

В качестве бонуса, реализация ReentrantLock гораздо более масштабируемым под раздора, чем в текущей реализации синхронизированных. (Это , вероятно, будет улучшено согласованное исполнение , синхронизированное в будущей версии JVM.) Это означает, что при многие потоки борются за одну и ту же блокировку, общая пропускная способность , как правило, будет лучше с ReentrantLock, чем с синхронизированным.


BlockingQueue наиболее известны решения производитель-потребитель проблемы, а также использует Condition для ожидания.

См. Ниже пример, взятый из Java doc Condition, который представляет собой пример реализации шаблона производителя - потребителя.

class BoundedBuffer { 
    final Lock lock = new ReentrantLock(); 
    final Condition notFull = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[100]; 
    int putptr, takeptr, count; 

    public void put(Object x) throws InterruptedException { 
    lock.lock(); 
    try { 
     while (count == items.length) 
     notFull.await(); 
     items[putptr] = x; 
     if (++putptr == items.length) putptr = 0; 
     ++count; 
     notEmpty.signal(); 
    } finally { 
     lock.unlock(); 
    } 
    } 

    public Object take() throws InterruptedException { 
    lock.lock(); 
    try { 
     while (count == 0) 
     notEmpty.await(); 
     Object x = items[takeptr]; 
     if (++takeptr == items.length) takeptr = 0; 
     --count; 
     notFull.signal(); 
     return x; 
    } finally { 
     lock.unlock(); 
    } 
    } 
} 

Дальнейшее чтение:

+3

Г-н вниз Downvoter, можете ли вы принять боль, чтобы объяснить свое голодание, пожалуйста? – hagrawal

+0

Человек, которого я ненавижу downvotes, когда человек действительно занимает некоторое время, чтобы дать проницательное объяснение. Если это решение не работает для вас, отлично. Но он не заслуживает ниспуска. +1 для меня, даже если я нахожу, что первый проще, второй дает представление о потоках и параллелизмах. –

+2

не мой нисходящий, но оценка производительности, основанная на статье 10+ лет, кажется неудобной. –

1

Создайте новый класс, который расширяет реализацию BlockingQueue.Добавьте два новых метода: pause() и unpause(). В случае необходимости, рассмотреть paused флаг и использовать другой blockingQueue2 ждать (в моем примере только в методе take(), а не в put()):

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> { 

    private static final long serialVersionUID = 184661285402L; 

    private Object lock1 = new Object();//used in pause() and in take() 
    private Object lock2 = new Object();//used in pause() and unpause() 

    //@GuardedBy("lock") 
    private volatile boolean paused; 

    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>(); 

    public void pause() { 
     if (!paused) { 
      synchronized (lock1) { 
      synchronized (lock2) { 
       if (!paused) { 
        paused = true; 
        blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty 
       } 
      } 
      } 
     } 
    } 

    public void unpause() throws InterruptedException { 
     if (paused) { 
      synchronized (lock2) { 
       paused = false; 
       blockingQueue2.put(new Object());//release waiting thread, if there is one 
      } 
     } 
    } 

    @Override 
    public E take() throws InterruptedException { 
     E result = super.take(); 

     if (paused) { 
      synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting 
       if (paused) { 
        blockingQueue2.take(); 
       } 
      } 
     } 

     return result; 
    } 

    //TODO override similarly the poll() method. 
}