У меня есть приложение, которое отправляет сообщения AMQP через RabbitMQ. отправка сообщения запускается по запросу http. Недавно я заметил, что некоторые сообщения, кажется, теряются (как никогда раньше). Я также заметил, что список каналов, управляемых сервером, неуклонно растет. Первое, что я исправил, это закрыть каналы после того, как они больше не требуются. Тем не менее, я все еще не уверен, что мой код правильно структурирован для обеспечения доставки. Ниже приведены два раздела кода; первый - это раздел одного синглета, который управляет соединением (не воссоздается при каждом вызове), второй - код отправки. Любые рекомендации/рекомендации будут оценены.Надежные сообщения с RabbitMQ
@Service
public class PersistentConnection {
private static Connection myConnection = null;
private Boolean blocked = false;
@Autowired ApplicationConfiguration applicationConfiguration;
@Autowired ConfigurationService configurationService;
@PostConstruct
private void init() {
}
@PreDestroy
private void destroy() {
try {
myConnection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public Connection getConnection() {
if (myConnection == null) {
start();
}
else if (!myConnection.isOpen()) {
log.warn("AMQP Connection closed. Attempting to start.");
start();
}
return myConnection;
}
private void start() {
log.debug("Building AMQP Connection");
ConnectionFactory factory = new ConnectionFactory();
String ipAddress = applicationConfiguration.getAMQPHost();
String password = applicationConfiguration.getAMQPUser();
String user = applicationConfiguration.getAMQPPassword();
String virtualHost = applicationConfiguration.getAMQPVirtualHost();
String port = applicationConfiguration.getAMQPPort();
try {
factory.setUsername(user);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setPort(Integer.parseInt(port));
factory.setHost(ipAddress);
myConnection = factory.newConnection();
}
catch (Exception e) {
e.printStackTrace();
}
myConnection.addBlockedListener(new BlockedListener() {
public void handleBlocked(String reason) throws IOException {
// Connection is now blocked
blocked = true;
}
public void handleUnblocked() throws IOException {
// Connection is now unblocked
blocked = false;
}
});
}
public Boolean isBlocked() {
return blocked;
}
}
/*
* Sends ADT message to AMQP server.
*/
private void send(String routingKey, String message) throws Exception {
String exchange = applicationConfiguration.getAMQPExchange();
String exchangeType = applicationConfiguration.getAMQPExchangeType();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, exchangeType);
channel.basicPublish(exchange, routingKey, null, message.getBytes());
// Close the channel if it is no longer needed in this thread
channel.close();
}
Может ли 'getConnection()' вызываться из большего количества потоков? если да, то код не в потокобезопасном. – Gabriele
Входит запрос HTTP, в результате которого вызывает вызов getConnection(). Так что я думаю, это возможно. Я подумал, сделав это синглтон, который рассмотрит эту проблему. Каков наилучший способ улучшить код? – skyman