Я пытаюсь выяснить, как обрабатывать ошибки при отображении элементов внутри потока.Как обрабатывать ошибки во время выполнения Flux.map()
Например, I'm разбор строки CSV в один из моей деловой POJOs:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
Некоторые из этих линий могут содержать ошибки, так что я получаю в журнале является:
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
я прочитал в API некоторые ошибки методы обработки, но большинство refered возвращения в «значение ошибки» или используя запасной вариант флюса, как этот:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
Однако использование этого с моим myflux
означает, что весь поток обрабатывается снова.
Итак, есть ли способ обрабатывать ошибки при обработке отдельных элементов (I.e игнорируя их/регистрировать их) и продолжать обрабатывать остальную часть потока?
UPDATE с @akarnokd обходного
public Flux<StockQuotation> getQuotes(List<String> tickers)
{
Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list of raw quotes string in a new Flux<String>
.flatMap(list -> Flux.fromIterable(list))
// Convert the string to POJOs
.flatMap(x -> {
try {
return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
}
catch (IllegalArgumentException ex){
System.out.println("Error decoding stock quotation: " + x);
return Flux.empty();
}
});
return processingFlux;
}
Это работает как шарм, однако, как вы можете увидеть код менее изящна, чем раньше. Не имеет ли API Flux какой-либо метод делать то, что делает этот код?
retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
прекрасно работает (собирается принять этот ответ), но я хотел бы знать, может ли это быть сделано с API. Если нет, я открою запрос функции. Благодаря! – Victor
Это стандартный API-интерфейс для выполнения такого поведения. Ошибки - это терминальные события, и вы должны преобразовать их во что-то еще в лямбда, чтобы избежать прекращения. – akarnokd
Хорошо. Я предложил создать новый метод обработки отдельных сбоев (возможно, публикация этих сбоев как поток «мертвой буквы»?). Может быть, это может быть полезно ... – Victor