2015-12-17 5 views
3

Я пытаюсь создать приложение для обмена сообщениями в сети, основанное на Spring Websocket Demo, запущенном ActiveMQ в качестве брокера сообщений STOMP с Undertow. Приложение работает нормально на небезопасных соединениях. Тем не менее, мне трудно настроить STOMP Broker Relay для пересылки с помощью SSL-соединений.Spring Boot SSL TCPClient ~ StompBrokerRelayMessageHandler ~ ActiveMQ ~ Undertow

Как упоминалось в весеннем WebSocket Docs ...

«ТОПАЙТЕ брокер реле» в приведенной выше конфигурации является Spring MessageHandler, который обрабатывает сообщения, отправляя их на внешнее сообщение брокеру. Для этого он устанавливает TCP-соединения с брокером, пересылает ему все сообщения, а затем пересылает все сообщения, полученные от брокера, клиентам через их сеансы WebSocket. По сути, он действует как «реле», которое пересылает сообщения в обоих направлениях.

Кроме того, документы утверждают, зависимость от reactor-net, которую я имею ...

Пожалуйста, добавьте зависимость от org.projectreactor: реакторный-сеть для управления соединениями TCP.

Проблема в том, что моя текущая реализация не инициализирует NettyTCPClient через SSL, поэтому соединение ActiveMQ завершается с SSLException.


[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED: 
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442] 
... 
[o.a.a.b.TransportConnection.Transport:245] » 
Transport Connection to: tcp://127.0.0.1:17779 failed: 
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection? 
... 

Таким образом я попытался исследовать Project Reactor Docs установить параметры SSL для соединения, но я не был успешным.

В этот момент я обнаружил, что StompBrokerRelayMessageHandler инициализирует NettyTCPClient по умолчанию в Reactor2TcpClient, но он не представляется настраиваемым.

Помощь была бы принята с благодарностью.

SSCCE


app.props

spring.activemq.in-memory=true 
spring.activemq.pooled=false 
spring.activemq.broker-url=stomp+ssl://localhost:8442 
server.port=8443 
server.ssl.enabled=true 
server.ssl.protocol=tls 
server.ssl.key-alias=undertow 
server.ssl.key-store=classpath:undertow.jks 
server.ssl.key-store-password=xxx 
server.ssl.trust-store=classpath:undertow_certs.jks 
server.ssl.trust-store-password=xxx 

WebSocketConfig

//... 
@Configuration 
@EnableWebSocketMessageBroker 
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { 

    private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class); 

    private final static String KEYSTORE = "/activemq.jks"; 
    private final static String KEYSTORE_PASS = "xxx"; 
    private final static String KEYSTORE_TYPE = "JKS"; 
    private final static String TRUSTSTORE = "/activemq_certs.jks"; 
    private final static String TRUSTSTORE_PASS = "xxx"; 

    private static String getBindLocation() { 
     return "stomp+ssl://localhost:8442?transport.needClientAuth=false"; 
    } 

    @Bean(initMethod = "start", destroyMethod = "stop") 
    public SslBrokerService activeMQBroker() throws Exception { 

     final SslBrokerService service = new SslBrokerService(); 
     service.setPersistent(false); 

     KeyManager[] km = SecurityManager.getKeyManager(); 
     TrustManager[] tm = SecurityManager.getTrustManager(); 

     service.addSslConnector(getBindLocation(), km, tm, null); 
     final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test"); 
     service.setDestinations(new ActiveMQDestination[]{topic}); 

     return service; 
    } 


    @Override 
    public void configureMessageBroker(MessageBrokerRegistry config) { 
     config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442); 
     config.setApplicationDestinationPrefixes("/app"); 
    } 

    @Override 
    public void registerStompEndpoints(StompEndpointRegistry registry) { 
     registry.addEndpoint("/welcome").withSockJS(); 
     registry.addEndpoint("/test").withSockJS(); 
    } 

    private static class SecurityManager { 
    //elided... 
    } 

} 

SOLVED Per Rossens Рекомендации. Вот подробности реализации для всех заинтересованных.


WebSocketConfig

@Configuration 
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration { 
    ... 
    @Bean 
    public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() { 
     StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler(); 
     ConfigurationReader reader = new StompClientDispatcherConfigReader(); 
     Environment environment = new Environment(reader).assignErrorJournal(); 
     TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment,"localhost", 8443)); 
     handler.setTcpClient(client); 
     return handler; 
    } 
} 

StompTCPClientSpecFactory

private static class StompTcpClientSpecFactory 
     implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> { 

    private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class); 

    private final String host; 
    private final int port; 
    private final String KEYSTORE = "src/main/resources/tcpclient.jks"; 
    private final String KEYSTORE_PASS = "xxx"; 
    private final String KEYSTORE_TYPE = "JKS"; 
    private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks"; 
    private final String TRUSTSTORE_PASS = "xxx"; 
    private final String TRUSTSTORE_TYPE = "JKS"; 
    private final Environment environment; 

    private final SecurityManager tcpManager = new SecurityManager 
      .SSLBuilder(KEYSTORE, KEYSTORE_PASS) 
      .keyStoreType(KEYSTORE_TYPE) 
      .trustStore(TRUSTSTORE, TRUSTSTORE_PASS) 
      .trustStoreType(TRUSTSTORE_TYPE) 
      .build(); 

    public StompTcpClientSpecFactory(Environment environment, String host, int port) { 
     this.environment = environment; 
     this.host = host; 
     this.port = port; 
    } 

    @Override 
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
      Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) { 

     return tcpClientSpec 
       .ssl(new SslOptions() 
         .sslProtocol("TLS") 
         .keystoreFile(tcpManager.getKeyStore()) 
         .keystorePasswd(tcpManager.getKeyStorePass()) 
         .trustManagers(tcpManager::getTrustManager) 
         .trustManagerPasswd(tcpManager.getTrustStorePass())) 
       .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder())) 
       .env(this.environment) 
       .dispatcher(this.environment.getCachedDispatchers("StompClient").get()) 
       .connect(this.host, this.port); 
    } 
} 
+1

Кто бы ни запустил это, я бы * действительно * хотел объяснить ... безжалостным. –

+0

Внутренние устройства позволяют это (https://github.com/reactor/reactor-io/blob/master/reactor-net/src/test/java/reactor/io/net/tcp/TcpServerTests.java#L100), но я «Проверяют ли они их. – smaldini

ответ

6

StompBrokerRelayMessageHandler имеет свойство tcpClient, которое вы можете установить. Однако похоже, что мы не показываем это через установку WebSocketMessageBrokerConfigurer.

Вы можете удалить @EnableWebSocketMessageBroker и расширить вместо этого DelegatingWebSocketMessageBrokerConfiguration. Это фактически то же самое, но вы теперь распространяетесь непосредственно из класса конфигурации, который предоставляет все компоненты.

Это позволяет затем переопределить фасоль stompBrokerRelayMessageHandler() и сразу установить его свойство TcpClient. Просто убедитесь, что метод переопределения отмечен @Bean.

+0

Спасибо, Россен ... Позвольте мне дать это сэр. –

+0

Просто хотел поблагодарить вас за вашу помощь ... взял меня на время, но у меня наконец-то это работает ... вы, рок-товарищ :) –

3

Мне нужно было обеспечить защиту брокерской службы STOMP до RabbitMQ, используя Spring Messaging 4.2.5 с Java 8, и обнаружил, что последующий код вопроса устарел.

При запуске приложения я предоставляю свойства среды доверия, чтобы доверять внутреннему самозаверяющему центру сертификации. java -Djavax.net.ssl.trustStore=/etc/pki/java/server.jks -Djavax.net.ssl.trustStorePassword=xxxxx -jar build/libs/server.war

ответ

Per Росен, я изменил

@Configuration 
@EnableWebSocketMessageBroker 
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { 

в

@Configuration 
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration { 

Тогда, в этом WebSocketConfig я представил свой собственный AbstractBrokerMessageHandler боб:

@Bean 
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() { 
    AbstractBrokerMessageHandler handler = super.stompBrokerRelayMessageHandler(); 
    if (handler instanceof StompBrokerRelayMessageHandler) { 
     ((StompBrokerRelayMessageHandler) handler).setTcpClient(new Reactor2TcpClient<>(
       new StompTcpFactory("127.0.0.1", 61614, true) 
     )); 
    } 
    return handler; 
} 

InstanceOf условно Вашингтон s для упрощения использования NoOpBrokerMessageHandler в модульных тестах.

И, наконец, следующее реализация StompTcpFactory используется выше:

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> { 

    private final Environment environment = new Environment(new SynchronousDispatcherConfigReader()); 
    private final String host; 
    private final int port; 
    private final boolean ssl; 

    public StompTcpFactory(String host, int port, boolean ssl) { 
     this.host = host; 
     this.port = port; 
     this.ssl = ssl; 
    } 

    @Override 
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) { 
     return tcpClientSpec 
       .env(environment) 
       .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder())) 
       .ssl(ssl ? new SslOptions() : null) 
       .connect(host, port); 
    } 

    private static class SynchronousDispatcherConfigReader implements ConfigurationReader { 
     @Override 
     public ReactorConfiguration read() { 
      return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties()); 
     } 
    } 

} 
+0

Отличное решение! Это действительно не должно быть так сложно.Я должен был настроить SSLC-текст по умолчанию для виртуальной машины перед возвратом объекта 'tcpClientSpec', потому что код реактора делает классический отказ от предположения, что хранилище ключей существует как физический файл на диске. –

+0

Просто отлично! Я провожу несколько часов, чтобы попытаться подключить брокерское реле с amazonMQ (aws) с SSL-соединением без успеха. Ваша конфигурация работает как шарм. Есть ли открытая проблема для этого в весенних сообщениях? –

+0

@MartinChoraine же здесь! Потрясающие. Кстати, по какой-то причине ему не нравится префикс 'stomp + ssl: //', предоставляемый строкой соединения. –

1

@amoebob ответ велик, но нити не закрываются должным образом. Каждый раз, когда связь с клиентом открыта, новый поток открыт и никогда не закрывается. Я обнаружил эту проблему в производстве и потратил несколько дней на ее решение. Поэтому я предлагаю вам изменить StompTcpFactory, чтобы улучшить повторное использование потоков:

import io.netty.channel.EventLoopGroup; 
import org.springframework.messaging.Message; 
import org.springframework.messaging.simp.stomp.Reactor2StompCodec; 
import org.springframework.messaging.simp.stomp.StompDecoder; 
import org.springframework.messaging.simp.stomp.StompEncoder; 
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; 
import reactor.Environment; 
import reactor.core.config.ReactorConfiguration; 
import reactor.io.net.NetStreams; 
import reactor.io.net.Spec; 
import reactor.io.net.config.SslOptions; 
import reactor.io.net.impl.netty.NettyClientSocketOptions; 

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> { 

    private final Environment environment; 
    private final EventLoopGroup eventLoopGroup; 
    private final String host; 
    private final int port; 
    private final boolean ssl; 

    public StompTcpFactory(String host, int port, boolean ssl) { 
    this.host = host; 
    this.port = port; 
    this.ssl = ssl; 
    this.environment = new Environment(() -> new ReactorConfiguration(emptyList(), "sync", new Properties())); 
    this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup(); 
    } 

    @Override 
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) { 
    return tcpClientSpec 
      .env(environment) 
      .options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup)) 
      .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder())) 
      .ssl(ssl ? new SslOptions() : null) 
      .connect(host, port); 
    } 

} 
+0

Я получаю частный доступ 'Reactor2TcpClient.initEventLoopGroup();', используя 'Reactor 2.0.8-RELEASE' –

+0

Вы должны найти' Reactor2TcpClient' из 'org.springframework.messaging.tcp.reactor' в режиме весеннего обмена сообщениями. Я использую версию '4.3.13' –

+0

Хорошая сделка, получил ее. Я все еще на 'Spring Boot 1.4.2', поэтому пришлось вручную переходить к некоторым версиям зависимостей для« Spring Messaging ». Виве-ла-Франс! –

Смежные вопросы