2015-03-28 5 views
1

Я пытаюсь использовать модификацию с rxjava. У меня проблемы с перестройкой наблюдаемых друг с другом или с наблюдаемыми, созданными мной. Пример:Наблюдение за сеткой дооснащения

Observable<List<Friend>> friendsListObservable = friendsService.getFriends(); 
    Observable<Void> updateReqestObservable = friendsListObservable.switchMap(friends -> { 
     Log.d(TAG, "Hello"); 
     return userAPI.updateFriends(session.getUserId(), friends); 
    }).subscribe(); 

Все вызывается до тех пор, пока оно не переключится на коммутатор. Таким образом, привет никогда не отображается, но если я вернусь, например, Observable.just(null) вместо наблюдаемого, то он отлично работает. Кроме того, если я использую модифицированную систему без цепочки, она работает.

Редактировать 1: Это приложение для Android. Похоже, что оператор карты вообще не называется. Иногда случается так, что с модифицированными наблюдаемыми тоже. Я все еще думаю, что это связано с потоками. Из того, что я понимаю, оператор вызывается, когда элемент испускается, но вызов onNext не вызывает оператора map. Ниже весь мой код:

public Observable<List<FacebookFriend>> getFriends() { 
    PublishSubject<List<FacebookFriend>> friendsPublishSubject = PublishSubject.create(); 

    Observable<List<FacebookFriend>> returnObservable = friendsPublishSubject.doOnSubscribe(() -> { 
    Log.d(TAG, "OnSubscribe called"); 
    Session session = Session.getActiveSession(); 

    if (session != null && session.isOpened()) { 
     new Request(session, "/me/friends", null, HttpMethod.GET, 
       new Request.Callback() { 
        public void onCompleted(Response response) { 
         JSONObject graphResponse = response.getGraphObject() 
           .getInnerJSONObject(); 
         try { 
          JSONArray friends = graphResponse.getJSONArray("data"); 
          Gson gson = new Gson(); 
          Type listType = new TypeToken<ArrayList<FacebookFriend>>() { 
          }.getType(); 
          List<FacebookFriend> friendsList = gson.fromJson(friends.toString(), listType); 

          friendsPublishSubject.onNext(friendsList); 
          friendsPublishSubject.onCompleted(); 
         } catch (JSONException e) { 
          e.printStackTrace(); 
          friendsPublishSubject.onError(e); 
         } 
        } 
       }).executeAsync(); 
    } else { 
     InvalidSessionException exception = new InvalidSessionException("Your facebook session expired"); 
     friendsPublishSubject.onError(exception); 
    } 
    }); 

    return returnObservable.subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread()); 
} 

public Observable<Void> updateFriendsList() { 
    Observable<List<FacebookFriend>> facebookFriendsListObservable = facebookService.getFriends(); 

    Observable<Void> updateReqestObservable = facebookFriendsListObservable.map(friends -> { 
    Log.d(TAG, "This is never called"); 
}); 

}

+0

Вы пытаетесь использовать flatMap или concatMap вместо switchMap? Я не думаю, что проблема заключается в switchMap, но так как я плохо знаю этого оператора ... – dwursteisen

+1

Я пробовал, оператор не вызван, он остается застрявшим. 'onError' также не вызван. Единственный способ, который работает, - это подписаться непосредственно на наблюдаемый. Я думаю, что это может быть связано с потоками, но я не уверен. Я закончил использование 'observable.toBlocking()'. Я знаю, что это не хорошая практика, но пока мы это сделаем, если кто-то споткнется в том же вопросе и найдет ответ, не стесняйтесь публиковать его. – Jelly

+0

гул, да, ты прав. Фактически, ваш вызов выполняется асинхронно, и ваш основной метод остановится, прежде чем вы получите результат. Таким образом, вам придется заблокировать свой основной метод. Поэтому toBlocking - хороший выход. – dwursteisen

ответ

1

Один из способов вы можете обойти блокирование по фактической точке вызова будет подписываться его к предмету, а затем блокировать на эту тему в конце какого-либо части вашего кода требует выполнения запросов.

Например:

final ReplaySubject<Void> subject = ReplaySubject.create(); 
friendsService.getFriends() 
    .switchMap(friends -> userApi.updateFriends(session.getUserId(), friends)) 
    .subscribeOn(Schedulers.io()) 
    .subscribe(subject); 

// Do other things that don't require this to be done yet... 

// Wherever you need to wait for the request to happen: 
subject.toBlocking().singleOrDefault(null); 

Поскольку субъект также Observable, вы можете даже возвратить предмет из метода, и заблокировать на ней позже.

Все, что было сказано, кажется, немного странно использовать вызов обновления в качестве побочного эффекта. Есть updateFriends также Модернизация? Если это так, вы можете подумать о том, чтобы сделать вызов для обновления синхронной службой, которая возвращает void вместо Observable<Void>, которую вы вызовете из вызова onNext. Если он по-прежнему необходимо блокировать, вы можете использовать forEach так:

friendsService.getFriends() 
    .forEach(friends -> { userApi.updateFriends(session.getUserId(), friends) }); 

forEach очень похож на subscribe, за исключением того, что он явно блоков. Вы даже можете использовать его с исходным кодом, но добавленное пустое действие onNext не было бы ужасно чистым.

Если вы также можете предоставить более подробную информацию о структуре вашего приложения (это все в основном методе? Это приложение для Android? И т. Д.), Я мог бы дать несколько указателей на предотвращение блокировки как можно больше.

+0

Благодарим за помощь, я обновил сообщение всем кодом. Я почти уверен, что делаю что-то ужасно неправильно. Если у вас есть пример того, как я могу обернуть rx что-то вроде Facebook SDK, это было бы очень полезно. – Jelly

Смежные вопросы