0

Я новичок в пакете исполнителей java. Я хочу делегировать ответственность за создание потока в другую задачу (класс) из spring rabbitmq (amqp). В настоящее время я создаю класс runnable для внутреннего уровня внутри асинхронного метода onMessage() для MesageListener для достижения параллелизма при наличии нескольких сообщений в очереди обмена сообщениями rabbitmq. Функциональность отлично работает для меня. Я хочу отделить форму исполняемого кода на методе сообщения сообщения (Message message). Ниже приведен код.Как делегировать исполняемый код из прослушивателя?

package com.xyz.forum.event.listener; 

import com.xyz.forum.constant.ClassType; 
import com.xyz.forum.domain.dto.impl.AnswerDto; 
import com.xyz.forum.domain.dto.impl.PollDto; 
import com.xyz.forum.domain.dto.impl.QuestionDto; 
import com.xyz.forum.manager.PumpManager; 
import com.xyz.forum.utils.XyzForumConsumerUtils; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.amqp.core.Message; 
import org.springframework.amqp.core.MessageListener; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Qualifier; 

import java.util.Map; 
import java.util.concurrent.Executor; 
import java.util.concurrent.Executors; 

/** 
* Created by bhupati on 11/3/16. 
*/ 
public class XyzForumListener implements MessageListener { 

    private static final Logger LOGGER = LoggerFactory.getLogger(XyzForumListener.class); 
    private static final String CLASS_HEADER = "__TypeId__"; 

    private Executor threadPool = Executors.newFixedThreadPool(10); 

    @Autowired @Qualifier("pollPump") PumpManager pollPumpManager; 
    @Autowired @Qualifier("questionPump") PumpManager questionPumpManager; 
    @Autowired @Qualifier("answerPump") PumpManager answerPumpManager; 

    @Override 
    public void onMessage(final Message message) { 

     Runnable runnable = new Runnable() { 
      @Override 
      public void run() { 
       handleMessage(message); 
      } 
     }; 
     threadPool.execute(runnable); 
    } 

    private void handleMessage(Message message) { 
     Map<String, Object> headers = message.getMessageProperties().getHeaders(); 
     String classType = (String) headers.get(CLASS_HEADER); 
     LOGGER.info("Got the message: " + classType); 

     switch (classType) { 
      case ClassType.QUESTION: 
       QuestionDto questionDto = XyzForumConsumerUtils.parseDto(message.getBody(), QuestionDto.class); 
       LOGGER.info("questionDto : " + questionDto.getBody()); 
       questionPumpManager.executePumpService(questionDto, headers); 
       break; 

      case ClassType.POLL: 
       PollDto pollDto = XyzForumConsumerUtils.parseDto(message.getBody(), PollDto.class); 
       LOGGER.info("pollDto : " + pollDto.getBody()); 
       pollPumpManager.executePumpService(pollDto, headers); 
       break; 

      case ClassType.ANSWER: 
       AnswerDto answerDto = XyzForumConsumerUtils.parseDto(message.getBody(), AnswerDto.class); 
       LOGGER.info("answerDto : " + answerDto.getBody()); 
       answerPumpManager.executePumpService(answerDto, headers); 

      default: LOGGER.warn("Unknown Type"); 
       break; 

     } 

    } 

} 

В целом есть стандартный шаблон для запуска, чтобы метод одного слушателя одновременно с использованием пакета java.util.concurrent.

ответ

1

Поймите, что при передаче такого сообщения сообщение будет немедленно активировано и будет потеряно при сбое системы.

Обычно лучше просто увеличить параллелизм в контейнере слушателя и позволить контейнеру управлять потоковой обработкой.

Смежные вопросы