2015-08-29 3 views
3

Я играю с Spring, RxJava и без блокировки обработки данных. В моих тестовых приложений, которые я хочу реализовать следующий тест работы потока:Неожиданное поведение async: @Async от Springs vs RxJava

  1. [RT] Получение запроса
  2. [RT] Запуск обработки асинхронно в рабочем потоке
  3. [WT] У некоторых (дорогих) инициализации работа
  4. [WT] Вызов удаленной системы асинхронно, чтобы получить значение
  5. [HT] Выполнить запрос к удаленной системе
  6. [HT] Форвард результат ответа на рабочий поток
  7. [WT] Есть ли более (дорогой) работать с результатом от удаленной системы
  8. [WT] Возвратить окончательный результат

RT: Запрос Thread (Tomcat NIO)

WT: Worker Thread (Threadpool с фиксированным размером 1 и размера очереди 5)

HT: Hystrix резьбы (Hystrix Threadpool с установкой по умолчанию)

(Это просто пример, чтобы SIMUL ели дорогую обработку данных в сочетании с зависимостью от удаленного ресурса)

У меня есть два варианта кода:

  1. Использование @Async для вызова WT (этап 2) и Rx в Observable-х для остальное (http://localhost:9001/value)
  2. только с помощью наблюдаемые Rx (в http://localhost:9001/value-rx)

(http://localhost:9002/value является удаленный ресурс)

Вариант 2 работает очень хорошо, но вариант 1 (с @Async) сталкивается с некоторыми проблемами. Анализируя исключения, потоки дампов, состояния пула потоков и файлы журналов, он выглядит так: ListenableFuture (возвращаемый методом службы @Async на шаге 2) блокирует поток пула бесконечно, сам поток WAITING. Таким образом, RxJava не может запускать код обратных вызовов по желанию в данном пуле потоков (шаг 6). через 30 секунд возникает исключение и весь процесс завершается с ошибкой, поскольку пул потоков по-прежнему заблокирован, я не понимаю, почему.

Если я использую вариант 1 несколько раз, второй (и все последующие запросы) не работает на шаге 2 (вместо 6), поскольку пул потоков (размер = 1) по-прежнему блокируется ListenableFuture (трассировка стека ниже).

Вариант 2 способен обрабатывать несколько запросов «в одно и то же время» без проблем, пока очередь не будет заполнена даже с одним потоком запроса и одним рабочим потоком.

  • В обоих случаях я использую модифицированную версию this на карту экземпляра Observable к ListenableFuture.
  • Я добавил дополнительные записи в контроллер и классы обслуживания. Это облегчает просмотр, в каком потоке выполняются части кода.

Почему @Async вызывает это и как я могу это исправить?

Вот код:

App1Controller

@Slf4j 
@RestController 
public class App1Controller { 

    @Autowired 
    private App1Service app1Service; 

    @ResponseBody 
    @RequestMapping("/value") 
    public ListenableFuture<String> value() { 
     final ListenableFuture<String> future; 
     log.info("before invoke 'app1Service'"); 
     future = this.app1Service.value(); 
     log.info("after invoke 'app1Service'"); 
     return future; 
    } 

    @ResponseBody 
    @RequestMapping("/value-rx") 
    public ListenableFuture<String> valueRx() { 
     final Observable<String> observable; 

     log.info("before invoke 'app1Service'"); 
     observable = this.app1Service.valueRx(); 
     log.info("after invoke 'app1Service'"); 

     return new ObservableListenableFuture<>(observable); 
    } 
} 

App1Service

@Slf4j 
@Service 
public class App1Service { 

    @Autowired 
    private TaskExecutor taskExecutor; 

    @Autowired 
    private App2Service app2Service; 

    @Async 
    public ListenableFuture<String> value() { 
     final ListenableFuture<String> future; 

     log.info("before start processing"); 
     this.doSomeStuff(); 
     future = new ObservableListenableFuture<>(this.valueFromApp2Service()); 
     log.info("after start processing"); 

     return future; 
    } 

    public Observable<String> valueRx() { 
     final Observable<String> observable; 

     log.info("before start processing"); 

     observable = Observable.<String>create(s -> { 
      this.doSomeStuff(); 
      this.valueFromApp2Service().subscribe(
        result -> { 
         log.info("next (processing)"); 
         s.onNext(result); 
        }, 
        throwable -> { 
         log.info("error (processing)"); 
         s.onError(throwable); 
        }, 
        () -> { 
         log.info("completed (processing)"); 
         s.onCompleted(); 
        }); 
     }).subscribeOn(Schedulers.from(this.taskExecutor)); 

     log.info("after start processing"); 

     return observable; 
    } 

    private Observable<String> valueFromApp2Service() { 
     final AsyncSubject<String> asyncSubject; 

     log.info("before invoke 'app2Service'"); 

     asyncSubject = AsyncSubject.create(); 
     this.app2Service.value().observeOn(Schedulers.from(this.taskExecutor)).subscribe(
       result -> { 
        log.info("next (from 'app2Service')"); 
        asyncSubject.onNext(this.doSomeMoreStuff(result)); 
       }, throwable -> { 
        log.info("error (from 'app2Service')"); 
        asyncSubject.onError(throwable); 
       },() -> { 
        log.info("completed (from 'app2Service')"); 
        asyncSubject.onCompleted(); 
       }); 

     log.info("after invoke 'app2Service'"); 

     return asyncSubject; 
    } 

    private void doSomeStuff() { 
     log.info("do some expensive stuff"); 
     this.sleep(1000); 
     log.info("finish some expensive stuff"); 
    } 

    private String doSomeMoreStuff(final String valueFromRemote) { 
     log.info("do some more expensive stuff with '{}'", valueFromRemote); 
     this.sleep(2000); 
     log.info("finish some more expensive stuff with '{}'", valueFromRemote); 
     return "MODIFIED " + valueFromRemote; 
    } 

    private void sleep(final long milliSeconds) { 
     try { 
      Thread.sleep(milliSeconds); 
     } catch (final InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

App2Service

@Slf4j 
@Service 
public class App2Service { 

    @HystrixCommand(commandKey = "app2") 
    public Observable<String> value() { 
     Observable<String> observable; 

     log.info("before invoke remote service"); 

     observable = new ObservableResult<String>() { 

      @Override 
      public String invoke() { 
       log.info("invoke"); 
       return new RestTemplate().getForEntity("http://localhost:9002/value", String.class).getBody(); 
      } 

     }; 

     log.info("after invoke remote service"); 

     return observable; 
    } 
} 

Конфигурация

Применение (основной класс/конфигурации):

@EnableCircuitBreaker 
@SpringBootApplication 
public class Application { 

    public static void main(final String[] args) { 
     SpringApplication.run(Application.class, args); 
    } 

    @Configuration 
    @EnableAsync 
    public static class AsyncConfiguration { 

     @Bean 
     public TaskExecutor taskExecutor() { 
      final ThreadPoolTaskExecutor taskExecutor; 

      taskExecutor = new ThreadPoolTaskExecutor(); 
      taskExecutor.setCorePoolSize(1); 
      taskExecutor.setMaxPoolSize(1); 
      taskExecutor.setQueueCapacity(5); 
      taskExecutor.setThreadNamePrefix("worker-"); 

      return taskExecutor; 
     } 
    } 
} 

application.properties:

server.port=9001 
server.tomcat.max-threads=1 
hystrix.command.app2.fallback.enabled=false 
hystrix.command.app2.execution.isolation.thread.timeoutInMilliseconds=15000 

Вход Выход варианта 1 (первого звонка)

16:06:31.871 [nio-9001-exec-1] before invoke 'app1Service' 
16:06:31.879 [nio-9001-exec-1] after invoke 'app1Service' 
16:06:31.887 [  worker-1] before start processing 
16:06:31.888 [  worker-1] do some expensive stuff 
16:06:32.890 [  worker-1] finish some expensive stuff 
16:06:32.891 [  worker-1] before invoke 'app2Service' 
16:06:33.135 [x-App2Service-1] before invoke remote service 
16:06:33.136 [x-App2Service-1] after invoke remote service 
16:06:33.137 [x-App2Service-1] invoke 
16:06:33.167 [  worker-1] after invoke 'app2Service' 
16:06:33.172 [  worker-1] after start processing 
16:07:02.816 [nio-9001-exec-1] Exception Processing ErrorPage[errorCode=0, location=/error] 

java.lang.IllegalStateException: Cannot forward after response has been committed 
    at org.apache.catalina.core.ApplicationDispatcher.doForward(ApplicationDispatcher.java:328) 
    at org.apache.catalina.core.ApplicationDispatcher.forward(ApplicationDispatcher.java:318) 
    at org.apache.catalina.core.StandardHostValve.custom(StandardHostValve.java:439) 
    at org.apache.catalina.core.StandardHostValve.status(StandardHostValve.java:305) 
    at org.apache.catalina.core.StandardHostValve.throwable(StandardHostValve.java:399) 
    at org.apache.catalina.core.AsyncContextImpl.setErrorState(AsyncContextImpl.java:438) 
    at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:291) 
    at org.apache.coyote.http11.AbstractHttp11Processor.asyncDispatch(AbstractHttp11Processor.java:1709) 
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:649) 
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1521) 
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1478) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) 
    at java.lang.Thread.run(Thread.java:745) 

Вход Выход варианта 2 (первый вызов)

16:07:54.465 [nio-9001-exec-1] before invoke 'app1Service' 
16:07:54.472 [nio-9001-exec-1] before start processing 
16:07:54.500 [nio-9001-exec-1] after start processing 
16:07:54.500 [nio-9001-exec-1] after invoke 'app1Service' 
16:07:54.517 [  worker-1] do some expensive stuff 
16:07:55.522 [  worker-1] finish some expensive stuff 
16:07:55.522 [  worker-1] before invoke 'app2Service' 
16:07:55.684 [x-App2Service-1] before invoke remote service 
16:07:55.685 [x-App2Service-1] after invoke remote service 
16:07:55.686 [x-App2Service-1] invoke 
16:07:55.717 [  worker-1] after invoke 'app2Service' 
16:08:05.786 [  worker-1] next (from 'app2Service') 
16:08:05.786 [  worker-1] do some more expensive stuff with 'value from app2 service' 
16:08:07.791 [  worker-1] finish some more expensive stuff with 'value from app2 service' 
16:08:07.791 [  worker-1] completed (from 'app2Service') 
16:08:07.791 [  worker-1] next (processing) 
16:08:07.792 [  worker-1] completed (processing) 

дампа Нить для WT (после использования варианта 1)

"worker-1" #24 prio=5 os_prio=31 tid=0x00007fe2be8cf000 nid=0x5e03 waiting on condition [0x000000] 
    java.lang.Thread.State: WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <0x00000006c0d68fb0> (a org.springframework.util.concurrent.ListenableFutureTask) 
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429) 
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) 
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:122) 
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:110) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

    Locked ownable synchronizers: 
    - <0x00000006c0d68170> (a java.util.concurrent.ThreadPoolExecutor$Worker) 

дамп темы для WT (после использования вариант 2)

"worker-1" #24 prio=5 os_prio=31 tid=0x00007fc6136dd800 nid=0x5207 waiting on condition [0x000000012d638000] 
    java.lang.Thread.State: WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <0x00000006c02f5388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) 
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) 
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

    Locked ownable synchronizers: 
    - None 

РЕШЕНИЕ

Асинхронный Intercepter использует простой Future и не может справиться с ListenableFuture. После того, как я снова просмотрю свалку потоков, я заметил, что FutureTask.get получил. Это блокирующий вызов. Это означает, что вариант 1 является встроенным тупиком при использовании всего лишь одной нити.

Этот код работает:

Контроллер

@ResponseBody 
@RequestMapping("/value") 
public ListenableFuture<String> value() { 
    final SettableListenableFuture<String> future; 
    this.app1Service.value(future); 
    return future; 
} 

Сервис

@Async 
public void value(final SettableListenableFuture<String> future) { 
    this.doSomeStuff(); 
    this.valueFromApp2Service().subscribe(future::set, future::setException); 
} 

ответ

0

Я думаю, вы уже ответили на свой вопрос (или я что-то отсутствует):

Если я использую вариант 1 несколько раз , второй (и все последующие запросы ) не работают на шаге 2 (вместо 6), поскольку пул (размер = 1) по-прежнему заблокирован трассировкой ListenableFuture (стек ниже).

Ваш TaskExecutor имеет только один поток доступен, который используется для @Async. Затем из этого потока, вы хотите использовать TaskExecutor снова для Observable:

this.app2Service.value().observeOn(Schedulers.from(this.taskExecutor)).subscribe(

, но нет больше потоков доступно на вашем бассейне. Если вы увеличьте coreSize или определите другой TaskExecutor для материала RxJava, он должен работать.

EDIT

Если вы действительно необходимо выполнить app1Service.value() асинхронно, вы можете удалить @Asynch из него, и представить задачу в явном виде в taskExecutor, так что вы можете добавить функцию обратного вызова в ListenableFuture , Если изменить тип результата к DeferredResult, вы можете установить результат этого, когда обратного вызова из ListenableFuture запускается на выполнение:

@Autowired 
private TaskExecutor taskExecutor; 

@ResponseBody 
@RequestMapping("/value") 
public DeferredResult<String> value() { 

    final DeferredResult<String> dr = new DeferredResult<String>(); 
    taskExecutor.execute(() -> { 
     final ListenableFuture<String> future = app1Service.value(); 
     future.addCallback(new ListenableFutureCallback<String>() { 

      @Override 
      public void onSuccess(String result) { 
       dr.setResult(result); 
      } 

      @Override 
      public void onFailure(Throwable ex) { 
       dr.setErrorResult(ex); 
      } 
     }); 
    }); 
    return dr; 
} 

Или, конечно, использовать ваше решение, что лучше.

+0

Я также описал, что вариант 2 работает в тех же условиях, а также может обрабатывать больше запросов одновременно. Простое решение: я не могу использовать ListenableFuture как возвращаемый тип метода @Async. Spring просто использует функциональность Future и вызывает блокирующий метод get. – baymon

+0

Право. Spring вызывает 'future.get()' в 'AsyncExecutionInterceptor', потому что это единственное, что он может сделать, у него просто нет другого способа получить результат вычисления' app1Service.value() 'для отправки запроса , На самом деле, я думаю, вам вообще не нужно '@ Async', поскольку вы уже используете RxJava. – Ruben

Смежные вопросы