Я новичок в пакете исполнителей java. Я хочу делегировать ответственность за создание потока в другую задачу (класс) из spring rabbitmq (amqp). В настоящее время я создаю класс runnable для внутреннего уровня внутри асинхронного метода onMessage() для MesageListener для достижения параллелизма при наличии нескольких сообщений в очереди обмена сообщениями rabbitmq. Функциональность отлично работает для меня. Я хочу отделить форму исполняемого кода на методе сообщения сообщения (Message message). Ниже приведен код.Как делегировать исполняемый код из прослушивателя?
package com.xyz.forum.event.listener;
import com.xyz.forum.constant.ClassType;
import com.xyz.forum.domain.dto.impl.AnswerDto;
import com.xyz.forum.domain.dto.impl.PollDto;
import com.xyz.forum.domain.dto.impl.QuestionDto;
import com.xyz.forum.manager.PumpManager;
import com.xyz.forum.utils.XyzForumConsumerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* Created by bhupati on 11/3/16.
*/
public class XyzForumListener implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(XyzForumListener.class);
private static final String CLASS_HEADER = "__TypeId__";
private Executor threadPool = Executors.newFixedThreadPool(10);
@Autowired @Qualifier("pollPump") PumpManager pollPumpManager;
@Autowired @Qualifier("questionPump") PumpManager questionPumpManager;
@Autowired @Qualifier("answerPump") PumpManager answerPumpManager;
@Override
public void onMessage(final Message message) {
Runnable runnable = new Runnable() {
@Override
public void run() {
handleMessage(message);
}
};
threadPool.execute(runnable);
}
private void handleMessage(Message message) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
String classType = (String) headers.get(CLASS_HEADER);
LOGGER.info("Got the message: " + classType);
switch (classType) {
case ClassType.QUESTION:
QuestionDto questionDto = XyzForumConsumerUtils.parseDto(message.getBody(), QuestionDto.class);
LOGGER.info("questionDto : " + questionDto.getBody());
questionPumpManager.executePumpService(questionDto, headers);
break;
case ClassType.POLL:
PollDto pollDto = XyzForumConsumerUtils.parseDto(message.getBody(), PollDto.class);
LOGGER.info("pollDto : " + pollDto.getBody());
pollPumpManager.executePumpService(pollDto, headers);
break;
case ClassType.ANSWER:
AnswerDto answerDto = XyzForumConsumerUtils.parseDto(message.getBody(), AnswerDto.class);
LOGGER.info("answerDto : " + answerDto.getBody());
answerPumpManager.executePumpService(answerDto, headers);
default: LOGGER.warn("Unknown Type");
break;
}
}
}
В целом есть стандартный шаблон для запуска, чтобы метод одного слушателя одновременно с использованием пакета java.util.concurrent.