2014-10-02 4 views
1

У меня есть приложение, которое отправляет сообщения AMQP через RabbitMQ. отправка сообщения запускается по запросу http. Недавно я заметил, что некоторые сообщения, кажется, теряются (как никогда раньше). Я также заметил, что список каналов, управляемых сервером, неуклонно растет. Первое, что я исправил, это закрыть каналы после того, как они больше не требуются. Тем не менее, я все еще не уверен, что мой код правильно структурирован для обеспечения доставки. Ниже приведены два раздела кода; первый - это раздел одного синглета, который управляет соединением (не воссоздается при каждом вызове), второй - код отправки. Любые рекомендации/рекомендации будут оценены.Надежные сообщения с RabbitMQ

@Service 
public class PersistentConnection { 
    private static Connection myConnection = null; 
    private Boolean blocked = false; 

    @Autowired ApplicationConfiguration applicationConfiguration; 
    @Autowired ConfigurationService configurationService; 

    @PostConstruct 
    private void init() { 
    } 

    @PreDestroy 
    private void destroy() { 
     try { 
      myConnection.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    public Connection getConnection() { 
     if (myConnection == null) { 
      start(); 
     } 
     else if (!myConnection.isOpen()) { 
      log.warn("AMQP Connection closed. Attempting to start."); 
      start(); 
     } 
     return myConnection; 
    } 


    private void start() { 
     log.debug("Building AMQP Connection"); 

     ConnectionFactory factory = new ConnectionFactory(); 
     String ipAddress = applicationConfiguration.getAMQPHost(); 
     String password = applicationConfiguration.getAMQPUser(); 
     String user = applicationConfiguration.getAMQPPassword(); 
     String virtualHost = applicationConfiguration.getAMQPVirtualHost(); 
     String port = applicationConfiguration.getAMQPPort(); 

     try { 
      factory.setUsername(user); 
      factory.setPassword(password); 
      factory.setVirtualHost(virtualHost); 
      factory.setPort(Integer.parseInt(port)); 
      factory.setHost(ipAddress); 
      myConnection = factory.newConnection(); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 
     } 

     myConnection.addBlockedListener(new BlockedListener() { 
      public void handleBlocked(String reason) throws IOException { 
       // Connection is now blocked 
       blocked = true; 
      } 

      public void handleUnblocked() throws IOException { 
       // Connection is now unblocked 
       blocked = false; 
      } 
     }); 
    } 

    public Boolean isBlocked() { 
     return blocked; 
    } 
} 

/* 
* Sends ADT message to AMQP server. 
*/ 
private void send(String routingKey, String message) throws Exception { 
    String exchange = applicationConfiguration.getAMQPExchange(); 
    String exchangeType = applicationConfiguration.getAMQPExchangeType(); 

    Connection connection = myConnection.getConnection(); 
    Channel channel = connection.createChannel(); 
    channel.exchangeDeclare(exchange, exchangeType); 
    channel.basicPublish(exchange, routingKey, null, message.getBytes()); 

    // Close the channel if it is no longer needed in this thread 
    channel.close(); 
} 
+1

Может ли 'getConnection()' вызываться из большего количества потоков? если да, то код не в потокобезопасном. – Gabriele

+0

Входит запрос HTTP, в результате которого вызывает вызов getConnection(). Так что я думаю, это возможно. Я подумал, сделав это синглтон, который рассмотрит эту проблему. Каков наилучший способ улучшить код? – skyman

ответ

2

Попробуйте этот код:

@Service 
public class PersistentConnection { 
    private Connection myConnection = null; 
    private Boolean blocked = false; 

    @Autowired ApplicationConfiguration applicationConfiguration; 
    @Autowired ConfigurationService configurationService; 

    @PostConstruct 
    private void init() { 
     start(); /// In this way you can initthe connection and you are sure it is called only one time. 
    } 

    @PreDestroy 
    private void destroy() { 
     try { 
      myConnection.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    public Connection getConnection() { 
     return myConnection; 
    } 


    private void start() { 
     log.debug("Building AMQP Connection"); 

     ConnectionFactory factory = new ConnectionFactory(); 
     String ipAddress = applicationConfiguration.getAMQPHost(); 
     String password = applicationConfiguration.getAMQPUser(); 
     String user = applicationConfiguration.getAMQPPassword(); 
     String virtualHost = applicationConfiguration.getAMQPVirtualHost(); 
     String port = applicationConfiguration.getAMQPPort(); 

     try { 
      factory.setUsername(user); 
      factory.setPassword(password); 
      factory.setVirtualHost(virtualHost); 
      factory.setPort(Integer.parseInt(port)); 
      factory.setHost(ipAddress); 
      myConnection = factory.newConnection(); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 
     } 

     myConnection.addBlockedListener(new BlockedListener() { 
      public void handleBlocked(String reason) throws IOException { 
       // Connection is now blocked 
       blocked = true; 
      } 

      public void handleUnblocked() throws IOException { 
       // Connection is now unblocked 
       blocked = false; 
      } 
     }); 
    } 

    public Boolean isBlocked() { 
     return blocked; 
    } 
} 

/* 
* Sends ADT message to AMQP server. 
*/ 
private void send(String routingKey, String message) throws Exception { 
    String exchange = applicationConfiguration.getAMQPExchange(); 
    String exchangeType = applicationConfiguration.getAMQPExchangeType(); 

    Connection connection = myConnection.getConnection(); 
    if (connection!=null){ 
    Channel channel = connection.createChannel(); 
    try{ 
    channel.exchangeDeclare(exchange, exchangeType); 
    channel.basicPublish(exchange, routingKey, null, message.getBytes()); 
    } finally{ 
     // Close the channel if it is no longer needed in this thread 
     channel.close(); 
    } 

}}

Это может быть достаточно, у вас есть связь с RabbitMQ при запуске системы.

Если вы ленивый синглтон, код немного отличается.

Я предлагаю не использовать isOpen() метод, пожалуйста, прочитайте here:

IsOpen

булево IsOpen() Определить, является ли в настоящее время открыт компонент. Вернет false, если мы сейчас закрываем. Проверка этого метода должна быть только для информации, из-за условий гонки - состояние может измениться после разговора. Вместо того, чтобы просто выполнить и попытаться поймать ShutdownSignalException и IOException Возвращает: true, если компонент открыт, иначе ложь

EDIT **

Вопрос 1:

Что вы ищете является клиент HA.

RabbitMQ Java-клиент по умолчанию не поддерживает эти функции, так как версия 3.3.0 поддерживает только переподключение, читайте this:

... позволяет Java-клиентов автоматически восстановить соединение после сети отказ. Если вы хотите быть уверенным в своих сообщениях, вам нужно создать надежный клиент, способный сопротивляться всем.

Как правило, вы должны учитывать сбой, например: Что произойдет, если в ходе сообщения опубликовано сообщение об ошибке?

В вашем случае вы просто теряете сообщение, вы должны перематывать сообщение вручную.

Вопрос 2:

Я не знаю, ваш код, но connection == null не должно произойти, потому что эта процедура вызывается для первого:

@PostConstruct 
    private void init() { 
     start(); /// In this way you can initthe connection and you are sure it is called only one time. 
    } 

В любом случае вы можете вызвать исключение, вопрос is: Что я должен сделать с сообщением, которое я пытался отправить?

Смотрите вопрос 1

Я хотел бы предложить, чтобы прочитать больше о HA, например, это:

Создать надежную систему с rabbitmq не сложно, но вы должны знать некоторые основные понятия.

В любом случае .. Дайте мне знать!

+0

Большое спасибо за это. Я буду работать над этим сегодня вечером и загружать код завтра и обновлять вас. – skyman

+0

Я только начинаю смотреть на это, и у меня есть пара вопросов. Во-первых, что произойдет, если перезапустить сервер RabbitMQ (например) - я предполагаю, что соединение не удастся и не будет автоматически подключаться при запуске сервера? Есть ли способ автоматически подключиться? Кроме того, я думал, что метод send должен, вероятно, генерировать исключение, если соединение == null - это имеет смысл? – skyman

+1

@skyman Я отредактировал ответ – Gabriele

Смежные вопросы