Вот что я в конечном итоге с помощью:
case class FlowItem[I](i: I, requests: Seq[HttpRequest], responses: Seq[String]) {
def withResponse(resp: String) = copy(responses = resp +: responses)
def extractNextRequest = (requests.head, copy(requests = requests.tail))
}
def apiFlow[I, O](requestPer: FiniteDuration,
buildRequests: I => Seq[HttpRequest],
buildOut: (I, Seq[String]) => O
)(implicit system: ActorSystem, materializer: ActorMaterializer) = {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val in: FlowShape[I, FlowItem[I]] =
b.add(Flow[I].map(i => FlowItem(i, buildRequests(i), Seq.empty)))
val merge: MergePreferredShape[FlowItem[I]] =
b.add(MergePreferred[FlowItem[I]](1))
val throttle: FlowShape[FlowItem[I], FlowItem[I]] =
b.add(Flow[FlowItem[I]].throttle(1, requestPer, 1, ThrottleMode.shaping))
val prepareRequest: FlowShape[FlowItem[I], (HttpRequest, FlowItem[I])] =
b.add(Flow[FlowItem[I]].map(_.extractNextRequest))
val log =
b.add(Flow[(HttpRequest, FlowItem[I])].map { r => Console.println(s"rquest to ${r._1.uri}"); r})
val pool: FlowShape[(HttpRequest, FlowItem[I]), (Try[HttpResponse], FlowItem[I])] =
b.add(Http(system).superPool[FlowItem[I]]())
val transformResponse: FlowShape[(Try[HttpResponse], FlowItem[I]), FlowItem[I]] =
b.add(Flow[(Try[HttpResponse], FlowItem[I])].mapAsync(1) {
case (Success(HttpResponse(StatusCodes.OK, headers, entity, _)), flowItem) =>
entity.toStrict(1.second).map(resp => flowItem.withResponse(resp.data.utf8String))
})
val split: UniformFanOutShape[FlowItem[I], FlowItem[I]] =
b.add(Partition[FlowItem[I]](2, fi => if (fi.requests.isEmpty) 0 else 1))
val out: FlowShape[FlowItem[I], O] =
b.add(Flow[FlowItem[I]].map(fi => buildOut(fi.i, fi.responses)))
in ~> merge ~> throttle ~> prepareRequest ~> log ~> pool ~> transformResponse ~> split ~> out
merge.preferred <~ split
FlowShape(in.in, out.out)
}
}
Идея заключается в том, чтобы передать элементы бросить дроссельную заслонку столько раз, сколько есть запросы, и хранить дополнительные (пока не выполняется) запросов по с сообщениями. Элемент split
проверяет, осталось ли больше запросов.
Вы считаете flow.throttle? –
Да, но поскольку у меня есть 'Seq [Request]', мне нужно подождать между каждым запросом в этом 'Seq'. Итак, мне нужен куполообразный вид внутреннего дросселя, а также –
flatMapConcat (seq => Source (seq) .throttle (...) .grouped (seq.size))? –