2016-03-01 4 views
4

Я в настоящее время осуществляет автоматический выключатель с Акка-HTTP следующим образом:Автоматический выключатель для Scala и Акка-клиента остальное обслуживание

def sendMail(entity: MyEntity): ToResponseMarshallable = { 

     Thread.sleep(5 * 1000) 
     validateEntity(entity).map[ToResponseMarshallable] { 
     case (body, subject) if !isEmpty(body, subject) => { 
      val mailResponse = sendMail(body, subject) 
      OK -> ProcessedEmailMessage(mailResponse) 
     } 
     case _ => 
      BadRequest -> s"error: for $entity".toJson 
     } 
    } catch { 
     case e: DeserializationException => HttpResponse(BadRequest).withEntity(HttpEntity(s"error:${e.msg}").withContentType(ContentTypes.`application/json`)) 
    } 
    } 

    val maxFailures: Int = 2 
    val callTimeout: FiniteDuration = 1 second 
    val resetTimeout: FiniteDuration = 30 seconds 

    def open: Unit = { 
    logger.info("Circuit Breaker is open") 
    } 

    def close: Unit = { 
    logger.info("Circuit Breaker is closed") 
    } 

    def halfopen: Unit = { 
    logger.info("Circuit Breaker is half-open, next message goes through") 

    private lazy val breaker = CircuitBreaker(
    system.scheduler, 
    maxFailures, 
    callTimeout, 
    resetTimeout 
).onOpen(open).onClose(close).onHalfOpen(halfopen) 

    def routes: Route = { 
    logRequestResult("email-service_aggregator_email") { 
     pathPrefix("v1") { 
     path("sendmail") { 
      post { 
      entity(as[EmailMessage]) { entity => 
       complete { 
       breaker.withCircuitBreaker(Future(sendMail(entity))) 
       } 
      } 
      } 
     } 
     } 
    } 
    } 

Моя проблема заключается в том, что если я использую breaker.withCircuitBreaker(Future(sendMail(entity))) выключатель попадает в открытое состояние но ответ остальное возвращает There was an internal server error как ответ

Если вместо этого я использую breaker.withSyncCircuitBreaker(Future(sendMail(entity))) то выключатель никогда не выходит в открытом состоянии, но она возвращает ожидаемый HttpResponse

Любые мысли о том, как я могу решить эту проблему, чтобы вызвать как автоматический выключатель, так и вернуть правильный HTTP-ответ?

+0

Может вам также опубликовать код, создающий 'breaker'? –

+0

Конечно, я отредактировал это –

+0

Вам нужно использовать 'onComplete' вместо' complete'. Директива 'complete' ожидает, что ответ будет готов к немедленному завершению. В вашем случае 'withCircuitBreaker' возвращает' Future', поэтому 'complete' не будет допустимым вариантом. Директива 'onComplete' настроена для работы с« Будущим », поэтому она лучше подходит. Затем в обратном вызове 'onComplete' вы можете использовать' complete'. – cmbaxter

ответ

1
entity(as[EmailMessage]) { entity => ctx => 
    val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity))) 
    val withErrorHandling = withBreaker.recover { 
     case _: CircuitBreakerOpenException => 
     HttpResponse(TooManyRequests).withEntity("Server Busy") 
    } 
    ctx.complete(withErrorHandling) 
} 
+0

Это не работает, я получаю то же сообщение «Было сообщение о внутреннем сервере». Я попытался выполнить некоторые настройки и метод sendMail(), чтобы возвращать напрямую будущее, если я не завершу это в автоматическом выключателе, он работает, и будущее будет разрешено. –

+0

Отправка почты занимает не менее 5 секунд, пока ваш таймаут составляет 1 секунду. Разве это не послужило бы исключением? –

+0

Он должен войти в открытое состояние, которое происходит, но ответ не возвращается –

1

Я собираюсь предложить другой possibile решения, как я считаю, что onComplete является режим идиоматического путем, когда дело с результатом в Future при заполнении маршрута:

entity(as[EmailMessage]) { entity => 
    val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity))) 

    onComplete(withBreaker){ 
    case Success(trm) => 
     complete(trm) 

    //Circuit breaker opened handling 
    case Failure(ex:CircuitBreakerOpenException) => 
     complete(HttpResponse(TooManyRequests).withEntity("Server Busy")) 

    //General exception handling 
    case Failure(ex) => 
     complete(InternalServerError) 
    } 
} 
Смежные вопросы