2016-10-10 4 views
0

Я пробовал rxJava в нашей базе кода, в основном, чтобы повысить производительность параллелизма. Однако при использовании rxJava возникают проблемы с накладными расходами/стартовыми затратами. В приведенном ниже примере в «doRx()» требуется ~ 130 мс, прежде чем getAllElements() будет запущен, а в «doOld» потребуется 0 мс до запуска getAllElements(). Любое объяснение тому, почему я теряю 130 мс изначально в doRx()?Накладные расходы в rxJava

Это журнал, который я использую, используя System.currentTimeMillis(). Время() истекает через init().

Существующие реализации

  • (0) 2016-10-11T13: 34: 07,060: OldImpl: INIT()
  • (0) 2016-10-11T13: 34: 07,060: OldImpl: Call getAllElements()
  • (327) 2016-10-11T13: 34: 07,387: OldImpl: Received getAllElements()

реализации RX

  • (0) 2016-10-11T13: 34: 07,703: RxImpl: INIT()
  • () 2016-10-11T13: 34: 07,863: RxImpl: Call getAllElements()
  • (392) 2016-10-11T13: 34: 08,095: RxImpl: Received getAllElements()

Смысл кода является то, что я сначала хочу, чтобы собрать все элементы, а затем запустить их параллельно (при h2), так как это то, где мы можем сэкономить время, поскольку есть много внутренних запросов. Я использовал this blog в качестве руководства для этой настройки.

public List<Element> doRx() { 

    List<Element> elements = new ArrayList<>(); 

    Observable 
      .from(getAllElements()) 
      .flatMap(
        s -> Observable 
          .just(Element::new) 
          .subscribeOn(Schedulers.io()) 
          .flatMap(
            e -> { 

             List<Element> elements = new ArrayList<>(); 

             for (SubElement se : e.getSubElements()) { 

              elements.add(se); 

             } 

             return Observable.from(elements); 
            } 
          ) 
      ) 
      .flatMap(

        h1 -> Observable 
          .just(h1) 
          .subscribeOn(Schedulers.computation()) 
          .flatMap(
            h2 -> { 

             // Do additional things in parallell on all elements 

             return Observable 
               .just(h2); 
            } 
          ) 
      ) 
      .toBlocking() 
      .getIterator() 
      .forEachRemaining(myList::add); 

    return elements; 
} 


public List<Element> doOld() { 

    List<Element> elements = getAllElements(); 

    for (Element e : elements) { 
     // Do stuff, same as under h2 
    } 

    return elements; 
} 
+0

Использовал ли старый _sequential_ код более 1 потока? – miensol

+0

Нет, только одна тема. Последовательный был слабым выбором слова. Синхронный был тем, что я имел в виду. getElements() - первый шаг выполнения здесь. – user1682170

+0

Вы понимаете, что h2 представляет собой однопоточную базу данных? –

ответ

1

Если я вас понимаю код правильно, это эквивалентно следующему:

public List<Element> doRx() { 
    return Observable 
     .from(getAllElements()) 
     .flatMap(element -> Observable 
      .just(new Element(element)) 
      .subscribeOn(Schedulers.io()) 
      .flatMaplIterable(e -> e.getSubElements()) 
     ) 
     .observeOn(Schedulers.computation()) 
     .doOnNext(element -> { 
      // Do additional things in parallell on all elements 
     }) 
     .toList() 
     .toBlocking() 
     .single(); 
} 

Это при минимальном 2 переключений контекста в элемент более последовательной версии. Как вы делаете свои тайминги? X работает, игнорирует самые большие и наименьшие числа?

+0

Кажется, это эквивалент, да. Перепишите мой код, чтобы он соответствовал этому, и он работал, но никаких изменений в таймингах. – user1682170

+0

Я не выполнял никаких тщательных испытаний нагрузки. Я просто проверил это несколько раз. Ran это 15 раз, кажется, что те же 100-150 мс «потеряли» в вызове getElements() – user1682170

+0

Когда вы говорите «Ran», вы имеете в виду, что вы запускали 'java -jar ...' 15 раз? Или ваш код имеет время (() -> doRx); '15 раз один за другим? –