2016-03-26 2 views
7

Я пытаюсь выяснить, как обрабатывать ошибки при отображении элементов внутри потока.Как обрабатывать ошибки во время выполнения Flux.map()

Например, I'm разбор строки CSV в один из моей деловой POJOs:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock)); 

Некоторые из этих линий могут содержать ошибки, так что я получаю в журнале является:

reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001)) 
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999)) 
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo) 
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo 

я прочитал в API некоторые ошибки методы обработки, но большинство refered возвращения в «значение ошибки» или используя запасной вариант флюса, как этот:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff); 

Однако использование этого с моим myflux означает, что весь поток обрабатывается снова.

Итак, есть ли способ обрабатывать ошибки при обработке отдельных элементов (I.e игнорируя их/регистрировать их) и продолжать обрабатывать остальную часть потока?

UPDATE с @akarnokd обходного

public Flux<StockQuotation> getQuotes(List<String> tickers) 
{ 
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers) 
    // Get each set of quotes in a separate thread 
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s))) 
    // Convert each list of raw quotes string in a new Flux<String> 
    .flatMap(list -> Flux.fromIterable(list)) 
    // Convert the string to POJOs 
    .flatMap(x -> { 
      try { 
       return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));  
      } 
      catch (IllegalArgumentException ex){ 
       System.out.println("Error decoding stock quotation: " + x); 
       return Flux.empty(); 
      } 
    }); 

    return processingFlux; 
} 

Это работает как шарм, однако, как вы можете увидеть код менее изящна, чем раньше. Не имеет ли API Flux какой-либо метод делать то, что делает этот код?

retry(...) 
retryWhen(...) 
onErrorResumeWith(...) 
onErrorReturn(...) 

ответ

5

Вам нужно flatMap вместо которой давайте вы возвращаете пустую последовательность, если обработка не удалась:

myflux.flatMap(v -> { 
    try { 
     return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock)); 
    } catch (IllegalArgumentException ex) { 
     return Flux.empty(); 
    } 
}); 
+0

прекрасно работает (собирается принять этот ответ), но я хотел бы знать, может ли это быть сделано с API. Если нет, я открою запрос функции. Благодаря! – Victor

+0

Это стандартный API-интерфейс для выполнения такого поведения. Ошибки - это терминальные события, и вы должны преобразовать их во что-то еще в лямбда, чтобы избежать прекращения. – akarnokd

+0

Хорошо. Я предложил создать новый метод обработки отдельных сбоев (возможно, публикация этих сбоев как поток «мертвой буквы»?). Может быть, это может быть полезно ... – Victor

Смежные вопросы