2016-10-24 3 views
2

У меня есть немного путаницы с веером стратегий в Akka streams, я прочитал, что Broadcast - (1 вход, N выходов), данные входной элемент испускает к каждому выходу, а Balance - (1 вход, N выходов), если входной элемент испускает один из его выходных портов.разница между весами и Broadcast веером в Akka Streams

Можете ли вы объяснить мне:

  1. Как баланс работает с несколькими потребителями?
  2. Значение фразы "излучает на один из ее выходных портов"
  3. Порт аналогичен нисходящему потоку?
  4. Подходит ли «Баланс» для репликации входного потока в несколько разделов вывода.
  5. Что означает «баланс позволяет графам разделить друг от друга, а несколько экземпляров последующих абонентов, реплицированных для обработки тома» означает?
+1

Broadcast просто берет сообщение и отправляет его на каждый порт вывода, который у него есть. Баланс принимает сообщение и отправляет его только на один из выходов в зависимости от доступности и противодавления этого выходного порта. –

+0

@ Брайан Пендлтон, но что, если у меня одновременно есть два нисходящих потребителя (Z <~ D, F <~ D), где D - builder.add (Баланс [T] (2))? – pacman

+0

Либо 'Z', либо' F' получит сообщение. Не они оба. Если вы хотите, чтобы оба из них обрабатывали каждое сообщение, используйте «Broadcast». 'Balance' используется, когда' Z' и 'F' являются, вероятно, идентичными шагами обработки, но вы просто хотите разделить работу на нескольких участников. Посмотрите здесь: http://doc.akka.io/docs/akka/2.4.11/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers –

ответ

3

Из документации ... радиовещание излучает (передает) элемент каждому потребителю. баланс исходит только от первого доступного потребителя.

broadcast

Emit каждый входящий элемент каждого из п выходов.

balance

Фан-поток из нескольких потоков. Каждый элемент выше по потоку - , испускаемый первому доступному нижерасположенному потребителю.

EDIT от комментариев:

С вашей сущности, вы должны сделать две averageCarrierDelay функции, по одному для каждого Z и F. Затем вы можете увидеть все элементы, отправленные каждому.

val averageCarrierDelayZ = 
    Flow[FlightDelayRecord] 
     .groupBy(30, _.uniqueCarrier) 
     .fold(("", 0, 0)){ 
      (x: (String, Int, Int), y:FlightDelayRecord) => { 
      println(s"Z Received Element: ${y}") 
      val count = x._2 + 1 
      val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0) 
      (y.uniqueCarrier, count, totalMins) 
      } 
     }.mergeSubstreams 


val averageCarrierDelayF = 
    Flow[FlightDelayRecord] 
     .groupBy(30, _.uniqueCarrier) 
     .fold(("", 0, 0)){ 
      (x: (String, Int, Int), y:FlightDelayRecord) => { 
      println(s"F Received Element: ${y}") 
      val count = x._2 + 1 
      val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0) 
      (y.uniqueCarrier, count, totalMins) 
      } 
     }.mergeSubstreams 

Edit 2: Для того, чтобы проверить вещи в будущем я бы рекомендовал универсальный регистратор для этапов потока, так что вы можете увидеть, что происходит.

def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a } 

Это позволит вам сделать что-то вроде:

D ~> logElement[FlightDelayRecord]("F received: ") ~> F 
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z 

Таким образом, вы можете проверить области вашего графика для странного поведения, которые могут или не могут быть ожидающей.

+0

У меня есть такой граф (Z <~ D, F <~ D), где D является builder.add (Balance [T] (2)), в этой ситуации D посылает значения как Z, так и F. Согласно вашему ответу, D должен отправлять только одному из них, или мое мышление ошибочно? – pacman

+0

Не могли бы вы разместить весь файл кода? Если то, что вы говорите, верно, тогда у Акки есть ошибка. Баланс должен отправлять только первому доступному потребителю. –

+0

Это https://gist.github.com/anonymous/4515afdd9c40709b23a19e55beb76b3e весь мой файл кода, код был изменен из учебника. В этом коде val D отправьте «Баланс» на Z и F, и оба из них получат данные и результат печати для консоли с diff-формированием. Возможно, мое понимание ситуации неверно, в этом случае, пожалуйста, уточните мне мою ошибку. – pacman

1

Как и другие уже говорил, широковещательный испускает свой вклад в всех выходных портов, в то время как баланс испускает свой входной сигнал на один выходного порта на основе противодавления.

Когда вы используете GraphStage, вам нужно выбрать, какой порт вывода вы хотите использовать.Рассмотрим этот пример:

val q1 = Source.queue[Int](10, OverflowStrategy.fail) 
val q2 = Source.queue[Int](10, OverflowStrategy.fail) 
GraphDSL.create(q1, q2)(Keep.both) { implicit b => (input1, input2) => 
    import GraphDSL.Implicits._ 

    val broadcast = b.add(Broadcast[Int](2)) 
    val balance = b.add(Balance[Int](2)) 

    val consumer1, consumer2, consumer3, consumer4 = b.add(Sink.foreach[Int](println)) 

    input1 ~> broadcast.in 
    input2 ~> balance.in 

    broadcast.out(0) ~> consumer1 
    broadcast.out(1) ~> consumer2 

    balance.out(0) ~> consumer3 
    balance.out(1) ~> consumer4 

    ClosedShape 
} 

Здесь мы подключаем один вход к этапу вещания и один к этапу баланса. Затем мы подключаем разные выходные порты вещательной и балансовой ступеней к соответствующим потребителям.

В этом конкретном случае, когда вы запускаете поток, элементы, проходящие через первый вход, будут переданы как consumer1, так и этап consumer2, поскольку этап трансляции копирует свой вход на все его выходы (и здесь два выхода), и элементы, проходящие через второй вход, будут равномерно распределены по consumer3 и consumer4 на основе скорости вашего терминала (т.е. скорость println), так как Sink.foreach противодавления, когда его функция выполняется в течение длительного времени.

Обратите внимание на то, что мы указали, что этапы вещания и баланса имеют по 2 порта каждый (при вызове их заводских методов), и что мы указали, какой выходной порт мы подключаем к какому потребителю.

Смежные вопросы