Поскольку размер куска зависит от содержимого потока, но все этапы обработки должны быть реализованы до обработки данных потока, вы не можете легко использовать удобный метод, например Source.group(chunkSize)
. Я бы предложил разделить метаданные с начала потока (используя другой подход, чем потоки Akka), и передать оставшуюся часть потока в Source.group(chunkSize)
.
В качестве альтернативы вы можете свернуть/сканировать поток, используя государственную машину, но это гораздо более громоздким:
implicit val system = ActorSystem("Test")
implicit val materializer = ActorMaterializer()
val input = """17{"splitEvery": 5}aaaaabbbbbccccc"""
def getChunkSize(json: String) = 5 // dummy implementation
sealed trait State
case class GetLength(number: String) extends State
case class GetJson(n: Int, json: String) extends State
case class ProcessData(chunkSize: Int, s: String) extends State
type Out = (State, Option[String])
val future = Source.fromIterator(() => input.iterator).
scan[Out]((GetLength(""), None)) {
case ((GetLength(s), _), e) if e.isDigit => (GetLength(s + e), None)
case ((GetLength(s), _), e) => (GetJson(s.toInt - 1, e.toString), None)
case ((GetJson(0, json), _), e) => (ProcessData(getChunkSize(json), e.toString), None)
case ((GetJson(n, json), _), e) => (GetJson(n - 1, json + e), None)
case ((ProcessData(chunkSize, s), _), e) if s.length == chunkSize - 1 => (ProcessData(chunkSize, ""), Some(s + e))
case ((ProcessData(chunkSize, s), _), e) => (ProcessData(chunkSize, s + e), None)
}.
collect { case (_, Some(s)) => s }.
runForeach(println)
println(Await.result(future, 1 second))
// aaaaa
// bbbbb
// ccccc
Для записи, вот такой подход, который не будет работать, так как takeWhile
потребляет следующий элемент итератора (когда _.isDigit
терпит неудачу), который еще необходимо для последующего разбора JSON стадии:
val it = input.iterator
def nextSource = Source.fromIterator(() => it)
implicit class Stringify[+Out, +Mat](val source: Source[Out, Mat]) {
def stringify = source.runFold("")(_ + _)
}
val future2 = nextSource.
takeWhile(_.isDigit).
stringify.
map(_.toInt).
map { l =>
nextSource.
take(l).
stringify.
map(getChunkSize).
map { chunkSize =>
nextSource.
grouped(chunkSize).
map(_.mkString).
runForeach(println)
}
}
println(Await.result(future2, 1 second))
// aaaab
// bbbbc
// cccc
Мне это нравится, очень чисто и ясно. Мне здесь много, чтобы пожевать ... отличный пример обучения. Благодаря! – djanderson