Я пытаюсь создать приложение для обмена сообщениями в сети, основанное на Spring Websocket Demo, запущенном ActiveMQ в качестве брокера сообщений STOMP с Undertow. Приложение работает нормально на небезопасных соединениях. Тем не менее, мне трудно настроить STOMP Broker Relay для пересылки с помощью SSL-соединений.Spring Boot SSL TCPClient ~ StompBrokerRelayMessageHandler ~ ActiveMQ ~ Undertow
Как упоминалось в весеннем WebSocket Docs ...
«ТОПАЙТЕ брокер реле» в приведенной выше конфигурации является Spring MessageHandler, который обрабатывает сообщения, отправляя их на внешнее сообщение брокеру. Для этого он устанавливает TCP-соединения с брокером, пересылает ему все сообщения, а затем пересылает все сообщения, полученные от брокера, клиентам через их сеансы WebSocket. По сути, он действует как «реле», которое пересылает сообщения в обоих направлениях.
Кроме того, документы утверждают, зависимость от reactor-net, которую я имею ...
Пожалуйста, добавьте зависимость от org.projectreactor: реакторный-сеть для управления соединениями TCP.
Проблема в том, что моя текущая реализация не инициализирует NettyTCPClient через SSL, поэтому соединение ActiveMQ завершается с SSLException.
[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED:
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] »
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...
Таким образом я попытался исследовать Project Reactor Docs установить параметры SSL для соединения, но я не был успешным.
В этот момент я обнаружил, что StompBrokerRelayMessageHandler инициализирует NettyTCPClient по умолчанию в Reactor2TcpClient, но он не представляется настраиваемым.
Помощь была бы принята с благодарностью.
SSCCE
app.props
spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx
WebSocketConfig
//...
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);
private final static String KEYSTORE = "/activemq.jks";
private final static String KEYSTORE_PASS = "xxx";
private final static String KEYSTORE_TYPE = "JKS";
private final static String TRUSTSTORE = "/activemq_certs.jks";
private final static String TRUSTSTORE_PASS = "xxx";
private static String getBindLocation() {
return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
}
@Bean(initMethod = "start", destroyMethod = "stop")
public SslBrokerService activeMQBroker() throws Exception {
final SslBrokerService service = new SslBrokerService();
service.setPersistent(false);
KeyManager[] km = SecurityManager.getKeyManager();
TrustManager[] tm = SecurityManager.getTrustManager();
service.addSslConnector(getBindLocation(), km, tm, null);
final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
service.setDestinations(new ActiveMQDestination[]{topic});
return service;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442);
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/welcome").withSockJS();
registry.addEndpoint("/test").withSockJS();
}
private static class SecurityManager {
//elided...
}
}
SOLVED Per Rossens Рекомендации. Вот подробности реализации для всех заинтересованных.
WebSocketConfig
@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {
...
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
ConfigurationReader reader = new StompClientDispatcherConfigReader();
Environment environment = new Environment(reader).assignErrorJournal();
TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment,"localhost", 8443));
handler.setTcpClient(client);
return handler;
}
}
StompTCPClientSpecFactory
private static class StompTcpClientSpecFactory
implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);
private final String host;
private final int port;
private final String KEYSTORE = "src/main/resources/tcpclient.jks";
private final String KEYSTORE_PASS = "xxx";
private final String KEYSTORE_TYPE = "JKS";
private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
private final String TRUSTSTORE_PASS = "xxx";
private final String TRUSTSTORE_TYPE = "JKS";
private final Environment environment;
private final SecurityManager tcpManager = new SecurityManager
.SSLBuilder(KEYSTORE, KEYSTORE_PASS)
.keyStoreType(KEYSTORE_TYPE)
.trustStore(TRUSTSTORE, TRUSTSTORE_PASS)
.trustStoreType(TRUSTSTORE_TYPE)
.build();
public StompTcpClientSpecFactory(Environment environment, String host, int port) {
this.environment = environment;
this.host = host;
this.port = port;
}
@Override
public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
return tcpClientSpec
.ssl(new SslOptions()
.sslProtocol("TLS")
.keystoreFile(tcpManager.getKeyStore())
.keystorePasswd(tcpManager.getKeyStorePass())
.trustManagers(tcpManager::getTrustManager)
.trustManagerPasswd(tcpManager.getTrustStorePass()))
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
.env(this.environment)
.dispatcher(this.environment.getCachedDispatchers("StompClient").get())
.connect(this.host, this.port);
}
}
Кто бы ни запустил это, я бы * действительно * хотел объяснить ... безжалостным. –
Внутренние устройства позволяют это (https://github.com/reactor/reactor-io/blob/master/reactor-net/src/test/java/reactor/io/net/tcp/TcpServerTests.java#L100), но я «Проверяют ли они их. – smaldini