Я пытаюсь интегрировать поток потоков akka в моем приложении Play 2.5. Идея состоит в том, что вы можете транслировать фотографию, а затем записывать ее на диск в виде необработанного файла, миниатюрной версии и версии с водяными знаками.Как собрать раковину Akka Streams из нескольких файлов?
мне удалось получить эту работу, используя график что-то вроде этого:
val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
.map(_.result().toArray)
def toByteArray = Flow[ByteString].map(b => b.toArray)
val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
import GraphDSL.Implicits._
val streamFan = builder.add(Broadcast[ByteString](3))
val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
val output = builder.add(Flow[ByteString].map(x => Success(Done)))
val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
streamFan.out(0) ~> rawFileSink
streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
streamFan.out(2) ~> output.in
byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink
FlowShape(streamFan.in, output.out)
})
graph
}
Тогда я проволочный его в свой контроллер воспроизведения, используя аккумулятор, как это:
val sink = Sink.head[Try[Done]]
val photoStorageParser = BodyParser { req =>
Accumulator(sink).through(graph).map(Right.apply)
}
Проблема в том, что мои два обработанных файла не были завершены, и я получаю нулевые размеры для обоих обработанных файлов, но не для исходного. Моя теория заключается в том, что аккумулятор работает только на одном из выходов моего вентилятора, поэтому, когда поток ввода завершается, и мой байтовый счетчик выплескивает полный файл, к тому времени, когда обработка закончена, игра получила материализованное значение от выхода ,
Итак, мои вопросы:
Я нахожусь на правильном пути с этим, насколько мой подход идет? Каково ожидаемое поведение для запуска графика? Как я могу собрать все мои раковины вместе, чтобы сформировать одну последнюю раковину?
Я также считаю, что причина в том, что потоки не сливаются после обработки. Вы пробовали «Sink.combine» (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat
Да, я дал Sink.combine идти, но это унифицирует несколько приемников, чтобы отправить _to_ как фанат. Я думаю, что я ищу поклонника, но, похоже, вы не можете сделать это с раковинами только из источников! – Tompey
Это похоже на аналогичный пример: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Может быть, вам нужно вернуть 'SinkShape' вместо' FlowShape', чтобы объявить, что ваш поток закончен? – devkat