Я играю с Spring, RxJava и без блокировки обработки данных. В моих тестовых приложений, которые я хочу реализовать следующий тест работы потока:Неожиданное поведение async: @Async от Springs vs RxJava
- [RT] Получение запроса
- [RT] Запуск обработки асинхронно в рабочем потоке
- [WT] У некоторых (дорогих) инициализации работа
- [WT] Вызов удаленной системы асинхронно, чтобы получить значение
- [HT] Выполнить запрос к удаленной системе
- [HT] Форвард результат ответа на рабочий поток
- [WT] Есть ли более (дорогой) работать с результатом от удаленной системы
- [WT] Возвратить окончательный результат
RT: Запрос Thread (Tomcat NIO)
WT: Worker Thread (Threadpool с фиксированным размером 1 и размера очереди 5)
HT: Hystrix резьбы (Hystrix Threadpool с установкой по умолчанию)
(Это просто пример, чтобы SIMUL ели дорогую обработку данных в сочетании с зависимостью от удаленного ресурса)
У меня есть два варианта кода:
- Использование
@Async
для вызова WT (этап 2) и Rx вObservable
-х для остальное (http://localhost:9001/value
) - только с помощью наблюдаемые Rx (в
http://localhost:9001/value-rx
)
(http://localhost:9002/value
является удаленный ресурс)
Вариант 2 работает очень хорошо, но вариант 1 (с @Async
) сталкивается с некоторыми проблемами. Анализируя исключения, потоки дампов, состояния пула потоков и файлы журналов, он выглядит так: ListenableFuture
(возвращаемый методом службы @Async
на шаге 2) блокирует поток пула бесконечно, сам поток WAITING. Таким образом, RxJava не может запускать код обратных вызовов по желанию в данном пуле потоков (шаг 6). через 30 секунд возникает исключение и весь процесс завершается с ошибкой, поскольку пул потоков по-прежнему заблокирован, я не понимаю, почему.
Если я использую вариант 1 несколько раз, второй (и все последующие запросы) не работает на шаге 2 (вместо 6), поскольку пул потоков (размер = 1) по-прежнему блокируется ListenableFuture
(трассировка стека ниже).
Вариант 2 способен обрабатывать несколько запросов «в одно и то же время» без проблем, пока очередь не будет заполнена даже с одним потоком запроса и одним рабочим потоком.
- В обоих случаях я использую модифицированную версию this на карту экземпляра
Observable
кListenableFuture
. - Я добавил дополнительные записи в контроллер и классы обслуживания. Это облегчает просмотр, в каком потоке выполняются части кода.
Почему @Async
вызывает это и как я могу это исправить?
Вот код:
App1Controller
@Slf4j
@RestController
public class App1Controller {
@Autowired
private App1Service app1Service;
@ResponseBody
@RequestMapping("/value")
public ListenableFuture<String> value() {
final ListenableFuture<String> future;
log.info("before invoke 'app1Service'");
future = this.app1Service.value();
log.info("after invoke 'app1Service'");
return future;
}
@ResponseBody
@RequestMapping("/value-rx")
public ListenableFuture<String> valueRx() {
final Observable<String> observable;
log.info("before invoke 'app1Service'");
observable = this.app1Service.valueRx();
log.info("after invoke 'app1Service'");
return new ObservableListenableFuture<>(observable);
}
}
App1Service
@Slf4j
@Service
public class App1Service {
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private App2Service app2Service;
@Async
public ListenableFuture<String> value() {
final ListenableFuture<String> future;
log.info("before start processing");
this.doSomeStuff();
future = new ObservableListenableFuture<>(this.valueFromApp2Service());
log.info("after start processing");
return future;
}
public Observable<String> valueRx() {
final Observable<String> observable;
log.info("before start processing");
observable = Observable.<String>create(s -> {
this.doSomeStuff();
this.valueFromApp2Service().subscribe(
result -> {
log.info("next (processing)");
s.onNext(result);
},
throwable -> {
log.info("error (processing)");
s.onError(throwable);
},
() -> {
log.info("completed (processing)");
s.onCompleted();
});
}).subscribeOn(Schedulers.from(this.taskExecutor));
log.info("after start processing");
return observable;
}
private Observable<String> valueFromApp2Service() {
final AsyncSubject<String> asyncSubject;
log.info("before invoke 'app2Service'");
asyncSubject = AsyncSubject.create();
this.app2Service.value().observeOn(Schedulers.from(this.taskExecutor)).subscribe(
result -> {
log.info("next (from 'app2Service')");
asyncSubject.onNext(this.doSomeMoreStuff(result));
}, throwable -> {
log.info("error (from 'app2Service')");
asyncSubject.onError(throwable);
},() -> {
log.info("completed (from 'app2Service')");
asyncSubject.onCompleted();
});
log.info("after invoke 'app2Service'");
return asyncSubject;
}
private void doSomeStuff() {
log.info("do some expensive stuff");
this.sleep(1000);
log.info("finish some expensive stuff");
}
private String doSomeMoreStuff(final String valueFromRemote) {
log.info("do some more expensive stuff with '{}'", valueFromRemote);
this.sleep(2000);
log.info("finish some more expensive stuff with '{}'", valueFromRemote);
return "MODIFIED " + valueFromRemote;
}
private void sleep(final long milliSeconds) {
try {
Thread.sleep(milliSeconds);
} catch (final InterruptedException e) {
e.printStackTrace();
}
}
}
App2Service
@Slf4j
@Service
public class App2Service {
@HystrixCommand(commandKey = "app2")
public Observable<String> value() {
Observable<String> observable;
log.info("before invoke remote service");
observable = new ObservableResult<String>() {
@Override
public String invoke() {
log.info("invoke");
return new RestTemplate().getForEntity("http://localhost:9002/value", String.class).getBody();
}
};
log.info("after invoke remote service");
return observable;
}
}
Конфигурация
Применение (основной класс/конфигурации):
@EnableCircuitBreaker
@SpringBootApplication
public class Application {
public static void main(final String[] args) {
SpringApplication.run(Application.class, args);
}
@Configuration
@EnableAsync
public static class AsyncConfiguration {
@Bean
public TaskExecutor taskExecutor() {
final ThreadPoolTaskExecutor taskExecutor;
taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1);
taskExecutor.setMaxPoolSize(1);
taskExecutor.setQueueCapacity(5);
taskExecutor.setThreadNamePrefix("worker-");
return taskExecutor;
}
}
}
application.properties:
server.port=9001
server.tomcat.max-threads=1
hystrix.command.app2.fallback.enabled=false
hystrix.command.app2.execution.isolation.thread.timeoutInMilliseconds=15000
Вход Выход варианта 1 (первого звонка)
16:06:31.871 [nio-9001-exec-1] before invoke 'app1Service'
16:06:31.879 [nio-9001-exec-1] after invoke 'app1Service'
16:06:31.887 [ worker-1] before start processing
16:06:31.888 [ worker-1] do some expensive stuff
16:06:32.890 [ worker-1] finish some expensive stuff
16:06:32.891 [ worker-1] before invoke 'app2Service'
16:06:33.135 [x-App2Service-1] before invoke remote service
16:06:33.136 [x-App2Service-1] after invoke remote service
16:06:33.137 [x-App2Service-1] invoke
16:06:33.167 [ worker-1] after invoke 'app2Service'
16:06:33.172 [ worker-1] after start processing
16:07:02.816 [nio-9001-exec-1] Exception Processing ErrorPage[errorCode=0, location=/error]
java.lang.IllegalStateException: Cannot forward after response has been committed
at org.apache.catalina.core.ApplicationDispatcher.doForward(ApplicationDispatcher.java:328)
at org.apache.catalina.core.ApplicationDispatcher.forward(ApplicationDispatcher.java:318)
at org.apache.catalina.core.StandardHostValve.custom(StandardHostValve.java:439)
at org.apache.catalina.core.StandardHostValve.status(StandardHostValve.java:305)
at org.apache.catalina.core.StandardHostValve.throwable(StandardHostValve.java:399)
at org.apache.catalina.core.AsyncContextImpl.setErrorState(AsyncContextImpl.java:438)
at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:291)
at org.apache.coyote.http11.AbstractHttp11Processor.asyncDispatch(AbstractHttp11Processor.java:1709)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:649)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1521)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1478)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)
Вход Выход варианта 2 (первый вызов)
16:07:54.465 [nio-9001-exec-1] before invoke 'app1Service'
16:07:54.472 [nio-9001-exec-1] before start processing
16:07:54.500 [nio-9001-exec-1] after start processing
16:07:54.500 [nio-9001-exec-1] after invoke 'app1Service'
16:07:54.517 [ worker-1] do some expensive stuff
16:07:55.522 [ worker-1] finish some expensive stuff
16:07:55.522 [ worker-1] before invoke 'app2Service'
16:07:55.684 [x-App2Service-1] before invoke remote service
16:07:55.685 [x-App2Service-1] after invoke remote service
16:07:55.686 [x-App2Service-1] invoke
16:07:55.717 [ worker-1] after invoke 'app2Service'
16:08:05.786 [ worker-1] next (from 'app2Service')
16:08:05.786 [ worker-1] do some more expensive stuff with 'value from app2 service'
16:08:07.791 [ worker-1] finish some more expensive stuff with 'value from app2 service'
16:08:07.791 [ worker-1] completed (from 'app2Service')
16:08:07.791 [ worker-1] next (processing)
16:08:07.792 [ worker-1] completed (processing)
дампа Нить для WT (после использования варианта 1)
"worker-1" #24 prio=5 os_prio=31 tid=0x00007fe2be8cf000 nid=0x5e03 waiting on condition [0x000000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c0d68fb0> (a org.springframework.util.concurrent.ListenableFutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:122)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:110)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- <0x00000006c0d68170> (a java.util.concurrent.ThreadPoolExecutor$Worker)
дамп темы для WT (после использования вариант 2)
"worker-1" #24 prio=5 os_prio=31 tid=0x00007fc6136dd800 nid=0x5207 waiting on condition [0x000000012d638000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c02f5388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
РЕШЕНИЕ
Асинхронный Intercepter использует простой Future
и не может справиться с ListenableFuture
. После того, как я снова просмотрю свалку потоков, я заметил, что FutureTask.get
получил. Это блокирующий вызов. Это означает, что вариант 1 является встроенным тупиком при использовании всего лишь одной нити.
Этот код работает:
Контроллер
@ResponseBody
@RequestMapping("/value")
public ListenableFuture<String> value() {
final SettableListenableFuture<String> future;
this.app1Service.value(future);
return future;
}
Сервис
@Async
public void value(final SettableListenableFuture<String> future) {
this.doSomeStuff();
this.valueFromApp2Service().subscribe(future::set, future::setException);
}
Я также описал, что вариант 2 работает в тех же условиях, а также может обрабатывать больше запросов одновременно. Простое решение: я не могу использовать ListenableFuture как возвращаемый тип метода @Async. Spring просто использует функциональность Future и вызывает блокирующий метод get. – baymon
Право. Spring вызывает 'future.get()' в 'AsyncExecutionInterceptor', потому что это единственное, что он может сделать, у него просто нет другого способа получить результат вычисления' app1Service.value() 'для отправки запроса , На самом деле, я думаю, вам вообще не нужно '@ Async', поскольку вы уже используете RxJava. – Ruben