2016-11-28 3 views
2

Одним из этапов моего вычислительного графика является поток типа Flow[Seq[Request], Seq[Response], NotUsed]. Очевидно, что этот этап должен назначить ответ на каждый запрос и испустить seq после разрешения всех запросов.akka stream http limit limit

Теперь базовый API имеет жесткую политику ограничения скорости, поэтому я могу запускать только один запрос в секунду. Если бы у меня был Flow одного Request, я мог бы zip этот поток с тем, который испускает один элемент в секунду (How to limit an Akka Stream to execute and send down one message only once per second?), но в этом случае я не вижу подобного решения.

Есть ли хороший способ выразить это? Идея, которая приходит мне на ум, заключается в использовании низкоуровневого графика DSL и с потоком в одну секунду-тик в качестве состояния там и с его использованием для обработки последовательностей запросов, но я сомневаюсь, что это получится красиво.

+1

Вы считаете flow.throttle? –

+0

Да, но поскольку у меня есть 'Seq [Request]', мне нужно подождать между каждым запросом в этом 'Seq'. Итак, мне нужен куполообразный вид внутреннего дросселя, а также –

+0

flatMapConcat (seq => Source (seq) .throttle (...) .grouped (seq.size))? –

ответ

1

Вот что я в конечном итоге с помощью:

case class FlowItem[I](i: I, requests: Seq[HttpRequest], responses: Seq[String]) { 
    def withResponse(resp: String) = copy(responses = resp +: responses) 
    def extractNextRequest = (requests.head, copy(requests = requests.tail)) 
    } 


def apiFlow[I, O](requestPer: FiniteDuration, 
        buildRequests: I => Seq[HttpRequest], 
        buildOut: (I, Seq[String]) => O 
        )(implicit system: ActorSystem, materializer: ActorMaterializer) = { 
    GraphDSL.create() { implicit b => 
     import GraphDSL.Implicits._ 

     val in: FlowShape[I, FlowItem[I]] = 
     b.add(Flow[I].map(i => FlowItem(i, buildRequests(i), Seq.empty))) 

     val merge: MergePreferredShape[FlowItem[I]] = 
     b.add(MergePreferred[FlowItem[I]](1)) 

     val throttle: FlowShape[FlowItem[I], FlowItem[I]] = 
     b.add(Flow[FlowItem[I]].throttle(1, requestPer, 1, ThrottleMode.shaping)) 

     val prepareRequest: FlowShape[FlowItem[I], (HttpRequest, FlowItem[I])] = 
     b.add(Flow[FlowItem[I]].map(_.extractNextRequest)) 

     val log = 
     b.add(Flow[(HttpRequest, FlowItem[I])].map { r => Console.println(s"rquest to ${r._1.uri}"); r}) 

     val pool: FlowShape[(HttpRequest, FlowItem[I]), (Try[HttpResponse], FlowItem[I])] = 
     b.add(Http(system).superPool[FlowItem[I]]()) 

     val transformResponse: FlowShape[(Try[HttpResponse], FlowItem[I]), FlowItem[I]] = 
     b.add(Flow[(Try[HttpResponse], FlowItem[I])].mapAsync(1) { 
      case (Success(HttpResponse(StatusCodes.OK, headers, entity, _)), flowItem) => 
      entity.toStrict(1.second).map(resp => flowItem.withResponse(resp.data.utf8String)) 
     }) 

     val split: UniformFanOutShape[FlowItem[I], FlowItem[I]] = 
     b.add(Partition[FlowItem[I]](2, fi => if (fi.requests.isEmpty) 0 else 1)) 


     val out: FlowShape[FlowItem[I], O] = 
     b.add(Flow[FlowItem[I]].map(fi => buildOut(fi.i, fi.responses))) 

     in ~> merge ~> throttle ~> prepareRequest ~> log ~> pool ~> transformResponse ~> split ~> out 
       merge.preferred <~              split 

     FlowShape(in.in, out.out) 
    } 
    } 

Идея заключается в том, чтобы передать элементы бросить дроссельную заслонку столько раз, сколько есть запросы, и хранить дополнительные (пока не выполняется) запросов по с сообщениями. Элемент split проверяет, осталось ли больше запросов.

2

Как сказал Виктор, вы, вероятно, должны использовать дроссель по умолчанию. Но в случае, если вы хотите сделать это самостоятельно, может выглядеть следующим образом

private def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val ticker = Source.tick(rate, rate, Unit) 

    val zip = builder.add(Zip[T, Unit.type]) 
    val map = Flow[(T, Unit.type)].map { case (value, _) => value } 
    val messageExtractor = builder.add(map) 

    ticker ~> zip.in1 
    zip.out ~> messageExtractor.in 

    FlowShape.of(zip.in0, messageExtractor.out) 
}) 

// And it will be used in your flow as follows 
// .via(throttleFlow(FiniteDuration(200, MILLISECONDS))) 

Кроме того, поскольку вы ограничиваете доступ к некоторым API вы можете ограничить звонки на него в централизованно. Предположим, у вас есть несколько мест в вашем проекте, которые вызывают вызовы на тот же внешний API, но поскольку вызовы, поступающие из одного и того же IP-регулирования, должны применяться ко всем из них. Для такого случая подумайте об использовании MergeHub.source для вашего (предположительно) потока akka-http. Каждый вызывающий объект создает и выполняет новый график для совершения вызова.

+0

Проблема, с которой я сталкиваюсь, заключается в том, что это ограничит поток запросов 'Seq [Request]', но предел скорости применяется к каждому 'Request'. То есть даже если ограничение 'Seq [Request]' flow до 1 в секунду, я буду чаще запускать запросы. Я считал сплющивание потока, но его сложно собрать снова, и это не кажется правильным. –