2016-05-10 2 views
8

Я пытаюсь интегрировать поток потоков akka в моем приложении Play 2.5. Идея состоит в том, что вы можете транслировать фотографию, а затем записывать ее на диск в виде необработанного файла, миниатюрной версии и версии с водяными знаками.Как собрать раковину Akka Streams из нескольких файлов?

мне удалось получить эту работу, используя график что-то вроде этого:

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray}) 
            .map(_.result().toArray) 

def toByteArray = Flow[ByteString].map(b => b.toArray) 

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder => 
    import GraphDSL.Implicits._ 
    val streamFan = builder.add(Broadcast[ByteString](3)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 
    val output = builder.add(Flow[ByteString].map(x => Success(Done))) 

    val rawFileSink = FileIO.toFile(file) 
    val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
    val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

    streamFan.out(0) ~> rawFileSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 
    streamFan.out(2) ~> output.in 

    byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink 
    byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink 

    FlowShape(streamFan.in, output.out) 
}) 

graph 

}

Тогда я проволочный его в свой контроллер воспроизведения, используя аккумулятор, как это:

val sink = Sink.head[Try[Done]] 

val photoStorageParser = BodyParser { req => 
    Accumulator(sink).through(graph).map(Right.apply) 
} 

Проблема в том, что мои два обработанных файла не были завершены, и я получаю нулевые размеры для обоих обработанных файлов, но не для исходного. Моя теория заключается в том, что аккумулятор работает только на одном из выходов моего вентилятора, поэтому, когда поток ввода завершается, и мой байтовый счетчик выплескивает полный файл, к тому времени, когда обработка закончена, игра получила материализованное значение от выхода ,

Итак, мои вопросы:
Я нахожусь на правильном пути с этим, насколько мой подход идет? Каково ожидаемое поведение для запуска графика? Как я могу собрать все мои раковины вместе, чтобы сформировать одну последнюю раковину?

+0

Я также считаю, что причина в том, что потоки не сливаются после обработки. Вы пробовали «Sink.combine» (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat

+0

Да, я дал Sink.combine идти, но это унифицирует несколько приемников, чтобы отправить _to_ как фанат. Я думаю, что я ищу поклонника, но, похоже, вы не можете сделать это с раковинами только из источников! – Tompey

+0

Это похоже на аналогичный пример: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Может быть, вам нужно вернуть 'SinkShape' вместо' FlowShape', чтобы объявить, что ваш поток закончен? – devkat

ответ

7

Ok, после небольшой помощи (Andreas был на правильном пути), я пришел к такому решению, которое делает трюк:

val rawFileSink = FileIO.toFile(file) 
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) { 
    implicit builder => (rawSink, thumbSink, waterSink) => { 
    val streamFan = builder.add(Broadcast[ByteString](2)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 

    streamFan.out(0) ~> rawSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink 
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink 

    SinkShape(streamFan.in) 
    } 
}) 

graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done))) 

После чего он умер каждый назвать это из игры:

val photoStorageParser = BodyParser { req => 
    Accumulator(theSink).map(Right.apply) 
} 

def createImage(path: String) = Action(photoStorageParser) { req => 
    Created 
} 
+0

спасибо человеку, у меня была аналогичная задача, и я не мог понять, как ждать всех материализованных фьючерсов. Ваше решение очень помогло, и оно работает! –

+0

Привет! Как насчет переменного количества приемника для комбинации? – Alexander

Смежные вопросы