Из документации ... радиовещание излучает (передает) элемент каждому потребителю. баланс исходит только от первого доступного потребителя.
broadcast
Emit каждый входящий элемент каждого из п выходов.
balance
Фан-поток из нескольких потоков. Каждый элемент выше по потоку - , испускаемый первому доступному нижерасположенному потребителю.
EDIT от комментариев:
С вашей сущности, вы должны сделать две averageCarrierDelay функции, по одному для каждого Z
и F
. Затем вы можете увидеть все элементы, отправленные каждому.
val averageCarrierDelayZ =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"Z Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
val averageCarrierDelayF =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"F Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
Edit 2: Для того, чтобы проверить вещи в будущем я бы рекомендовал универсальный регистратор для этапов потока, так что вы можете увидеть, что происходит.
def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
Это позволит вам сделать что-то вроде:
D ~> logElement[FlightDelayRecord]("F received: ") ~> F
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
Таким образом, вы можете проверить области вашего графика для странного поведения, которые могут или не могут быть ожидающей.
Broadcast просто берет сообщение и отправляет его на каждый порт вывода, который у него есть. Баланс принимает сообщение и отправляет его только на один из выходов в зависимости от доступности и противодавления этого выходного порта. –
@ Брайан Пендлтон, но что, если у меня одновременно есть два нисходящих потребителя (Z <~ D, F <~ D), где D - builder.add (Баланс [T] (2))? – pacman
Либо 'Z', либо' F' получит сообщение. Не они оба. Если вы хотите, чтобы оба из них обрабатывали каждое сообщение, используйте «Broadcast». 'Balance' используется, когда' Z' и 'F' являются, вероятно, идентичными шагами обработки, но вы просто хотите разделить работу на нескольких участников. Посмотрите здесь: http://doc.akka.io/docs/akka/2.4.11/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers –