У меня есть поток, который зависит от ответов API. Когда ответы не соответствуют тому, что я ожидаю, генерируется исключение. Эта стратегия хорошо работает для Spray и для прямого тестирования методов с specs2.Почему исключения не останавливаются поток потока Akka
Однако, когда я пытаюсь использовать потоки с исключениями, бросая модули, поток просто останавливается.
Это мой поток:
Source(() => file)
.via(csvToSeq)
.via(getFromElastic)
.via(futureExtrtactor)
.via(findLocaionOfId)
.foreach(v => v.map(v => println("foreached", v)))
.onComplete(_ => system.shutdown())
Моя стратегия для этого является использование map
фьючерсов.
так:
val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].map(future => future.map(jsSeq => {
jsSeq.zipWithIndex.flatMap { case (x, i) => x._2.asJsObject.getFields("_source").flatMap(js => {
js.asJsObject("Couldn't convert").getFields("externalId").map({
case JsString(str) => {
(i + 1, i == 0, js)
}
else (i, false, js)
}
case _ => (i, false, x)
})
})
}
}))
Это потенциальный метатель исключение в совершенно другом месте:
val encoded_url = URLEncoder.encode(url, "UTF-8")
Кажется, я что-то не хватает, но не может видеть, что. Спасибо за любые указатели.
Я думаю, вам нужно быть более конкретным здесь. Каково поведение, которое сейчас происходит, и что вы предпочтете вместо этого? Если шаг преобразования терпит неудачу, и это останавливает остальную часть потока, что бы вы предпочли вместо этого? Предоставление такой специфики поможет людям дать вам лучший ответ. – cmbaxter
В вашем 'onComplete' вы игнорируете значение' Try', которое предоставляется с помощью '_'. Вместо этого попробуйте сопоставить «Успех» и «Неудача» и посмотреть, что вы получаете. Если сбой, вы должны иметь стек. – cmbaxter
Программа никогда не заканчивается так, что она не завершена – raam86