Я использую библиотеку, которая реализует свои собственные запросы REST. Он берет «постоянную ссылку» и выдает запрос REST для извлечения содержимого (я предоставляю mocks в исходном коде для тестирования). У меня есть список постоянных ссылок, и я хотел бы запланировать запрос для каждого и создать поток результатов контента. Все это должно быть асинхронным, и как только все результаты будут выполнены, я хочу выпустить их список.Пользовательский наблюдаемый поток REST получается из списка ссылок
Я пытаюсь достичь этого, используя RxJava. Вот что у меня сейчас (жаль не использовать лямбды, я просто хочу, чтобы привыкнуть к именам классов RxJava):
public class Main {
public static void main(String[] args) {
int count = 10;
List<String> permalinks = new ArrayList<>(count);
for (int i = 1; i <= count; ++i) {
permalinks.add("permalink_" + i);
}
ContentManager cm = new ContentManager();
Observable
.create(new Observable.OnSubscribe<Content>() {
int gotCount = 0;
@Override
public void call(Subscriber<? super Content> subscriber) {
for (String permalink : permalinks) { // 1. is iterating here the correct way?
if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
cm.getBasicContentByPermalink(permalink, new RestCallback() {
@Override
public void onSuccess(Content content) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(content);
completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called
}
}
@Override
public void onFailure(int code, String message) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
completeIfFinished();
}
}
private void completeIfFinished() {
++gotCount;
if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
subscriber.onCompleted();
}
}
});
}
}
}
})
.toList()
.subscribe(new Action1<List<Content>>() {
@Override
public void call(List<Content> contents) {
System.out.println("list count: " + contents.size());
System.out.println("results: ");
contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
}
});
System.out.println("finishing main");
}
}
// library mocks
class Content {
String basic;
String extended;
public Content(String basic, String extended) {
this.basic = basic;
this.extended = extended;
}
}
interface RestCallback {
void onSuccess(Content content);
void onFailure(int code, String message);
}
class ContentManager {
private final Random random = new Random();
public void getBasicContentByPermalink(String permalink, RestCallback callback) {
// just to simulate network latency and unordered results
new Thread() {
@Override
public void run() {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
// 95% of the time we succeed
callback.onSuccess(new Content(permalink + "_basic", null));
} else {
callback.onFailure(-1, permalink + "_basic_failure");
}
}
}.start();
}
}
Этот бит вид работ, но я не уверен, если я делаю вещи правильно путь. Пожалуйста, обратите внимание на линии 1-5 числа:
- При создании наблюдаемого из списка, я должен перебирать по списку самому, или есть какая-то другой, лучше? Для экземпляра существует Observable.from (Iterable), но я не думаю, что я могу использовать его ?
- Я проверяю isUnsubscribed перед отправкой запроса REST и также в обоих обработчиках результатов (успех/сбой). Это так, как предполагается, используется ?
- Я реализую некоторую логику для вызова onComplete, когда все запросы возвращены , независимо от того, успешно или неудачно (см. Вопрос 5). Но, поскольку они , защищенные вызовом isUnsubscribed, может случиться так, что они onComplete никогда не вызываются. Как поток знает, что он должен заполнить ? Досрочно ли он заканчивается, когда подписчик подписывается, и мне не нужно об этом думать? Есть ли какие-либо оговорки? Например, что произойдет, если абонент отменит подписку в середине испускания контента? В моих тестах список всех результатов никогда не издавался, но я ожидал бы список всех результатов до этого момента.
- В методе onFailure я испускаю null с помощью onNext (null), чтобы отметить ошибку . Я делаю это, потому что в итоге у меня будет zip из 2 потоков, и будет выдавать экземпляр пользовательского класса, только если значения zip не равны нулю. Это правильный способ сделать это?
- Как я уже говорил, у меня есть специальная логика, чтобы проверить, завершен ли поток . Что я здесь делаю, я подсчитываю результаты REST и когда было обработано, так как в списке есть постоянные ссылки, это делается. Это необходимо? Правильно ли это?
Я попытался с/flatMap себя, но я сделал ошибку, код помог я делаю это правильно, я предпочитаю этот путь, мне не нужна моя логическая логика, чтобы обнаружить завершение, спасибо! В строке subscriber.onError (OnErrorThrowable.addValueAsLastCause (новое исключение RuntimeException («failed»), permalink)); Я получаю: Исключение в потоке «Thread-7» rx.exceptions.OnErrorNotImplementedException: не удалось и никогда не видит никаких выбросов из потока. Я не худею, это путь. Я просто не позвоню onNext, а затем вызовет onCompleted, который преобразует постоянную ссылку в 0 Содержание. Мне даже не нужен фильтр для нулей. – wujek
'вы сможете потреблять то, что было испущено, прежде чем отказаться от подписки.' Когда я реализую свой RestCallback # onFailure для вызова подписчика.unsubscribe(), я никогда не вижу каких-либо выбросов, независимо от того, сколько их удалось добиться - я вижу, что некоторым удалось просто поместить Sysout в метод onSuccess. Это вызвано вызовом «toList». Когда я не жду полного списка, а сам обрабатываю каждую эмиссию самостоятельно, я получаю успешные до точки отказа. Но мне нужен список. – wujek
ВЫ ДОЛЖНЫ использовать onError вместо onCompleted для уведомления об ошибке. Если вы хотите обнаружить ошибку, вы должны выполнить обратный вызов onError: «Observable.error (новое RuntimeException (« oups »). Subscribe (System.out :: println, (e) -> System.err.println («ОШИБКА ->» + e.getMessage()); '. Это позволит вам управлять ошибками в вашем потоке (используя метод onErrorResumeNext, ...) – dwursteisen