Мое приложение имеет производителя и потребителя. Мой продюсер выпускает сообщения нерегулярно. Когда-нибудь моя очередь будет пустой, иногда у меня будет несколько сообщений. Я хотел бы, чтобы мой потребитель слушал очередь и когда в ней появилось сообщение, возьмите его и обработайте это сообщение. Процесс может занять несколько часов, и я не хочу, чтобы мой потребитель взял другое сообщение в очереди, если он не закончил обработку текущего сообщения.Потребительские товары/Производитель AWS SQS akka scala с потребителем синхронных товаров
Я думаю, что AKKA и AWS SQS могут удовлетворить мои потребности. Читая документы и примеры, кажется, что акка-верблюд может упростить мою работу.
Я нашел это example на github.
Я больше заинтересован в конфигурации потребителя (Thread.sleep только имитировать мою обработку):
class MySqsConsumer extends Consumer {
//The SQS URI is an in-only message exchange (autoAck=true)
override def endpointUri = "aws-sqs://sqs-akka-camel? amazonSQSClient=#client"
override def receive = {
case msg: CamelMessage => {
println("Start received %s" format msg.bodyAs[String])
Thread.sleep(4000)
println("Stop received %s" format msg.bodyAs[String])
}
}
}
StackTrace:
...
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!]
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!]
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!]
Start received Hello World1!
Stop received Hello World1!
Start received Hello World2!
Stop received Hello World2!
Start received Hello World3!
...
Моя проблема в том, что akka- Верблюд читает сообщения из очереди до того, как будет выполнен метод приема.
Как я могу заставить моего пользователя ждать завершения процесса, прежде чем принимать новое сообщение? Если я использую неправильные инструменты/библиотеки, можете ли вы направить меня на новые инструменты?