2015-08-07 2 views
1

Как an answer к question about pausing a BlockingQueue, я пришел с идеей использования существующей блокирующей структуры blockingQueue2 и охраны состояния двумя разными замками.Охранное состояние с двумя замками

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> { 

    private static final long serialVersionUID = 184661285402L; 

    private Object lock1 = new Object();//used in pause() and in take() 
    private Object lock2 = new Object();//used in pause() and unpause() 

    //@GuardedBy("lock1") 
    private volatile boolean paused; 

    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>(); 

    public void pause() { 
     if (!paused) { 
      synchronized (lock1) { 
      synchronized (lock2) { 
       if (!paused) { 
        paused = true; 
        blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty 
       } 
      } 
      } 
     } 
    } 

    public void unpause() throws InterruptedException { 
     if (paused) { 
      synchronized (lock2) { 
       paused = false; 
       blockingQueue2.put(new Object());//will release waiting thread, if there is one 
      } 
     } 
    } 

    @Override 
    public E take() throws InterruptedException { 
     E result = super.take(); 

     if (paused) { 
      synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting 
       if (paused) { 
        blockingQueue2.take(); 
       } 
      } 
     } 

     return result; 
    } 

    //TODO override similarly the poll() method. 
} 

мне нужны два разных замки, в противном случае unpause() может ждать lock1 состоялись уже в take() потребительского потока.

Мои вопросы:

  1. Может ли это прийти в тупик?
  2. Это вообще работает?
  3. Как часто вы видите такой код, как я сам не считаю его доступным для чтения?
  4. Как я могу аннотировать флаг paused: с @GuardedBy("lock1, locks2")?

PS: Любые улучшения приветствуются (кроме того, я мог бы использовать двоичный семафор вместо blockingQueue2).

ответ

2

Я отвечу на ваши вопросы один за другим

Может ли это прийти в тупик?

Нет, вы не столкнетесь с тупиком. Если вы получили lock1 и lock2 в разных заказах, чем это может привести к взаимоблокировке. Поскольку вы получаете их в том же порядке, когда вы держите их, все должно быть в порядке.

Это вообще работает?

Кажется. Все происходит - до того, как заказы, похоже, удовлетворены.

Как часто вы видите такой код, поскольку я сам не могу его прочитать?

Я никогда раньше не видел такого рода реализацию. Я согласен, что это не очень элегантно.


Я собираюсь представить альтернативное решение, которое использует Phaser. Можно утверждать, что это не более элегантный, просто альтернативный подход. Я просмотрел его в течение некоторого времени, и я думаю, что этого достаточно. Конечно, я никогда не видел этого подхода, но было интересно придумать.

public static class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> { 

    private static final long serialVersionUID = 184661285402L; 

    private final Phaser phaser = new Phaser(1); 
    private volatile int phase = phaser.getPhase(); 

    public BlockingQueueWithPause() { 
     // base case, all phase 0 await's will succeed through. 
     phaser.arrive(); 
    } 

    public void pause() { 
     phase = phaser.getPhase(); 
    } 

    public void unpause() throws InterruptedException { 
     phaser.arrive(); 
    } 

    @Override 
    public E take() throws InterruptedException { 
     phaser.awaitAdvance(phase); 

     E result = super.take(); 

     return result; 
    } 
} 

Я думаю, я должен объяснить это решение. Phaser похож, если у CylicBarrier и CountDownLatch был ребенок. Это позволяет повторно использовать барьер, не дожидаясь остановки барьера.

В базовом случае, общий phase будет 0. Так как arrive вызывается в конструкторе внутренней фазе происходит phaser «s равен 1. Таким образом, если take вызывается без pause когда-либо называется awaitAdvance будет называться по 0 ,Так как внутренняя фаза 1 - фаза быстрого пути и является простой летучей нагрузкой (фаза 0 уже произошла, поэтому нам больше не нужно ждать заранее).

Если pause вызывается, общая phase переменным будет обновлен до внутренней фазы фазера, который сейчас 1. Так take будет awaitTermination-заставляя его приостановить.

Прибытие unpause приведет к возникновению всех нитей awaitAdvance Чтобы освободить и увеличить внутреннюю фазу фазера до 2. Опять же, последующий захват будет быстро пройден без соответствующей паузы.

+0

Хороший ответ: я нахожу ваше решение с более элегантным «Phaser», поскольку оно более кратким и не содержит сложных атомных операций. –

+0

@AndreiI Спасибо, надеюсь, что он может работать. Это было весело для меня, мне понадобилось некоторое время, чтобы по-настоящему разобрать его :) –

Смежные вопросы