2015-08-08 4 views
1

Я пишу приложение Java, которое отправляет и получает сообщения с сервера веб-сервера. Когда приложение получает сообщение, может потребоваться некоторое время для его обработки. Поэтому я пытаюсь использовать несколько потоков для получения сообщений. Насколько я понимаю, Grizzly имеет селекторные потоки, а также рабочие потоки. По умолчанию есть 1 селекторный поток и 2 рабочих потока, в следующем примере я пытаюсь увеличить их до 5 и 10 соответственно. В приведенном ниже примере я приостанавливаю поток, который вызывает метод onMessage для 10 секунд для имитации обработки поступающей информации. Информация поступает в каждую секунду, поэтому 10 потоков должны иметь возможность обрабатывать объем трафика. Когда я просматриваю прогон, выполняется только 1 селекторный поток и 2 рабочих потока. Кроме того, сообщения принимаются только через 10 секунд. Указывая, что только 1 поток обрабатывает трафик - я нахожу это очень странным. Во время профилирования один рабочий поток, например, Grizzly(1) получает первое отправленное сообщение. Затем через 10 секунд «Grizzly (2)» получает второе сообщение, затем Grizzly(2) продолжает получать сообщения, а Grizzly(1) не выполняет никаких действий.Threading in javax.websockets/Tyrus

Может кто-нибудь объяснить это странное поведение и как его изменить, например, 10 потоков постоянно ждут очереди для сообщения?

Главная:

public static void main(String[] args) { 
     WebsocketTextClient client = new WebsocketTextClient(); 
     client.connect(); 
     for (int i = 0; i < 60; i++) { 
      client.send("Test message " + i); 
      try { 
       Thread.sleep(1000); 
      } catch (Exception e) { 
       System.out.println("Error sleeping!"); 
      } 
     } 
    } 

WebsocketTextClient.java:

import java.net.URI; 
import javax.websocket.ClientEndpointConfig; 
import javax.websocket.EndpointConfig; 
import javax.websocket.Session; 
import javax.websocket.Endpoint; 
import javax.websocket.MessageHandler; 
import org.glassfish.tyrus.client.ClientManager; 
import org.glassfish.tyrus.client.ThreadPoolConfig; 
import org.glassfish.tyrus.container.grizzly.client.GrizzlyClientProperties; 

public class WebsocketTextClient { 

    private ClientManager client; 
    private ClientEndpointConfig clientConfig; 
    WebsocketTextClientEndpoint endpoint; 

    public WebsocketTextClient() { 
     client = ClientManager.createClient(); 
     client.getProperties().put(GrizzlyClientProperties.SELECTOR_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(5)); 
     client.getProperties().put(GrizzlyClientProperties.WORKER_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(10)); 
    } 

    public boolean connect() { 
     try { 
      clientConfig = ClientEndpointConfig.Builder.create().build(); 
      endpoint = new WebsocketTextClientEndpoint(); 
      client.connectToServer(endpoint, clientConfig, new URI("wss://echo.websocket.org")); 
     } catch (Exception e) { 
      return false; 
     } 
     return true; 
    } 

    public boolean disconnect() { 
     return false; 
    } 

    public boolean send(String message) { 
     endpoint.session.getAsyncRemote().sendText(message); 
     return true; 
    } 

    private class WebsocketTextClientEndpoint extends Endpoint { 
     Session session; 

     @Override 
     public void onOpen(Session session, EndpointConfig config) { 
      System.out.println("Connection opened"); 
      this.session = session; 
      session.addMessageHandler(new WebsocketTextClientMessageHandler()); 
     } 
    } 

    private class WebsocketTextClientMessageHandler implements MessageHandler.Whole<String> { 

     @Override 
     public void onMessage(String message) { 
      System.out.println("Message received from " + Thread.currentThread().getName() + " " + message); 
      try { 
       Thread.sleep(10000); 
      } catch (Exception e) { 
       System.out.println("Error sleeping!"); 
      } 
      System.out.println("Resuming"); 
     } 
    } 
} 
+0

Это может быть проблема с «максимальным размером ядра», см. [Этот вопрос] (http://stackoverflow.com/q/17659510/3080094). То есть попробуйте использовать [setCorePoolSize] (https://tyrus.java.net/apidocs/1.9/org/glassfish/tyrus/client/ThreadPoolConfig.html#setCorePoolSize%28int%29) вместо «maxPoolSize». – vanOekel

+0

Нет, я тоже это пробовал. Однако теперь я обнаружил, что документация для 'messageHandlers' допускает только один поток. – sigvardsen

ответ

3

То, что вы, кажется, просят для WebSockets, чтобы иметь возможность получить несколько сообщений, отправленных тем же соединение клиента, чтобы обработать те сообщения в отдельных потоках и отправлять ответы, когда они готовы, что означает, что это может быть не по порядку. Этот сценарий может произойти только в том случае, если клиент многопоточен.

Для работы с несколькими потоками на одном и том же сеансе WebSocket обычно требуется способность WebSockets мультиплексировать данные, поступающие с клиента и с него. В настоящее время это не функция WebSockets, но, безусловно, может быть построена поверх нее. Тем не менее, мультиплексирование этих потоков клиентов и серверов на одном канале представляет собой довольно сложную задачу, потому что вам необходимо остановить все потоки клиента и сервера от непреднамеренной перезаписи или голодания друг друга.

Спецификация Java для MessageHandler, возможно, немного неоднозначна в отношении модели потоков;

https://docs.oracle.com/javaee/7/api/javax/websocket/MessageHandler.html говорит:

Каждая веб-сокет сеанса использует не более чем один поток одновременно называть его MessageHandlers.

Но важным термином здесь является «разъем сеанс». Если ваш клиент отправляет несколько сообщений в рамках одного сеанса WebSocket, обработчик на стороне сервера будет выполняться в одном потоке. Это не означает, что вы не можете делать много интересного в потоке, особенно если вы используете Input/OutputStreams (или Writers) на обоих концах. Это делает означает, что связь с клиентом осуществляется только одним потоком. Если вы хотите мультиплексировать связь, вам нужно написать что-то поверх сокета, чтобы сделать это; который будет включать разработку собственной модели потоковой передачи для отправки запросов.

Более простым решением было бы создать новый сеанс для каждого запроса клиента.Каждый клиентский запрос запускает сеанс (т. Е. TCP-соединение), отправляет данные и ждет результата. Это дает вам несколько потоков MessageHandler - по одному за сеанс по спецификации.

Это самый простой способ получить многопоточность на стороне сервера; любой другой подход, как правило, требует механизма мультиплексирования, который, в зависимости от вашего варианта использования, возможно, не стоит усилий и, безусловно, несет определенную сложность и риск.

Если вас беспокоит количество сеансов (TCP/HTTP-соединений) между клиентом и серверами, вы можете подумать о создании пула сеансов на стороне клиента и использовать каждый сеанс клиента каждый раз за один раз , возвращая сеанс в пул всякий раз, когда клиент с ним работает.

Наконец, возможно, не имеет прямого отношения: я обнаружил, что, когда я использовал скумбриевидный гидролик Micro служить WebSocket конечной точки, мне нужно, чтобы установить это:

<resources> 
    ... 
    <managed-executor-service maximum-pool-size="200" core-pool-size="10" long-running-tasks="true" keep-alive-seconds="300" hung-after-seconds="300" task-queue-capacity="20000" jndi-name="concurrent/__defaultManagedExecutorService" object-type="system-all"></managed-executor-service> 

по умолчанию ManagedExecutorService только обеспечивает единую нить. Это похоже на Glassfish. Это заставило меня бегать вокруг часами, думая, что я не понимаю модель нарезки, когда это был просто размер пула, который меня путал.