2016-01-25 4 views
13

У меня была догадка, что для высоко вычислительных, параллельных задач в RxJava традиционный ExecutorService будет быстрее, чем Scheduler.RxJava - Планировщики против ExecutorService?

У меня была теория, что этот код

Observable<MyItem> source = ... 

source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.computation())) 
.subscribe(); 

будет работать медленнее, чем это

final ExecutorService svc = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); 
Observable<MyItem> source = ... 

source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.from(svc))) 
.finallyDo(svc::shutdown) 
.subscribe(); 

Я сравнил эти два подхода с типичным параллельным процессом я делаю на работе, и я получил следующее Результаты.

EXECUTOR 

START: 2016-01-25T09:47:04.350 
END: 2016-01-25T09:48:37.181 
TOTAL TIME (SEC): 92 


COMPUTATION SCHEDULER 

START: 2016-01-25T09:50:37.799 
END: 2016-01-25T09:54:23.674 
TOTAL TIME (SEC): 225 

Так что мое грубое тестирование показало традиционный ExecutorService гораздо быстрее, чем Scheduler для вычислений.

Есть ли причина для этих результатов? Планировщики RxJava просто не оптимизированы для распараллеливания? У меня сложилось впечатление, что планировщики вычислений используют меньше потоков, чем Исполнители.

+0

В любом случае, он не обрабатывает элементы параллельно. Если вы хотите выполнить параллельное выполнение, вы должны проверить экспериментальную библиотеку [RxJavaParallel] (https://github.com/ReactiveX/RxJavaParallel). –

+0

Вопрос о том, почему 'ExecutorService' быстрее, чем' Schedulers.computation() ', по-прежнему остается хорошим. Я не могу ответить на это. –

+0

Я думаю, что проект был в подвешенном состоянии в течение года, главным образом потому, что разработчики RxJava не могут уделять первоочередное внимание своему времени на это прямо сейчас. – tmn

ответ

7

Я сделал несколько тестов и обнаружил, что создание собственного ExecutorService может фактически увеличить производительность параллелизации. I wrote a blog post on it here.

2

Когда вы используете Schedulers.computation(), все события обрабатываются в том же потоке. Вы можете обратиться к исходному коду CachedThreadScheduler.java и NewThreadWorker.java. Преимущество этой реализации заключается в том, что если eventA выдается после события B, то eventA будет обрабатываться после событияB.

Когда вы используете Schedulers.from(), события обрабатываются в разных потоках.

Смежные вопросы