2014-11-14 2 views
2

У меня есть следующий код:Java - Несколько очередей производитель потребительских

while(!currentBoard.boardIsValid()){ 
     for (QueueLocation location : QueueLocation.values()){ 
      while(!inbox.isEmpty(location)){ 
       Cell c = inbox.dequeue(location); 
       notifyNeighbours(c.x, c.y, c.getCurrentState(),previousBoard); 
      } 
     } 
    } 

У меня потребителя несколько очередей (все их методы синхронизируются). Одна очередь для каждого производителя. Потребитель перебирает все очереди и проверяет, есть ли у них задача для его потребления. Если в очереди, в которой он проверяет, есть задача, она его потребляет. В противном случае он переходит к проверке следующей очереди, пока не закончит итерацию по всем очередям.

На данный момент, если он выполняет итерации по всем очередям, и все они пустые, он продолжает зацикливаться, а не ждет, когда один из них будет содержать что-то (как видно внешним while).

Как заставить пользователя подождать, пока в одной из очередей не появится что-то?

У меня возникла проблема со следующим сценарием: Допустим, что всего 2 очереди. Потребитель проверил первый, и он был пуст. Так же, как он проверяет второй (который также пуст), производитель помещает что-то в первую очередь. Что касается потребителя, очереди остаются пустыми, и поэтому он должен ждать (хотя один из них больше не пуст, и он должен продолжать цикл).

Редактировать: Последнее обновление. Это упражнение для меня. Я пытаюсь реализовать синхронизацию самостоятельно. Поэтому, если в какой-либо из java-библиотек есть решение, которое реализует это, я не заинтересован в этом. Я пытаюсь понять, как я могу это реализовать.

+0

Если вы не можете изменить производителей, чтобы сделать дополнительный сигнал/уведомление, существует два типа решения: 1) использовать тайм-аут и найти компромисс между потреблением процессора и отказоустойчивостью или 2) использовать дополнительные потоки, ожидающие очередей и уведомление потребителя. Дополнительные потоки второго подхода будут потреблять больше памяти, но не будут значительного времени процессора, поскольку они в основном ждут. – Holger

ответ

1

@Abe был близок. Я буду использовать сигнал и ждать - используйте встроенные объекты класса Object, поскольку они являются самым легким весом.

Object sync = new Object(); // Can use an existing object if there's an appropriate one 

// On submit to queue 
synchronized (sync) { 
    queue.add(...); // Must be inside to avoid a race condition 
    sync.notifyAll(); 
} 

// On check for work in queue 
synchronized (sync) { 
    item = null; 
    while (item == null) { 
     // Need to check all of the queues - if there will be a large number, this will be slow, 
     // and slow critical sections (synchronized blocks) are very bad for performance 
     item = getNextQueueItem(); 
     if (item == null) { 
      sync.wait(); 
     } 
    } 
} 

Обратите внимание, что sync.wait освобождает замок на синхронизацию, пока не уведомит - и замок на синхронизации требуется для успешного вызова метода ожидания (это напоминание программисту, что некоторый тип критической секции действительно необходимо для этого надежно работать).

Кстати, я бы рекомендовал очередь, предназначенную для потребителя (или группы потребителей), а не очередь, предназначенную для производителя, если это возможно. Это упростит решение.

0

Если вы хотите блокировать несколько очередей, тогда один из вариантов - использовать .

Поэтому всякий раз, когда у производителя есть данные, он должен вызывать signallAll.

Lock fileLock = new ReentrantLock(); 
Condition condition = fileLock.newCondition(); 
... 
// producer has to signal 
condition.signalAll(); 
... 
// consumer has to await. 
condition.await(); 

Таким образом, только когда сигнал будет предоставлен, потребитель пойдет и проверяет очереди.

0

Я решил аналогичную ситуацию в соответствии с тем, что предлагает @Abe, но остановился на использовании Semaphore в сочетании с AtomicBoolean и назвал его BinarySemaphore. Это требует, чтобы производители были модифицированы, чтобы они сигнализировали, когда что-то делать.
Ниже код для BinarySemaphore и общее представление о том, что потребитель работа контур должен выглядеть следующим образом:

import java.util.concurrent.Semaphore; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicBoolean; 

public class MultipleProdOneConsumer { 

BinarySemaphore workAvailable = new BinarySemaphore(); 

class Consumer { 

    volatile boolean stop; 

    void loop() { 

     while (!stop) { 
      doWork(); 
      if (!workAvailable.tryAcquire()) { 
       // waiting for work 
       try { 
        workAvailable.acquire(); 
       } catch (InterruptedException e) { 
        if (!stop) { 
         // log error 
        } 
       } 
      } 
     } 
    } 

    void doWork() {} 

    void stopWork() { 
     stop = true; 
     workAvailable.release(); 
    } 
} 

class Producer { 

    /* Must be called after work is added to the queue/made available. */ 
    void signalSomethingToDo() { 
     workAvailable.release(); 
    } 
} 

class BinarySemaphore { 

    private final AtomicBoolean havePermit = new AtomicBoolean(); 
    private final Semaphore sync; 

    public BinarySemaphore() { 
     this(false); 
    } 

    public BinarySemaphore(boolean fair) { 
     sync = new Semaphore(0, fair); 
    } 

    public boolean release() { 

     boolean released = havePermit.compareAndSet(false, true); 
     if (released) { 
      sync.release(); 
     } 
     return released; 
    } 

    public boolean tryAcquire() { 

     boolean acquired = sync.tryAcquire(); 
     if (acquired) { 
      havePermit.set(false); 
     } 
     return acquired; 
    } 

    public boolean tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException { 

     boolean acquired = sync.tryAcquire(timeout, tunit); 
     if (acquired) { 
      havePermit.set(false); 
     } 
     return acquired; 
    } 

    public void acquire() throws InterruptedException { 

     sync.acquire(); 
     havePermit.set(false); 
    } 

    public void acquireUninterruptibly() { 

     sync.acquireUninterruptibly(); 
     havePermit.set(false); 
    } 

} 

}