2017-02-08 4 views
0

Я стараюсь лучше понять концепцию Akka Streams на следующем примере. Рассмотрим банковский счет. Он имеет историю прошлых транзакций, и появятся новые транзакции. Теперь мы хотим использовать его в качестве источника потока Akka. Но его данные будут использоваться в трех различных сценариях:Передача прошлых и будущих данных с использованием потоков Akka

  1. Потребительское приложение собирает все прошлые транзакции и печатает отчет.
  2. Потребительское приложение - это монитор транзакций, который печатает всю новую транзакцию, начиная с момента запуска приложения.
  3. Потребительское приложение объединяет функции (1) и (2): оно сначала печатает все прошлые транзакции и затем печатает все поступающие транзакции.

Что мы имеем здесь в отношении потоков Акка? Является ли разница в источниках потока, которые питают в противном случае те же потоки и приемники с разными данными? Или источник тот же (все транзакции с одного и того же банковского счета), но нам нужно применять различные операции фильтрации для получения разных результатов?

ответ

2

Источник Akka Источники могут быть объединены как любые другие Iterable, которые существуют в пределах scala.

На основе вашего примера, скажем, у нас есть исторические транзакции, которые сохраняются в базе данных. Мы могли бы использовать что-то вроде slick streaming, чтобы получить эти транзакции из БД:

val historicSource : Source[Transaction, _] = ??? 

Там также будет в реальном времени операции (возможно, поступающие от системы обмена сообщениями):

val realtimeSource : Source[Transaction, _] = ??? 

Эти два источника могут быть объединены:

val combinedSource = historicSource ++ realtimeSource 

Эти комбинированные события затем могут использоваться одной логикой обработки потока; например, вы можете: println любую транзакцию свыше 1000,00 долларов США:

val isLargeTransaction = (_ : Transaction).dollarAmount > 1000.0 

val reportTransaction = (transaction : Transaction) => 
    println s"Large Transaction: $transaction" 

combinedSource.filter(isLargeTransaction) 
       .runWith(Sink foreach reportTransaction) 
+0

Большое спасибо за отличное объяснение. –

+0

@VagifAbilov Добро пожаловать. Счастливый взлом. –

Смежные вопросы