2015-05-14 3 views
0

У меня есть успокоительный API, который получает массив сообщений JSON, которые будут преобразованы в отдельные сообщения Avro, а затем отправлены в Kafka. Внутри маршрута я вызываю 3 разных актера: 1) один актер выходит и извлекает схему Avro с диска 2), затем прокручивает массив сообщений JSON и сравнивает его с схемой Avro второго актера. Если какое-либо из сообщений не проверяется, тогда мне нужно вернуть ответ вызывающей стороне API и прекратить обработку. 3) Прокрутите массив и перейдите к третьему актеру, который берет объект JSON, преобразует его в сообщение Avro и отправляет в тему Kafka.Цепочка Акка Актеры в Spray Route

Где у меня проблема с закрытием головы, как прекратить обработку на маршруте, если что-то не срабатывает у одного из актеров. Я передаю контекст запроса каждому актеру и называю его полным методом, но он, кажется, не сразу останавливается, следующий актер все равно обрабатывается, даже если это не так. Вот фрагмент кода, что я делаю в маршруте:

post { 
entity(as[JsObject]) { membersObj => 
     requestContext => 
     val membersJson = membersObj.fields("messages").convertTo[JsArray].elements 
     val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2())) 
     val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService())) 
     val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService())) 

     implicit val timeout = Timeout(5 seconds) 

     val future = avroService ? AvroSchema.MemberSchema(requestContext) 
     val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema] 

     for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext) 

     for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext)) 

Я просмотрел много блогов/книг/слайдов вокруг этой темы и не уверен, что лучший подход. Я использую Scala/Akka около 2 месяцев и в основном сам преподаю только те части, которые мне нужны. Таким образом, любое понимание того, что более опытные разработчики Scala/Akka/Spray имеют в этом, очень ценится. Мне показалось, что я должен был обернуть 3 актеров в «хозяина» актера и сделать каждого ребенка этого актера и попытаться приблизиться к нему так.

ответ

0

Поскольку вы используете обработку асинхронной обработки (!), вы не можете контролировать обработку после того, как ваши сообщения были отправлены. Вам нужно будет использовать ask (?), который вернет будущее, с которым вы можете работать.

Но у меня есть идея. Вы могли отправить сообщение от первого актера ко второму. И вместо того, чтобы вернуть результат первому актеру, вы можете отправить сообщение третьему, чтобы продолжить вычисление.

+0

Карлос, спасибо вам за эту идею. Так что я сделал, это создать актера «супервизора», который вызывает маршрут, и этот актер обрабатывает интерфейс с помощью маршрута (управляя сообщениями об успешном запуске и ошибках от других 3-х актеров. Затем, как только Актер А будет успешным, его сообщения Актер B. Если Актер B успешный, он называет Актера C. Если в актерах A, B, C есть ошибка, тогда сообщение об ошибке отправляется обратно наблюдающему актеру. Теперь работает как шарм. –

Смежные вопросы