Мой вариант использования: Я получаю список постоянных ссылок и вам нужно выпустить два запроса REST на постоянную ссылку, чтобы получить их данные по частям. Когда оба запроса вернутся, я хочу объединить их информацию и сделать с ней что-то (здесь - распечатать). Я хочу сделать это с помощью кода с помощью оператора zip
. Вот мой текущий код (вместе с издевается для библиотеки, я использую):Обработка ошибок для zipped наблюдаемых
public class Main {
public static void main(String[] args) {
ContentManager cm = new ContentManager();
Observable
.from(cm.getPermalinks(10))
.flatMap(permalink -> Observable.zip(
Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
(dataContent, streamUrlContent) -> {
if (dataContent == null || streamUrlContent == null) {
System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
return Observable.empty();
}
return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
}))
.subscribe(System.out::println);
}
}
class SubscribingRestCallback implements RestCallback {
private final Subscriber<? super Content> subscriber;
public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSuccess(Content content) {
subscriber.onNext(content);
subscriber.onCompleted();
}
@Override
public void onFailure(int code, String message) {
System.err.println(message);
subscriber.onNext(null);
subscriber.onCompleted();
}
}
public class Content {
public final String permalink;
public final String logoUrl;
public final String streamUrl;
public Content(String permalink, String logoUrl, String streamUrl) {
this.permalink = permalink;
this.logoUrl = logoUrl;
this.streamUrl = streamUrl;
}
@Override
public String toString() {
return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
}
}
public interface RestCallback {
void onSuccess(Content content);
void onFailure(int code, String message);
}
class ContentManager {
private final Random random = new Random();
public List<String> getPermalinks(int n) {
List<String> permalinks = new ArrayList<>(n);
for (int i = 1; i <= n; ++i) {
permalinks.add("perma_" + i);
}
return permalinks;
}
public void getDataByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, false);
}
public void getStreamByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, true);
}
private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
// simulate network latency and unordered results
new Thread(() -> {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
String logoUrl;
String streamUrl;
if (stream) {
logoUrl = null;
streamUrl = "http://" + permalink + "/stream";
} else {
logoUrl = "http://" + permalink + "/logo.png";
streamUrl = null;
}
callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
} else {
callback.onFailure(-1, permalink + " data failure");
}
}).start();
}
}
В общем, это работает, но я не люблю обработки ошибок в этой реализации. В принципе, запросы REST могут потерпеть неудачу, и в этом случае метод onFailure
вызывает subscriber.onNext(null)
, так что метод zip
всегда имеет что-то, с чем можно работать (один запрос может быть неудачным, а другой - нет, и я не знаю,). Затем в функции zip
мне нужен if
, который проверяет, что оба не являются null
(мой код сработает, если какой-либо из парциальных Content
s будет null
).
Я хочу, чтобы убрать null
с помощью оператора filter
где-нибудь, если это возможно. Или, может быть, есть лучший способ, чем исчисление null
значений для случая отказа, но так, что он все еще работает с функцией zip
?
Почему вы называете 'subscriber.onNext (нуль)' 'от onFailure' метода? В этом случае вы должны называть 'subscriber.onError (throwable)'. –
Потому что вызов 'onError()' приведет к сбою всего потока, и я не хочу этого. Я хочу собрать как можно больше результатов и в основном игнорировать/отфильтровывать сбои. Или я ошибаюсь? – wujek
Даже если вы не хотите, чтобы весь поток не сработал, вам все равно нужно вызвать метод 'subscriber.onError(). Существуют и другие способы устранения ошибок. Один из них - оператор 'onErrorResumeNext'. Вот пример того, как его использовать - https://gist.github.com/nsk-mironov/3270b103fc3326a325e2. –