2015-01-27 5 views
1

Я ищу SinkSource, который предоставляет Sink и Source. Если элемент втекает в этот Sink, он должен быть предоставлен в соответствующем Source. Следующий код показывает, что я имею в виду:Есть ли что-то вроде SinkSource [T]?

object SinkSource { 
    def apply[T] = new { 
    def sink: Sink[T] = ??? 
    def source: Source[T] = ??? 
    } 
} 
val flowgraph = FlowGraph { implicit fgb => 
    import FlowGraphImplicits._ 
    val sinksource = SinkSource[Int] 
    Source(1 to 5) ~> sinksource.sink 
        sinksource.source ~> Sink.foreach(print) 
} 
implicit val actorSystem = ActorSystem(name = "System") 
implicit val flowMaterializer = FlowMaterializer() 
val materializedMap = flowgraph.run() 

Если выполнено это должно напечатать: 12345
Таким образом, делает SinkSource существуют (не видел его в API) или кто-нибудь знает, как это осуществить? Следует отметить, что мне нужен отчетливый доступ к Sink и Source, так что Flow не является решением в этой конкретной форме:

Source(1 to 5) ~> Flow[Int] ~> Sink.foreach(println) 

ответ

1

Как часто идеи приходят в голове, если вопрос уже задавал: Оказался, , Мне не нужны Sink и Source, JunctionInPort и JunctionOutPort.
Так вот оно:

object SinkSource { 
    def apply[T](implicit fgb: FlowGraphBuilder) = new SinkSource[T] 
} 
class SinkSource[T](implicit fgb: FlowGraphBuilder) { 
    import FlowGraphImplicits._ 
    private val merge = Merge[T] 
    private val bcast = Broadcast[T] 
    Source.empty ~> merge 
    merge ~> bcast 
    bcast ~> Sink.ignore 
    def in: JunctionInPort[T] = merge 
    def out: JunctionOutPort[T] = bcast 
} 
val flowgraph = FlowGraph { implicit fgb => 
    import FlowGraphImplicits._ 
    val source = Source(1 to 5) 
    val sink = Sink.foreach(println) 
    val sinkSource = SinkSource[Int] 
    source ~> sinkSource.in 
      sinkSource.out ~> sink 
} 
implicit val actorSystem = ActorSystem(name = "System") 
implicit val flowMaterializer = FlowMaterializer() 
val materializedMap = flowgraph.run() 
Смежные вопросы