2015-01-08 2 views
4

У меня есть поток, который зависит от ответов API. Когда ответы не соответствуют тому, что я ожидаю, генерируется исключение. Эта стратегия хорошо работает для Spray и для прямого тестирования методов с specs2.Почему исключения не останавливаются поток потока Akka

Однако, когда я пытаюсь использовать потоки с исключениями, бросая модули, поток просто останавливается.

Это мой поток:

Source(() => file) 
     .via(csvToSeq) 
     .via(getFromElastic) 
     .via(futureExtrtactor) 
     .via(findLocaionOfId) 
     .foreach(v => v.map(v => println("foreached", v))) 
     .onComplete(_ => system.shutdown()) 

Моя стратегия для этого является использование map фьючерсов.

так:

val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].map(future => future.map(jsSeq => { 
     jsSeq.zipWithIndex.flatMap { case (x, i) => x._2.asJsObject.getFields("_source").flatMap(js => { 
     js.asJsObject("Couldn't convert").getFields("externalId").map({ 
      case JsString(str) => { 
       (i + 1, i == 0, js) 
      } 
      else (i, false, js) 
      } 
      case _ => (i, false, x) 
     }) 
     }) 
     } 
    })) 

Это потенциальный метатель исключение в совершенно другом месте:

val encoded_url = URLEncoder.encode(url, "UTF-8") 

Кажется, я что-то не хватает, но не может видеть, что. Спасибо за любые указатели.

+0

Я думаю, вам нужно быть более конкретным здесь. Каково поведение, которое сейчас происходит, и что вы предпочтете вместо этого? Если шаг преобразования терпит неудачу, и это останавливает остальную часть потока, что бы вы предпочли вместо этого? Предоставление такой специфики поможет людям дать вам лучший ответ. – cmbaxter

+2

В вашем 'onComplete' вы игнорируете значение' Try', которое предоставляется с помощью '_'. Вместо этого попробуйте сопоставить «Успех» и «Неудача» и посмотреть, что вы получаете. Если сбой, вы должны иметь стек. – cmbaxter

+0

Программа никогда не заканчивается так, что она не завершена – raam86

ответ

3

Это звучит как проблема, которая будет рассмотрена после того, как будет реализован Supervision for Akka Streams. Потоки Akka по-прежнему «пред-экспериментальные», так что функция еще не реализована, но, безусловно, планируется вскоре включить.

// На момент написания настоящего комментария текущая версия 1.0-M2 (предварительный этап).

3

Дело в том, что я отсутствовал был mapAsync.

изменения выше функцию:

val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].mapAsync({...}) 

Таким образом, фьючерсы разворачивают и исключения остановить программу, как ожидалось.

+0

Да; когда я прочитал ваш вопрос, я был удивлен, увидев Поток [Будущее]; Это казалось излишним. (хотя есть некоторые причины, по которым требуется принятие «Будущего вниз по потоку»). –

+0

Это на самом деле очень хороший момент. Мне было непонятно, как я раскрываю будущее, и это все. возможно, я должен опубликовать новый набор вопросов/ответов – raam86

Смежные вопросы