2015-06-09 1 views
1

Я использую библиотеку, которая реализует свои собственные запросы REST. Он берет «постоянную ссылку» и выдает запрос REST для извлечения содержимого (я предоставляю mocks в исходном коде для тестирования). У меня есть список постоянных ссылок, и я хотел бы запланировать запрос для каждого и создать поток результатов контента. Все это должно быть асинхронным, и как только все результаты будут выполнены, я хочу выпустить их список.Пользовательский наблюдаемый поток REST получается из списка ссылок

Я пытаюсь достичь этого, используя RxJava. Вот что у меня сейчас (жаль не использовать лямбды, я просто хочу, чтобы привыкнуть к именам классов RxJava):

public class Main { 

    public static void main(String[] args) { 
     int count = 10; 
     List<String> permalinks = new ArrayList<>(count); 
     for (int i = 1; i <= count; ++i) { 
      permalinks.add("permalink_" + i); 
     } 

     ContentManager cm = new ContentManager(); 

     Observable 
       .create(new Observable.OnSubscribe<Content>() { 

        int gotCount = 0; 

        @Override 
        public void call(Subscriber<? super Content> subscriber) { 
         for (String permalink : permalinks) { // 1. is iterating here the correct way? 
          if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed? 
           cm.getBasicContentByPermalink(permalink, new RestCallback() { 

            @Override 
            public void onSuccess(Content content) { 
             if (!subscriber.isUnsubscribed()) { 
              subscriber.onNext(content); 
              completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called 
             } 
            } 

            @Override 
            public void onFailure(int code, String message) { 
             if (!subscriber.isUnsubscribed()) { 
              subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure? 
              completeIfFinished(); 
             } 
            } 

            private void completeIfFinished() { 
             ++gotCount; 
             if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic? 
              subscriber.onCompleted(); 
             } 
            } 
           }); 
          } 
         } 
        } 
       }) 
       .toList() 
       .subscribe(new Action1<List<Content>>() { 

        @Override 
        public void call(List<Content> contents) { 
         System.out.println("list count: " + contents.size()); 
         System.out.println("results: "); 
         contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println); 
        } 
       }); 

     System.out.println("finishing main"); 
    } 
} 

// library mocks 

class Content { 

    String basic; 

    String extended; 

    public Content(String basic, String extended) { 
     this.basic = basic; 
     this.extended = extended; 
    } 
} 

interface RestCallback { 

    void onSuccess(Content content); 

    void onFailure(int code, String message); 
} 

class ContentManager { 

    private final Random random = new Random(); 

    public void getBasicContentByPermalink(String permalink, RestCallback callback) { 
     // just to simulate network latency and unordered results 
     new Thread() { 

      @Override 
      public void run() { 
       try { 
        Thread.sleep(random.nextInt(1000) + 200); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       if (random.nextInt(100) < 95) { 
        // 95% of the time we succeed 
        callback.onSuccess(new Content(permalink + "_basic", null)); 
       } else { 
        callback.onFailure(-1, permalink + "_basic_failure"); 
       } 
      } 
     }.start(); 
    } 
} 

Этот бит вид работ, но я не уверен, если я делаю вещи правильно путь. Пожалуйста, обратите внимание на линии 1-5 числа:

  1. При создании наблюдаемого из списка, я должен перебирать по списку самому, или есть какая-то другой, лучше? Для экземпляра существует Observable.from (Iterable), но я не думаю, что я могу использовать его ?
  2. Я проверяю isUnsubscribed перед отправкой запроса REST и также в обоих обработчиках результатов (успех/сбой). Это так, как предполагается, используется ?
  3. Я реализую некоторую логику для вызова onComplete, когда все запросы возвращены , независимо от того, успешно или неудачно (см. Вопрос 5). Но, поскольку они , защищенные вызовом isUnsubscribed, может случиться так, что они onComplete никогда не вызываются. Как поток знает, что он должен заполнить ? Досрочно ли он заканчивается, когда подписчик подписывается, и мне не нужно об этом думать? Есть ли какие-либо оговорки? Например, что произойдет, если абонент отменит подписку в середине испускания контента? В моих тестах список всех результатов никогда не издавался, но я ожидал бы список всех результатов до этого момента.
  4. В методе onFailure я испускаю null с помощью onNext (null), чтобы отметить ошибку . Я делаю это, потому что в итоге у меня будет zip из 2 потоков, и будет выдавать экземпляр пользовательского класса, только если значения zip не равны нулю. Это правильный способ сделать это?
  5. Как я уже говорил, у меня есть специальная логика, чтобы проверить, завершен ли поток . Что я здесь делаю, я подсчитываю результаты REST и когда было обработано, так как в списке есть постоянные ссылки, это делается. Это необходимо? Правильно ли это?

ответ

2

При создании наблюдаемом из списка, я должен пройти по списку сам, или есть какой-то другой, лучше? Например, существует Observable.from (Iterable), но я не думаю, что могу его использовать?

Вместо того, чтобы выполнять итерацию в методе create, вы можете создать наблюдаемый для каждой ссылки.

public Observable<Content> getContent(permalink) { 

    return Observable.create(subscriber -> { 
        ContentManager cm = new ContentManager(); 
        if (!subscriber.isUnsubscribed()) { 
          cm.getBasicContentByPermalink(permalink, new RestCallback() { 

           @Override 
           public void onSuccess(Content content) { 
             subscriber.onNext(content); 
             subscriber.onCompleted(); 
           } 

           @Override 
           public void onFailure(int code, String message) { 
             subscriber.onError(OnErrorThrowable.addValueAsLastCause(new RuntimeException(message, permalink)); 
           } 

       } 
    }); 
} 

затем использовать его с flatMap оператором

Observable.from(permalinks) 
      .flatMap(link -> getContent(link)) 
      .subscribe(); 

Я проверка isUnsubscribed прежде чем отправить запрос REST, а также в обоих результирующих обработчиков (успех/неудача). Так ли это, как предполагается, используется?

check isUnsподписано перед отправкой запроса - это способ сделать. Не уверен, что для этого успеха/неудачи обратного вызова (я думаю, что это бесполезно, но кто-то может сказать, что я не права)

Я реализовать определенную логику для вызова OnComplete, когда все запросы вернулись, в случае успеха или не независимо от того (см вопроса 5). Но, поскольку они охраняются вызовом isUnsubscribed, может случиться так, что они onComplete никогда не вызываются.

Если вы отмените подписку на поток, вы остановитесь, чтобы наблюдать за потоком, поэтому вы не можете получать уведомления о завершении потока.

Например, что произойдет, если абонент отменит подписку в середине испускания результатов контента?

вы сможете потреблять то, что излучалось до отмены подписки.

В методе onFailure я испускаю null с onNext (null), чтобы отметить сбой. Я делаю это, потому что в итоге у меня будет zip из 2 потоков и выйдет экземпляр пользовательского класса, только если оба значения zipped не равны нулю. Это правильный способ сделать это?

nope. вместо нуля, исправить ошибку (с использованием onError на подписчике). Используя это, вам не нужно будет проверять значение null в zip lambda. Просто zip!

Как я уже говорил, у меня есть специальная логика, чтобы проверить, завершен ли поток. Что я здесь делаю, я подсчитываю результаты REST и когда столько обработано, что есть постоянные ссылки в списке, это делается. Это необходимо? Правильно ли это?

Если вы хотите получить постоянную ссылку, когда поток будет завершен, вы можете использовать оператор count.

Observable.from(permalinks) 
      .flatMap(link -> getContent(link)) 
      .count() 
      .subscribe(System.out::println); 

Если вы хотите, чтобы испускать количество линии связи, в конце sucessfull ссылки, вы можете попробовать scan оператор

Observable.from(permalinks) 
      .flatMap(link -> getContent(link)) 
      .map(content -> 1) // map content to 1 in order to count it. 
      .scan((seed, acu) -> acu + 1) 
      .subscribe(System.out::println); // will print 2, 3... 
+0

Я попытался с/flatMap себя, но я сделал ошибку, код помог я делаю это правильно, я предпочитаю этот путь, мне не нужна моя логическая логика, чтобы обнаружить завершение, спасибо! В строке subscriber.onError (OnErrorThrowable.addValueAsLastCause (новое исключение RuntimeException («failed»), permalink)); Я получаю: Исключение в потоке «Thread-7» rx.exceptions.OnErrorNotImplementedException: не удалось и никогда не видит никаких выбросов из потока. Я не худею, это путь. Я просто не позвоню onNext, а затем вызовет onCompleted, который преобразует постоянную ссылку в 0 Содержание. Мне даже не нужен фильтр для нулей. – wujek

+0

'вы сможете потреблять то, что было испущено, прежде чем отказаться от подписки.' Когда я реализую свой RestCallback # onFailure для вызова подписчика.unsubscribe(), я никогда не вижу каких-либо выбросов, независимо от того, сколько их удалось добиться - я вижу, что некоторым удалось просто поместить Sysout в метод onSuccess. Это вызвано вызовом «toList». Когда я не жду полного списка, а сам обрабатываю каждую эмиссию самостоятельно, я получаю успешные до точки отказа. Но мне нужен список. – wujek

+2

ВЫ ДОЛЖНЫ использовать onError вместо onCompleted для уведомления об ошибке. Если вы хотите обнаружить ошибку, вы должны выполнить обратный вызов onError: «Observable.error (новое RuntimeException (« oups »). Subscribe (System.out :: println, (e) -> System.err.println («ОШИБКА ->» + e.getMessage()); '. Это позволит вам управлять ошибками в вашем потоке (используя метод onErrorResumeNext, ...) – dwursteisen

Смежные вопросы