2016-09-04 3 views
3

У меня есть случай, когда sink (или промежуточный flow) может фактически производить некоторые побочные эффекты, которые необходимо оттолкнуть (или добавить) в Source. Есть ли способ сделать это с использованием потокового DSL? Я мог бы использовать некоторую блокирующую очередь или ее сортировку для создания source, а затем передавать данные непосредственно в эту очередь, однако это то, что нарушает абстракции потоков. Возможно, есть лучшее решение, о котором я не знаю?Акка поток - подключить раковину к источнику?

+1

Если «слив» производит выход, то это не раковина, а скорее поток. –

+0

@ViktorKlang в порядке, поэтому я могу связать «поток» с его «источником» условно, поэтому при определенных обстоятельствах событие, испущенное этим «потоком», пройдет через корень Графа, поскольку оно испускается графом 'Source'? – jdevelop

+1

Да, вы используете GraphDSL и включаете круговые графы. Имейте в виду, что круговые графы с пониженным давлением требуют немного глубокого мышления, чтобы получить право. –

ответ

2

Как сказал Виктор, вы можете использовать круговые графы.

Например, этап partition позволяет вам выбрать определенные элементы вашего потока.

def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1 

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 
    val in = Source(1 to 10) 
    val out = Sink.foreach[Int](println) 

    val addOne = Flow[Int].map(_ + 1) 

    val partition = builder.add(Partition[Int](2, partitionFunction)) 
    val merge = builder.add(Merge[Int](2)) 

          in ~> merge ~> partition 
    partition.out(0) ~> addOne ~> merge 
    partition.out(1) ~> out 

    ClosedShape 
    }) 

В этом примере источник in подключен к одному входу merge. Затем целые числа проходят через этап partition, который будет разделяться четным и нечетным.

Даже проходят через поток addOne, а затем на второй вход merge (который вернет их обратно на этап partition).

Нечетные непосредственно идут к раковине out.

Это позволяет вернуть некоторые значения обратно в график, но это может легко привести к циклу (поэтому здесь важна стадия addOne, без нее четные числа были бы захвачены на графике).

1

Reactive-kafka сделал что-то вроде этого (в версии 0.8 по крайней мере): он передает сообщения, потребляемые Раковиной обратно в источник (потребитель Кафки).

KafkaCommitterSink - это реализация. На самом деле это не круговой график, он скорее «обновляет» источник независимо от потока, насколько я понимаю.

Смежные вопросы