2016-06-17 1 views
0

Мое приложение имеет производителя и потребителя. Мой продюсер выпускает сообщения нерегулярно. Когда-нибудь моя очередь будет пустой, иногда у меня будет несколько сообщений. Я хотел бы, чтобы мой потребитель слушал очередь и когда в ней появилось сообщение, возьмите его и обработайте это сообщение. Процесс может занять несколько часов, и я не хочу, чтобы мой потребитель взял другое сообщение в очереди, если он не закончил обработку текущего сообщения.Потребительские товары/Производитель AWS SQS akka scala с потребителем синхронных товаров

Я думаю, что AKKA и AWS SQS могут удовлетворить мои потребности. Читая документы и примеры, кажется, что акка-верблюд может упростить мою работу.

Я нашел это example на github.

Я больше заинтересован в конфигурации потребителя (Thread.sleep только имитировать мою обработку):

class MySqsConsumer extends Consumer { 

    //The SQS URI is an in-only message exchange (autoAck=true) 
    override def endpointUri = "aws-sqs://sqs-akka-camel? amazonSQSClient=#client" 

    override def receive = { 
    case msg: CamelMessage => { 
     println("Start received %s" format msg.bodyAs[String]) 
     Thread.sleep(4000) 
     println("Stop received %s" format msg.bodyAs[String]) 

    } 

    } 
} 

StackTrace:

... 
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!] 
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!] 
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!] 
Start received Hello World1! 
Stop received Hello World1! 
Start received Hello World2! 
Stop received Hello World2! 
Start received Hello World3! 
... 

Моя проблема в том, что akka- Верблюд читает сообщения из очереди до того, как будет выполнен метод приема.

Как я могу заставить моего пользователя ждать завершения процесса, прежде чем принимать новое сообщение? Если я использую неправильные инструменты/библиотеки, можете ли вы направить меня на новые инструменты?

ответ

1

Не уверен, что если вы используете AKKA потоки, но если вы, отправить источник в Sink.queue

val graph=yourSource.to(Sink.queue) 

Когда материализуется (graph.run) он будет возвращать SinkQueue, вы можете тянуть вручную элементы от него. Он будет использовать механизм противодавления, поэтому вам не нужно беспокоиться о том, что произойдет, если вы не потянете предметы.

Примечание: Я специально игнорировал дженерики! но не забывайте их.

0

Насколько я знаю, вы не можете этого сделать. Верблюд все время потребляет сообщение из очереди.

Вы можете установить override def autoAck = false - он не отправит другое сообщение , пока вы не подтвердите текущий. Но «скрытый» актер верблюда все равно будет поглощать.

Возможно, вы можете предоставить индивидуальный amazonSQSClient и как-то контролировать его.

Смежные вопросы