2016-12-08 2 views
3

У меня есть SourceQueue. Когда я предлагаю элемент для этого, я хочу, чтобы он прошел через Stream, и когда он достигнет Sink, результат возвращается к коду, который предложил этот элемент (аналогично Sink.head возвращает элемент вызова RunnableGraph.run()).Akka Stream возвращать объект из раковины

Как это достичь? Простой пример моей проблемы было бы:

val source = Source.queue[String](100, OverflowStrategy.fail) 
val flow = Flow[String].map(element => s"Modified $element") 
val sink = Sink.ReturnTheStringSomehow 
val graph = source.via(flow).to(sink).run() 

val x = graph.offer("foo") 
println(x) // Output should be "Modified foo" 
val y = graph.offer("bar") 
println(y) // Output should be "Modified bar" 
val z = graph.offer("baz") 
println(z) // Output should be "Modified baz" 

Edit: Для примера я дал в этом вопросе Владимир Матвеев предоставившего лучший ответ. Однако следует отметить, что это решение работает только в том случае, если элементы входят в sink в том же порядке, в каком они были предложены для source. Если это не может быть гарантировано, порядок элементов в sink может отличаться, и результат может отличаться от ожидаемого.

ответ

5

Я считаю, что проще использовать уже существующий примитив для вытягивания значений из потока, который называется Sink.queue. Вот пример:

val source = Source.queue[String](128, OverflowStrategy.fail) 
val flow = Flow[String].map(element => s"Modified $element") 
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1)) 

val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run() 

def getNext: String = Await.result(sinkQueue.pull(), 1.second).get 

sourceQueue.offer("foo") 
println(getNext) 

sourceQueue.offer("bar") 
println(getNext) 

sourceQueue.offer("baz") 
println(getNext) 

Он делает именно то, что вы хотите.

Обратите внимание, что установка атрибута inputBuffer для приемника очереди может быть или не быть важна для вашего случая использования - если вы его не установили, буфер будет нулевым и данные не будут проходить через поток пока вы не вызовете метод pull() на раковине.

sinkQueue.pull() дает Future[Option[T]], который будет успешно завершен Some, если приемник получает элемент или сбой, если поток не работает. Если поток завершится нормально, он будет заполнен None. В этом конкретном примере я игнорирую это, используя Option.get, но вы, вероятно, захотите добавить пользовательскую логику для обработки этого случая.

+0

Это классно, и он отлично работает с моим примером. Однако в моем фактическом коде я использую 'Stream' для обработки' HttpRequests'. «Поток» развивает несколько подпотоков и снова сливается. Некоторые из подпотоков будут быстрее, чем другие, и использование 'Sink' как' queue' заставляет меня думать, что я не могу гарантировать, что запросы будут в конечном итоге с правильным ответом. – RemcoW

+0

Ну, в соответствии с вашими примерами и описанием, вы хотите нажать значение в исходную очередь, а затем вывести его из раковины синхронно. До тех пор, пока вы будете следовать шаблону «нажимать на вход - вытащить результат», вы можете быть уверены, что обрабатываете ответы в правильном порядке. Но даже если ваш шаблон доступа отличается (и было бы хорошо, если бы оно было отражено в вопросе), самый простой способ сопоставить запрос с ответом - передать запрос вместе с преобразованным значением в кортеже. –

+0

Вы правы, я не упоминал об этом в своем описании. Ответ - лучший ответ на этот вопрос. – RemcoW

1

Ну, вы знаете, что offer() метод возвращает, если вы посмотрите на его определение :) Что вы можете сделать, это создать Source.queue[(Promise[String], String)], создать вспомогательную функцию, которая выталкивает пару течь через offer, убедитесь, что offer не терпит неудачу, потому что очередь может быть полной, а затем выполнить обещание внутри вашего потока и использовать будущее обещания для завершения события завершения во внешнем коде.

Я делаю это, чтобы снизить скорость до внешнего API, используемого из нескольких мест моего проекта.

Вот как это выглядело в моем проекте, прежде чем типизированный добавлены источники концентраторов Акку

import scala.concurrent.Promise 
import scala.concurrent.Future 
import java.util.concurrent.ConcurrentLinkedDeque 

import akka.stream.scaladsl.{Keep, Sink, Source} 
import akka.stream.{OverflowStrategy, QueueOfferResult} 

import scala.util.Success 

private val queue = Source.queue[(Promise[String], String)](100, OverflowStrategy.backpressure) 
    .toMat(Sink.foreach({ case (p, param) => 
     p.complete(Success(param.reverse)) 
    }))(Keep.left) 
    .run 

private val futureDeque = new ConcurrentLinkedDeque[Future[String]]() 

private def sendQueuedRequest(request: String): Future[String] = { 

    val p = Promise[String] 

    val offerFuture = queue.offer(p -> request) 

    def addToQueue(future: Future[String]): Future[String] = { 
    futureDeque.addLast(future) 
    future.onComplete(_ => futureDeque.remove(future)) 
    future 
    } 

    offerFuture.flatMap { 
    case QueueOfferResult.Enqueued => 
     addToQueue(p.future) 
    }.recoverWith { 
    case ex => 
     val first = futureDeque.pollFirst() 
     if (first != null) 
     addToQueue(first.flatMap(_ => sendQueuedRequest(request))) 
     else 
     sendQueuedRequest(request) 
    } 
} 

Я понимаю, что блокировка синхронизированной очереди может быть узким местом, и может расти до бесконечности, но потому, что вызовы API в моем проекте сделано только из другие потоки akka, которые имеют прочное давление. У меня никогда не было более дюжины предметов в futureDeque. Ваша ситуация может отличаться.

Если вы создадите MergeHub.source[(Promise[String], String)](), вместо этого вы получите многоразовую раковину. Таким образом, каждый раз, когда вам нужно обрабатывать элемент, вы создадите полный график и запустите его. В этом случае вам не потребуется взломанный контейнер java для запросов очереди.

+0

Это великолепно. Я не думал использовать «Обещание». Не могли бы вы рассказать о вспомогательной функции?Я не понимаю, что вы подразумеваете под этим. – RemcoW

+0

@ RemcoW Обновлено. – expert

+0

Sweet, спасибо за это – RemcoW

Смежные вопросы