2016-09-16 2 views
0

Я пытаюсь UNSUBSCRIBE прочных подписчиков из ТЕМЫ.Отказаться от подписчиков на долговечность с помощью ActiveMQ

Мое приложение - это своего рода социальная сеть: каждый пользователь является темой для других пользователей. Таким образом, каждый раз, когда пользователь что-то делает, его друзья уведомляются. Конечно, абонент может отказаться от подписки на тему, желая получать уведомления о пользователе больше.

Каждый раз, когда я пытаюсь отказаться подписчиком от темы, я получил сообщение об ошибке говорит мне, что: «javax.jms.JMSException: Прочный потребитель в использовании»

Вот мои 2 класса, SENDER и RECEIVER. Может кто-нибудь сказать мне, что я делаю неправильно?

ОТПРАВИТЕЛЬ Класс:

package com.citizenweb.classes; 

import java.util.Date; 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageFormatException; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 
import javax.jms.ObjectMessage; 
import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQSession; 

import com.citizenweb.interfaces.PostIF; 
import com.citizenweb.interfaces.UserIF; 

public class Sender { 

    private ActiveMQConnectionFactory factory = null; 
    private ActiveMQConnection connection = null; 
    private ActiveMQSession session = null; 
    private Destination destination = null; 
    private MessageProducer producer = null; 

    public Sender() { 
    } 

    public void connect(){ 
     try{ 
      factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); 
      // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production 
      factory.setTrustAllPackages(true); 
      connection = (ActiveMQConnection) factory.createConnection(); 
      connection.start(); 
      session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     } catch (JMSException e){ 
      e.printStackTrace(); 
     } 
    } 

    public void sendPost(UserIF user,PostIF post) { 
     if(session==null){connect();} 
     try { 
      destination = session.createTopic(user.toString()); 
      producer = session.createProducer(destination); 
      ObjectMessage postMessage = session.createObjectMessage(); 
      postMessage.setObject(post); 
      producer.send(postMessage); 
      System.out.println("\n SENDER Object message sent"); 



     } catch (MessageFormatException e) { 
      e.printStackTrace(); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void sendInformation(UserIF user,String info){ 
     if(session==null){connect();} 
     try { 
      destination = session.createTopic(user.toString()); 
      producer = session.createProducer(destination); 
      TextMessage infoMessage = session.createTextMessage(); 
      infoMessage.setText(info); 
      producer.send(infoMessage); 
      System.out.println("\n SENDER Information message sent"); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    /** 
    * @param args 
    * @throws Exception 
    */ 
    public static void main(String[] args) throws Exception { 

     UserIF u1, u2, u3; 
     String[] nom = new String[5]; 
     String[] prenom = new String[5]; 
     String[] login = new String[5]; 
     String[] password = new String[5]; 
     Date[] naiss = new Date[5]; 
     String[] mail = new String[5]; 
     for (int i = 0; i < 5; i++) { 
      nom[i] = "nom_" + i; 
      prenom[i] = "prenom_" + i; 
      login[i] = "login_" + i; 
      password[i] = "password_" + i; 
      naiss[i] = new Date(); 
      mail[i] = "mail_" + i; 
     } 

     System.out.println("\n SENDER AFFECTATION DES NOMS"); 
     u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]); 
     u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]); 
     u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]); 


     Sender sender = new Sender(); 

     sender.sendInformation(u1, "U1 notification"); 
     sender.sendInformation(u2, "U2 notification"); 
     sender.sendInformation(u3, "U3 notification"); 
     //PostIF post = new Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User) u1).getIdUser(),0); 
     //sender.sendPost(user, post); 
     sender.session.close(); 
     sender.connection.close(); 

    } 

} 

RECEIVER Класс:

package com.citizenweb.classes; 

import java.io.Serializable; 
import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.ObjectMessage; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQSession; 
import org.apache.activemq.broker.region.Destination; 
import com.citizenweb.interfaces.PostIF; 
import com.citizenweb.interfaces.UserIF; 
import com.citizenweb.classes.Post; 

public class Receiver implements MessageListener, Serializable { 

    private static final long serialVersionUID = 1L; 
    private ActiveMQConnectionFactory factory = null; 
    private ActiveMQConnection connection = null; 
    private ActiveMQSession session = null; 
    private Topic destination = null; 
    private MessageConsumer consumer = null; 

    UserIF userTopic = new User(); 
    UserIF userSubscriber = new User(); 
    List<Message> listeMsg = new ArrayList<Message>(); 

    public Receiver(UserIF subscriber) { 
     this.userSubscriber = subscriber; 
    } 

    public void connect() { 
     try { 
      factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); 
      // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production 
      factory.setTrustAllPackages(true); 
      connection = (ActiveMQConnection) factory.createConnection(); 
      // ClientID : 
      // https://qnalist.com/questions/2068823/create-durable-topic-subscriber 
      connection.setClientID(userSubscriber.toString()); 
      connection.start(); 
      session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void receiveMessage(UserIF topic) { 
     try { 
      if (session == null) { 
       connect(); 
      } 
      destination = session.createTopic(topic.toString()); 
      String nomAbonnement = topic.toString() + "->" + userSubscriber.toString(); 
      //String nomAbonnement = userSubscriber.toString(); 
      consumer = session.createDurableSubscriber(destination, nomAbonnement); 
      consumer.setMessageListener(this); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void unsubscribe(UserIF topic) { 
     try { 
      if (session == null) { 
       connect(); 
      } 
      System.out.println("\n RECEIVER Désinscription du topic " + topic.toString()); 
      //consumer.close(); 
      String nomAbonnement = topic.toString() + "->" + userSubscriber.toString(); 
      //String nomAbonnement = userSubscriber.toString(); 
      System.out.println("\n RECEIVER Abonnement à clore = " + nomAbonnement); 
      session.unsubscribe(nomAbonnement); 
      System.out.println("\n RECEIVER " + userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void onMessage(Message message) { 
     System.out.println("\n RECEIVER OnMessage triggered for " + userSubscriber.toString()); 
     listeMsg.add(message); 
     System.out.println("\n RECEIVER Nombre de messages reçus par " + userSubscriber + " = " + listeMsg.size()); 
     String classe = message.getClass().getSimpleName(); 
     System.out.println("\n RECEIVER Classe de message : " + classe); 
     try { 
      if (message instanceof TextMessage) { 
       TextMessage text = (TextMessage) message; 
       System.out.println("\n RECEIVER Information : " + text.getText()); 
      } 
      if (message instanceof ObjectMessage) { 
       System.out.println("\n RECEIVER ObjectMessage"); 
       ObjectMessage oMessage = (ObjectMessage) message; 
       if (oMessage.getObject() instanceof PostIF) { 
        PostIF post = (PostIF) oMessage.getObject(); 
        String s = ((Post) post).getCorpsMessage(); 
        System.out.println("\n RECEIVER Post : " + s); 
       } 
      } 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    public static void main(String[] args) throws JMSException { 

     /* 
     * EACH USER IS A TOPIC FOR OTHER USERS 
     * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO SUBSCRIBERS 
     */ 

     //CREATE USER 
     UserIF u1, u2, u3; 
     String[] nom = new String[5]; 
     String[] prenom = new String[5]; 
     String[] login = new String[5]; 
     String[] password = new String[5]; 
     Date[] naiss = new Date[5]; 
     String[] mail = new String[5]; 
     for (int i = 0; i < 5; i++) { 
      nom[i] = "nom_" + i; 
      prenom[i] = "prenom_" + i; 
      login[i] = "login_" + i; 
      password[i] = "password_" + i; 
      naiss[i] = new Date(); 
      mail[i] = "mail_" + i; 
     } 

     u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]); 
     u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]); 
     u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]); 

     /* 
     * MAKE EACH USER A SUBSCRIBER 
     */ 
     Receiver receiver1 = new Receiver(u1); 
     Receiver receiver2 = new Receiver(u2); 
     Receiver receiver3 = new Receiver(u3); 

     /* 
     * PUT A MESSAGE LISTENER FOR EACH USER 
     */ 
     receiver1.receiveMessage(u2); 
     receiver1.receiveMessage(u3); 
     receiver2.receiveMessage(u1); 
     receiver2.receiveMessage(u3); 
     receiver3.receiveMessage(u1); 
     receiver3.receiveMessage(u2); 

     /* 
     * CALL THE SENDER CLASS TO SEND MESSAGES 
     */ 
     try { 
      Sender.main(args); 
     } catch (Exception e1) { 
      e1.printStackTrace(); 
     } 

     /* 
     * A SLEEP TO HAVE ENOUGH TIME TO LOOK AT THE ACTIVEMQ CONSOLE 
     * CAN BE REMOVE 
     */ 
     try { 
      Thread.sleep(10000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
      return; 
     } 

     /* 
     * UNSUBSCRIBE SUBSCRIBERS FROM TOPICS 
     */ 
     receiver1.unsubscribe(u2); 
     receiver1.unsubscribe(u3); 
     receiver2.unsubscribe(u1); 
     receiver2.unsubscribe(u3); 
     receiver3.unsubscribe(u1); 
     receiver3.unsubscribe(u2); 
    } 

} 

ответ

1

Вы можете только отказаться долговечный подписку, если нет активного абонент в настоящее время потребляет от него. Похоже, что ваш код создает несколько подписчиков и не останавливает потребителей, поэтому, конечно, отказ от подписки не удастся, если вы закроете потребителей, а затем отмените подписку, вы должны получить результат, который вы ищете.

Примером отсроченной подписки на подписку является here.

+0

Привет Тим ​​и спасибо за помощь. Когда вы говорите: «Вы можете отказаться от подписки на долговременную подписку, если в ней нет активного абонента, потребляющего сейчас», вы, кажется, говорите о ** теме **. Моя проблема заключается в том, что ** одна тема ** может иметь ** много прочных подписчиков **.Один из этих подписчиков может захотеть оспорить эту тему, и нет оснований останавливать подписку других подписчиков по той же теме. Я хочу прекратить подписку на тему, не убивая сама тема Вот что я хочу сделать с методом unsubscribe() моего класса приемника Невозможно? – Lovegiver

+0

Это не то, что я сказал, внимательно прочитал. Активная подписка на тему не может быть отписана, вам необходимо закрыть пользователя, который использует подписку, которую вы хотите отменить. –

+0

OK Tim , так что если я делаю session.unsubscribe (durableID), как показано в моем коде, это должно прекратить подписку на данного абонента, но только если я уже сделал user.close() раньше? Я пробовал это много, но всегда получал такое же сообщение об ошибке: долговечный потребитель используется Что мне делать больше: 'consumer.close();' 'session.unsubscribe (nomAbonnement);' – Lovegiver

1

Решение

Здравствуйте,

У меня есть решение, и его объяснение.

Для каждого подключения требуется уникальный идентификатор клиента: connection.setClientID ("clientID");

Моя ошибка заключалась в понимании этого единства для данного клиента.

Когда клиент подписывается на тему, для этой темы есть одно соединение. Таким образом, для данного клиента, подписанного на 3 темы (например), требуется 3 ClientID, потому что необходимы 3 соединения. ClientID должен быть уникальным, поскольку он идентифицирует одно соединение одного клиента для одной темы.

Вот почему у меня было так много JMSExceptions, в которых говорилось, что Durable Consumer был в использовании, когда я хотел закончить подписку.

Thanx всем, кто дал мне время и поддержку.

Смежные вопросы