2016-09-11 1 views
0

Приоритеты сообщений в очередях в RabbitMQ. Он работает с rabbitmq, предоставленным java-клиентом. Но это не работает с зависимостью весеннего кролика. Пожалуйста, смотрите.Приоритетное сообщение Spring AMQP

  • RabbitMQ Server Version - 3.6.5
  • Erlang - OTP 19 (8,0)

Использование Client RabbitMQ Java
pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>org.springframework.samples</groupId> 
    <artifactId>RabbitMQ</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 

    <developers> 
     <developer> 
      <name>Sagar Rout</name> 
     </developer> 
    </developers> 

    <properties> 
     <!-- Generic properties --> 
     <java.version>1.8</java.version> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 

     <!-- Spring --> 
     <spring-framework.version>4.3.2.RELEASE</spring-framework.version> 
    </properties> 

    <dependencies> 
     <!-- Spring --> 
     <dependency> 
      <groupId>org.springframework</groupId> 
      <artifactId>spring-context</artifactId> 
      <version>${spring-framework.version}</version> 
     </dependency> 

     <!-- Spring AMQP --> 
     <dependency> 
      <groupId>org.springframework.amqp</groupId> 
      <artifactId>spring-rabbit</artifactId> 
      <version>1.6.1.RELEASE</version> 
     </dependency> 
    </dependencies> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.5.1</version> 
       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 

</project> 

Publisher. java

public class Publisher { 

private final static String QUEUE_NAME = "S1_Priority"; 

public static void main(String[] argv) throws Exception { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-max-priority", 10); 
    channel.queueDeclare(QUEUE_NAME, false, false, false, args); 
    String message = "Hello World!"; 

    for (int i = 0; i < 10; i++) { 
     channel.basicPublish("", QUEUE_NAME, 
       new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(i).build(), 
       message.getBytes("UTF-8")); 
     System.out.println(" [x] Sent '" + message + "'" + "priority" + i); 
    } 
    channel.close(); 
    connection.close(); 
}} 

Consumer.Java

public class Consumer { 

private final static String QUEUE_NAME = "S1_Priority"; 

public static void main(String[] argv) throws Exception { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-max-priority", 10); 
    channel.queueDeclare(QUEUE_NAME, false, false, false, args); 
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

    DefaultConsumer consumer = new DefaultConsumer(channel) { 
     @Override 
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
       byte[] body) throws IOException { 
      String message = new String(body, "UTF-8"); 
      System.out.println(" [x] Received '" + message + "'" + properties.getPriority()); 
     } 
    }; 
    channel.basicConsume(QUEUE_NAME, true, consumer); 
}} 

Это работает, и сообщение с более высоким приоритетом подходит. Но это не работает с Весенним кроликом. Пожалуйста, найдите код.
RabbitMQConfig.class

@Configuration 
@ComponentScan(basePackages = { "com.blackocean.*" }) 
@PropertySource("classpath:config.properties") 
public class RabbitMQConfig { 

@Value("${rabbitmq.host}") 
private String host; 

@Value("${rabbitmq.port}") 
private Integer port; 

@Value("${rabbitmq.username}") 
private String username; 

@Value("${rabbitmq.password}") 
private String password; 

@Value("${rabbitmq.connection.size}") 
private Integer connectionSize ; 

@Bean 
public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() { 
    return new PropertySourcesPlaceholderConfigurer(); 
} 

@Bean 
public ConnectionFactory connectionFactory() { 

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); 
    cachingConnectionFactory.setHost(host); 
    cachingConnectionFactory.setPort(port); 
    cachingConnectionFactory.setUsername(username); 
    cachingConnectionFactory.setPassword(password); 
    cachingConnectionFactory.setConnectionLimit(connectionSize); 

    return cachingConnectionFactory; 
} 

@Bean 
public RabbitAdmin rabbitAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    return new RabbitTemplate(connectionFactory()); 
} 

@Bean 
public Queue queue() { 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-priority", 10); 
    Queue queue = new Queue("myQueue", true, false, false, args) ; 
    return queue ; 
}} 

SendUsingJavaConfig

public class Send1UsingJavaConfig { 

/** 
* @param args 
*/ 
public static void main(String[] args) { 

    ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class); 
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); 

     rabbitTemplate.convertAndSend("", "myQueue", "Hi Mr.Ocean 10", new MessagePostProcessor() { 

      @Override 
      public Message postProcessMessage(Message message) throws AmqpException { 
       message.getMessageProperties().setPriority(9); 
       return message; 
      } 
     }); 
    } 
} 

ReceiveusingJavaConfig

public class RecvUsingJavaConfig { 

public static void main(String[] args) { 
    ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class); 
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); 

    // Basic Example 
    String message = (String) rabbitTemplate.receiveAndConvert("myQueue"); 
    System.out.println(message); 
}} 

Config.propertie s

#RabbitMQ 
rabbitmq.host=localhost 
#Always provide port and connection size in numbers 
rabbitmq.port=5672 
rabbitmq.username=guest 
rabbitmq.password=guest 
rabbitmq.connection.size=100 

Теперь я посылаю сообщение с различным приоритетом, но он всегда получает сообщение в порядке. Любое предложение будет отличным !!!

ответ

3

Просто догадайтесь, я попытался заглянуть в старую библиотеку AMQP, которую я использовал (очередь приоритетов в более ранней версии Rabbit MQ).

Приоритет был установлен, как показано ниже

args.put("x-max-priority", 10);, он выглядит немного отличается от args.put("x-priority", 10);.

Вы можете сослаться на старый priority queue repo по ссылке. Вы могли бы попытаться узнать, поможет ли это

+1

Да; в собственном коде у вас есть 'args.put (« x-max-priority », 10);' но в коде Spring вы неправильно используете args.put («x-priority», 10); ». –

+0

Спасибо вам обоим. Вчера я попробовал x-max-priority, это не сработало. Сегодня, когда я изменился, теперь он работает. Я не знаю, чего не было в то время. Спасибо чувак. :) – blackOcean

Смежные вопросы