2013-11-08 8 views
1

Я строю систему, в которой ход вызовов потоков зависит от состояния двух переменных. Одна переменная обновляется спорадически внешним источником (отдельно от клиентских потоков) и блоком нескольких клиентских потоков при условии обеих переменных. Система что-то вроде этогоВыполнение всех ожидающих потоков

TypeB waitForB() { // Can be called by many threads. 
    synchronized (B) { 
     while (A <= B) { B.wait(); } 
     A = B; 
     return B; 
    { 
} 

void updateB(TypeB newB) { // Called by one thread. 
    synchronized (B) { 
     B.update(newB); 
     B.notifyAll(); // All blocked threads must receive new B. 
    } 
} 

Мне нужно все блокированные потоки, чтобы получить новое значение B, как только она была обновлена. Но проблема в том, что когда один поток заканчивается и обновляет A, условие ожидания снова становится истинным, поэтому некоторые из других потоков блокируются и не получают новое значение B. Существует ли способ гарантировать, что только последний поток, который был заблокирован на B-обновлениях A или другим способом получения такого поведения?

+0

Проверить, что летучее ключевое слово делает в резьбе – Prateek

+0

не кажется, Он будет синхронизировать на 'Ā' @DanielGabriel. – Gray

+0

@ DanielGabriel A = B не изменяет B. –

ответ

0

Я получил следующую мысль: держать счетчик потоков, ожидающих «хорошего» значения В, первый из них проснулся кэш это хорошая ценность и пусть другие читатели до этого момента прочтут это. Мы оставляем новых читателей из очереди ожидания до тех пор, пока не будут выполнены все предыдущие раундные потоки.

Вот наброски кода:

final AtomicInteger A = new AtomicInteger(-1), B = new AtomicInteger(-1); 
int cachedB = -1; 

int readersCount; 

int waitForB() throws InterruptedException { // Can be called by many threads. 
    synchronized (B) { 
     while (cachedB != -1) B.wait(); 

     readersCount ++; 

     while (A.get() <= B.get()) { B.wait(); } 

     if (cachedB == -1) { 
      cachedB = B.get(); 
      A.set(B.get()); 

      readersCount--; 
      if (readersCount == 0) { cachedB = -1; B.notifyAll(); } 

      return B.get(); 
     } else { 
      int ret = cachedB; 

      readersCount--; 
      if (readersCount == 0) { cachedB = -1; B.notifyAll(); } 

      return ret; 
     } 
    } 
} 

void updateB(int newB) { // Called by one thread. 
    synchronized (B) { 
     B.set(newB); 
     B.notifyAll(); // All blocked threads must receive new B. 
    } 
} 
+0

Привет, Виктор, мне нравится ваше решение! Я думаю, что нашел другое решение, используя CyclicBarrier. Я помечаю ваше решение как правильное и опубликую решение. Благодарю. – Daniel

0

Мое предложение - использовать подход, основанный на событиях, где потоки хотят знать о новом значении B просто зарегистрируйтесь об изменениях! и один поток просто вызывает (запускает) их.
что-то вроде этого.
сначала объявите знак события.

interface EventListener{ 
void onUpdate(TypeB oldOne,TypeB newOne); 
} 

Затем у вас есть реализация в качестве слушателя.

class ManyThread implements EventListener,Runnable{ 
... 
private TypeA a; 
    synchronized void onUpdate(TypeB oldOne,TypeB newOne){ 
    if(!oldOne.equals(newOne)){a=newOne;this.notify();} 
    } 

    public ManyThread(){SingleThread.registerListener(this);} 
    public synchronized void run(){ 
    this.wait();//waiting for an event! 
    //some business 
    } 
... 
} 

затем укажите источник публикации.

final class EventMgr{//would be as a singleton guy too 
    private EventMgr(){} 
    static private java.util.List<EventListener> li=new java.util.ArrayList<EventListener>(); 
    static synchronized public void registerListener(EventListener e){li.add(e);} 
    static synchronized void triggerListeners(TypeB oldOne,TypeB newOne){ 
    for(EventListener e:li){e.onUpdate(oldOne,newOne)} 
    } 
} 

и простой триггер слушателей в EventMgr парень

class SingleThread{ 
    TypeB oldOne,B; 
    void updateB(TypeB newB) { // Called by one thread. 
     synchronized (B) { 
     oldOne=B.clone(); 
     B.update(newB); 
     //B.notifyAll(); 
     EventMgr.triggerListeners(oldOne,B); 
     } 
    } 
} 
+0

Спасибо за ответ, но, к сожалению, я не могу изменить потоки вызывающего клиента. Я думаю о попытке использовать CyclicBarrier или что-то подобное. Это сложная проблема. – Daniel

0

Я не уверен, если это 100% поточно, но я не нашел каких-либо проблем, пока. Идея что-то вроде этого:

CyclicBarrier barrier; 
AtomicInteger count = 0; 

TypeB waitForB() { // Can be called by many threads. 
    synchronized (B) { 
     count++; 
     while (A <= B) { B.wait(); } 
     count--; 
    { 
    if (barrier != null) { barrier.await(); } 
    return B; 
} 

class UpdateA implements Runnable { 
    void run() { 
     A = B; 
    } 
} 

void updateB(TypeB newB) { // Called by one thread. 
    synchronized (B) { 
     B.update(newB); 
     barrier = new CyclicBarrier(count, new UpdateA); 
     B.notifyAll(); // All blocked threads must receive new B. 
    } 
} 
Смежные вопросы