2015-04-29 2 views
1

У меня есть проблемы со следующим кодом:RxJava - Non детерминированного поведение при вызове Наблюдаемых метод

public void foo(List<Player> players_list) { 

    JsonArray playersArr = new JsonArray(); 

    rx.Observable.from(players_list) // assume players_list is a list contains 2 players 
    .concatMap(player -> { 

    return getUser(player.getUserId()) // get entity 'User' from DB 
    .flatMap(userObj -> { 
     User user = ...; 

     playersArr.add(new JsonObject() 
      .putString("uid", player.getExtUserId()) 
     ); 

      return rx.Observable.just(playersArr); 
     }); 
    }).subscribe(playersObj -> { 

    }, (exception) -> { 
     log.error("", exception); 

    },() -> { 

     // at this point I expect 'playersArr' to consist 2 entries! 
    }); 
} 

при выполнении этого кода, кажется, что его выход не является детерминированным, то есть - в большинстве случаев я получаю действительные результаты из 2 записей в JsonArray, но иногда я получаю 1 запись.

Я пытаюсь выяснить почему?

EDIT:
Я попытался переход от .flatMap -> .concatMap и, кажется, чтобы решить эту проблему, но им не уверен, что это действительно хорошее решение.

ответ

2

Я буду считать, что playerArrs не является потокобезопасным, если ваш Observable выполняется в асинхронном контексте, возможно, playerArrs может пропустить дополнительный вызов.

У вашего Observable есть побочный эффект, вы обновили объект, который существует за пределами Observable. Чтобы избежать этого, вы можете обновить свой код таким образом, что ваш Observable будет строить JsonArray

public void foo(List<Player> players_list) { 

    rx.Observable.from(players_list) // assume players_list is a list contains 2 players 
       .concatMap(player -> getUser(player.getUserId()) // get entity 'User' from DB 
       .map(userObj -> new JsonObject().putString("uid", player.getExtUserId()) 
       .reduce(new JsonArray(), (seed, acu) -> seed.add(acu))) // build the JsonArray 
       .subscribe(playersObj -> { }, 
          (exception) -> { log.error("", exception); }, 
         () -> { 
    // at this point I expect 'playersArr' to consist 2 entries! 
       }); 
} 

я не уверен в своем коде, но это должно быть близко к тому, что вам нужно, я думаю.

+0

Я запускаю асинхронную среду с одним основным потоком. поэтому безопасность потоков не является проблемой, пока методы RxJava, которые я вызываю, не открывают новые потоки. Я довольно новый для реактивного программирования, поэтому я попробую ваш код и дам вам знать, работает ли он на меня. Спасибо! – Shvalb

+0

Я глубже посмотрел на ваш код, и я не могу понять, как в конце концов я получу ссылку на JsonArray в подписке? Мне нужно использовать его только при завершении работы над всеми элементами в «players_list». – Shvalb

+0

.map() не может получить доступ к экземпляру «игрока» в вашем примере кода. – Shvalb

Смежные вопросы