2015-06-13 3 views
8

Мой вариант использования: Я получаю список постоянных ссылок и вам нужно выпустить два запроса REST на постоянную ссылку, чтобы получить их данные по частям. Когда оба запроса вернутся, я хочу объединить их информацию и сделать с ней что-то (здесь - распечатать). Я хочу сделать это с помощью кода с помощью оператора zip. Вот мой текущий код (вместе с издевается для библиотеки, я использую):Обработка ошибок для zipped наблюдаемых

public class Main { 

    public static void main(String[] args) { 
     ContentManager cm = new ContentManager(); 

     Observable 
       .from(cm.getPermalinks(10)) 
       .flatMap(permalink -> Observable.zip(
         Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
         Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
         (dataContent, streamUrlContent) -> { 
          if (dataContent == null || streamUrlContent == null) { 
           System.err.println("not zipping " + dataContent + " and " + streamUrlContent); 
           return Observable.empty(); 
          } 

          return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl); 
         })) 
       .subscribe(System.out::println); 
    } 
} 

class SubscribingRestCallback implements RestCallback { 

    private final Subscriber<? super Content> subscriber; 

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) { 
     this.subscriber = subscriber; 
    } 

    @Override 
    public void onSuccess(Content content) { 
     subscriber.onNext(content); 
     subscriber.onCompleted(); 
    } 

    @Override 
    public void onFailure(int code, String message) { 
     System.err.println(message); 
     subscriber.onNext(null); 
     subscriber.onCompleted(); 
    } 
} 

public class Content { 

    public final String permalink; 

    public final String logoUrl; 

    public final String streamUrl; 

    public Content(String permalink, String logoUrl, String streamUrl) { 
     this.permalink = permalink; 
     this.logoUrl = logoUrl; 
     this.streamUrl = streamUrl; 
    } 

    @Override 
    public String toString() { 
     return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl); 
    } 
} 

public interface RestCallback { 

    void onSuccess(Content content); 

    void onFailure(int code, String message); 
} 

class ContentManager { 

    private final Random random = new Random(); 

    public List<String> getPermalinks(int n) { 
     List<String> permalinks = new ArrayList<>(n); 
     for (int i = 1; i <= n; ++i) { 
      permalinks.add("perma_" + i); 
     } 

     return permalinks; 
    } 

    public void getDataByPermalink(String permalink, RestCallback callback) { 
     getByPermalink(permalink, callback, false); 
    } 

    public void getStreamByPermalink(String permalink, RestCallback callback) { 
     getByPermalink(permalink, callback, true); 
    } 

    private void getByPermalink(String permalink, RestCallback callback, boolean stream) { 
     // simulate network latency and unordered results 
     new Thread(() -> { 
      try { 
       Thread.sleep(random.nextInt(1000) + 200); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      if (random.nextInt(100) < 95) { 
       String logoUrl; 
       String streamUrl; 
       if (stream) { 
        logoUrl = null; 
        streamUrl = "http://" + permalink + "/stream"; 
       } else { 
        logoUrl = "http://" + permalink + "/logo.png"; 
        streamUrl = null; 
       } 
       callback.onSuccess(new Content(permalink, logoUrl, streamUrl)); 
      } else { 
       callback.onFailure(-1, permalink + " data failure"); 
      } 
     }).start(); 
    } 
} 

В общем, это работает, но я не люблю обработки ошибок в этой реализации. В принципе, запросы REST могут потерпеть неудачу, и в этом случае метод onFailure вызывает subscriber.onNext(null), так что метод zip всегда имеет что-то, с чем можно работать (один запрос может быть неудачным, а другой - нет, и я не знаю,). Затем в функции zip мне нужен if, который проверяет, что оба не являются null (мой код сработает, если какой-либо из парциальных Content s будет null).

Я хочу, чтобы убрать null с помощью оператора filter где-нибудь, если это возможно. Или, может быть, есть лучший способ, чем исчисление null значений для случая отказа, но так, что он все еще работает с функцией zip?

+0

Почему вы называете 'subscriber.onNext (нуль)' 'от onFailure' метода? В этом случае вы должны называть 'subscriber.onError (throwable)'. –

+0

Потому что вызов 'onError()' приведет к сбою всего потока, и я не хочу этого. Я хочу собрать как можно больше результатов и в основном игнорировать/отфильтровывать сбои. Или я ошибаюсь? – wujek

+0

Даже если вы не хотите, чтобы весь поток не сработал, вам все равно нужно вызвать метод 'subscriber.onError(). Существуют и другие способы устранения ошибок. Один из них - оператор 'onErrorResumeNext'. Вот пример того, как его использовать - https://gist.github.com/nsk-mironov/3270b103fc3326a325e2. –

ответ

5

Прежде всего, правильный путь, чтобы уведомить Subscriber об ошибке является вызов subscriber.onError метод:

class SubscribingRestCallback implements RestCallback { 
    private final Subscriber<? super Content> subscriber; 

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) { 
     this.subscriber = subscriber; 
    } 

    @Override 
    public void onSuccess(Content content) { 
     subscriber.onNext(content); 
     subscriber.onCompleted(); 
    } 

    @Override 
    public void onFailure(int code, String message) { 
     subscriber.onError(new Exception(message)); 
    } 
} 

Даже если вы не хотите, чтобы весь поток на неудачу, вы должны назвать subscriber.onError() способ. Существуют и другие способы устранения ошибок. Один из них является onErrorResumeNext оператор:

Observable 
     .from(cm.getPermalinks(10)) 
     .flatMap(permalink -> Observable.zip(
       Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       (dataContent, streamUrlContent) -> { 
        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl); 
       }).onErrorResumeNext(Observable.empty())) 
     .subscribe(System.out::println); 

EDIT

У меня есть один последний вопрос: если вы заметили, мои функции молнии, я вернусь Observable.empty(), если два объекта не может быть застегнутым молнией, и как только я вернуть содержимое. Это кажется неправильным. Как я должен обрабатывать такую ​​ошибку условий в функции застежки-молнии?

Да, возвращение Observable.empty() совершенно неверно. Бросив исключение из zip функции кажется лучшим решением:

Observable 
     .from(cm.getPermalinks(10)) 
     .flatMap(permalink -> Observable.zip(
       Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       (dataContent, streamUrlContent) -> { 
        if (!isDataValid(dataContent, streamUrlContent)) { 
         throw new RuntimeException("Something went wrong."); 
        } 
        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl); 
       }).onErrorResumeNext(Observable.empty())) 
     .subscribe(System.out::println); 
+0

Что я в итоге сделал: методы 'RestCallback # onFailure()' вызывают 'subscriber.onError()', а в 'RestCallback # onSuccess()' я либо вызываю 'onNext()' и 'onComplete()' if содержимое действительно, или 'onError()', если нет. Затем «onResumeNext()», примененный к zipped «Observable», также обрабатывает такие ошибки содержимого, а моя функция zipping так же проста, как '(dataContent, streamUrlContent) -> новое содержимое (dataContent.permalink, dataContent. logoUrl, streamUrlContent.streamUrl)) '. Я добавил вызов doOnError() для дополнительного журнала. Мне нравится код сейчас. Спасибо за вашу помощь. – wujek

Смежные вопросы