2015-06-29 5 views
0

Я бегу этот бит в моей программе прямо сейчасКак подписаться на rxJava в другой пул потоков

Observable.from(constituents.entrySet()).subscribeOn(Schedulers.newThread()) 
     .flatMap(Async.toAsync((Map.Entry<String, ConstituentInfo> entry) -> { 
      logger.info(Thread.currentThread().getName()); 
      ConstituentInfo constituent = entry.getValue(); 

      String securityBySymbol = Redux.getSecurityBySymbol(entry.getKey()); 

      String company = UtilityMethods.getNestedJsonObject(securityBySymbol, "company"); 
      Integer compId = UtilityMethods.getIntegerFromJsonObject(company, "id"); 
      String companyName = UtilityMethods.getStringFromJsonObject(company, "name"); 
      String tier = UtilityMethods.getNestedJsonObject(securityBySymbol, "tier"); 
      String tierId = UtilityMethods.getStringFromJsonObject(tier, "id"); 
      String marketPlace = UtilityMethods.getStringFromJsonObject(tier, "name"); 
      String countryName = getCountryName(compId); 

      constituent.setCompanyName(StringUtils.isBlank(companyName) ? NA : companyName); 
      constituent.setMarketPlace(StringUtils.isBlank(marketPlace) ? NA : marketPlace); 
      constituent.setCountryName(StringUtils.isBlank(countryName) ? NA : countryName); 
      constituent.setTierId(StringUtils.isBlank(tierId) ? NA : tierId); 

      return constituent; 
     })).subscribeOn(Schedulers.newThread()) 
     .toList() 
     .timeout(30, TimeUnit.MINUTES) 
     .toBlocking() 
     .single(); 

и он работает одновременно, но он работает на RxComputationThreadPool. Мне было интересно, как запустить его в Schedulers.newThread(), и если это улучшит производительность.

В качестве альтернативы, если это не улучшит производительность, есть ли способ сделать код ниже, чем быстрее?

ответ

1

Существует перегрузка toAsync, которая принимает Scheduler, и вам не нужно subscribeOn. Планировщик computation() - это самый низкий планировщик задержек от всех. io(), вероятно, и newThread() уверенно запускает новый поток и, следовательно, может выполнить несколько сотен микросекунд для выполнения первой задачи, но они хорошо подходят для блокировки ввода-вывода или сетевых вызовов, где эта задержка не имеет большого значения.

Смежные вопросы