1

У меня есть серверная программа, которая принимает клиентские соединения. Эти клиентские соединения могут принадлежать многим потокам. Например, два или более клиентов могут принадлежать одному и тому же потоку. Из этих потоков я должен пройти одно сообщение, но мне нужно подождать, пока все потоки не будут установлены. Для этого я поддерживаю следующую структуру данных.Как сделать поток до тех пор, пока переменная не достигнет определенного значения (многопоточная Java)

ConcurrentHashMap<Integer, AtomicLong> conhasmap = new ConcurrentHashMap<Integer, AtomicLong>(); 

Целое - это идентификатор потока и длинный номер клиента. Чтобы сделать один поток для данного потока ждать, пока AtomicLong не достигнет определенного значения, я использовал следующий цикл. Фактически первый пакет потока помещает идентификатор потока и количество ожидающих соединений. С каждым соединением я уменьшаю соединения для ожидания.

while(conhasmap.get(conectionID) != new AtomicLong(0)){ 
     // Do nothing 
} 

Однако этот цикл блокирует другие потоки. В соответствии с этим answer он выполняет изменчивое считывание. Как я могу изменить код для ожидания правильного потока для данного потока до тех пор, пока он не достигнет определенного значения?

+0

Почему бы не использовать thread.wait() и разбудить его после того, как ваш сервер принимает новое соединение? Другим менее элегантным решением было бы некоторое время спать и проверить ценность. – PbxMan

+1

Использование ['Условие'] (http: // docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Condition.html), может быть? – fge

+0

@PbxMan Я жду поток, уже принятый сервером. Да, спят некоторое время, и чтение значения может сработать, однако я ищу лучший способ. – user340

ответ

2

Ваше выражение:

conhasmap.get(conectionID) != new AtomicLong(0) 

всегда будет верным, потому что вы сравниваете ссылки на объекты, которые никогда не будут равны, а не значений. Чем лучше выражение будет:

conhasmap.get(conectionID).longValue() != 0L) 

, но цикл, как это без ожидания/уведомит логики внутри цикла не является хорошей практикой, поскольку она использует время процессора постоянно. Вместо этого каждый поток должен вызывать .wait() в экземпляре AtomicLong, а когда он уменьшался или увеличивался, вы должны вызвать .notifyAll() в экземпляре AtomicLong, чтобы разбудить каждый ожидающий поток, чтобы проверить выражение. Класс AtomicLong уже может вызывать метод notifyAll() всякий раз, когда он изменяется, но я точно не знаю.

AtomicLong al = conhasmap.get(conectionID); 
synchronized(al) { 
    while(al.longValue() != 0L) { 
     al.wait(100); //wait up to 100 millis to be notified 
    } 
} 

В коде, что приращение/декремент, это будет выглядеть следующим образом:

AtomicLong al = conhasmap.get(conectionID); 
synchronized(al) { 
    if(al.decrementAndGet() == 0L) { 
     al.notifyAll(); 
    } 
} 

лично я бы не использовать AtomicLong для этого счетчика, потому что вы не выгоды от блокировки меньше поведения AtomicLong. Просто используйте java.lang.Long вместо этого, потому что вам все равно нужно синхронизировать объект-счетчик для логики wait()/notify().

+0

Спасибо за указание сравнения объектов. Однако решение все еще не работает. – user340

+0

Считаете ли вы, что блокировка замка последовательна как блокировка? Код переходит в бесконечный цикл во время цикла – user340

+0

Возможно, у вас может быть ситуация, которая слишком часто уменьшает счетчик. Изменение условий при быть: в то время как (al.longValue()> 0L) И если условие было: если (al.decrementAndGet() <= 0L) – Palamino

3

Если вы используете Java 8, CompletableFuture может быть в хорошей форме. Вот полный, надуманный пример, который ожидает, что 5 клиентов смогут подключиться и отправить сообщение на сервер (имитируется с использованием BlockingQueue с предложением/опросом).

В этом примере, когда достигнуто ожидаемое количество подключенных сообщений клиента, завершен крючок CompletableFuture, который затем запускает произвольный код в любом потоке по вашему выбору.

В этом примере у вас нет сложных ожидающих/уведомляющих установок потока или циклов ожидания ожидания.

package so.thread.state; 

import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicBoolean; 
import java.util.concurrent.atomic.AtomicLong; 

public class Main { 

    public static String CONNETED_MSG = "CONNETED"; 
    public static Long EXPECTED_CONN_COUNT = 5L; 

    public static ExecutorService executor = Executors.newCachedThreadPool(); 
    public static BlockingQueue<String> queue = new LinkedBlockingQueue<>(); 

    public static AtomicBoolean done = new AtomicBoolean(false); 

    public static void main(String[] args) throws Exception { 

    // add a "server" thread 
    executor.submit(() -> server()); 

    // add 5 "client" threads 
    for (int i = 0; i < EXPECTED_CONN_COUNT; i++) { 
     executor.submit(() -> client()); 
    } 

    // clean shut down 
    Thread.sleep(TimeUnit.SECONDS.toMillis(1)); 
    done.set(true); 
    Thread.sleep(TimeUnit.SECONDS.toMillis(1)); 
    executor.shutdown(); 
    executor.awaitTermination(1, TimeUnit.SECONDS); 

    } 

    public static void server() { 

    print("Server started up"); 
    // track # of client connections established 
    AtomicLong connectionCount = new AtomicLong(0L); 

    // at startup, create my "hook" 
    CompletableFuture<Long> hook = new CompletableFuture<>(); 
    hook.thenAcceptAsync(Main::allClientsConnected, executor); 

    // consume messages 
    while (!done.get()) { 
     try { 
     String msg = queue.poll(5, TimeUnit.MILLISECONDS); 
     if (null != msg) { 
      print("Server received client message"); 
      if (CONNETED_MSG.equals(msg)) { 
      long count = connectionCount.incrementAndGet(); 

      if (count >= EXPECTED_CONN_COUNT) { 
       hook.complete(count); 
      } 
      } 
     } 

     } catch (Exception e) { 
     e.printStackTrace(); 
     } 
    } 

    print("Server shut down"); 

    } 

    public static void client() { 
    queue.offer(CONNETED_MSG); 
    print("Client sent message"); 
    } 

    public static void allClientsConnected(Long count) { 
    print("All clients connected, count: " + count); 
    } 


    public static void print(String msg) { 
    System.out.println(String.format("[%s] %s", Thread.currentThread().getName(), msg)); 
    } 
} 

Вы получаете выход как этот

[pool-1-thread-1] Server started up 
[pool-1-thread-5] Client sent message 
[pool-1-thread-3] Client sent message 
[pool-1-thread-2] Client sent message 
[pool-1-thread-6] Client sent message 
[pool-1-thread-4] Client sent message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-4] All clients connected, count: 5 
[pool-1-thread-1] Server shut down 
+0

Я не знаю количество клиентских подключений заранее. Там может быть много соединений, и эти соединения имеют разные потоки. Я должен ждать определенного потока. – user340

+0

@kani, это надуманный пример, вы можете использовать такой шаблон с любой логикой (та же логика логики, что и сейчас), а затем использовать 'CompletableFuture.complete (..)', я просто пытаюсь показать новую парадигму, которую Java представила, чтобы сделать состав сложного асинхронного поведения более понятным, чем рассуждать вокруг 'Object.wait()' и 'Object.notify()' – Alex

+0

. Мои потоки клиентов сгруппированы на основе идентификатора связи. например, если имеется 15 клиентских соединений 10, принадлежащих одному сообщению A и 5, относятся к соединению B. Я должен подождать 10 потоков для A и 5 потоков для B. Возможно ли это с CompletableFuture? Я вижу, это требует, чтобы потоки собирались с исполнителем, и я не могу отличить поток связи при запуске потока. – user340

Смежные вопросы

 Смежные вопросы