2016-08-12 2 views
1

У меня есть приложение, которое получает TCP сокета соединения, которое будет посылать данные в форме:обработки Akka потока с заголовком Одноразового

n{json}bbbbbbbbbb...

где n длина следующего json в байты, а json может быть чем-то вроде {'splitEvery': 5}, что будет определять, как я разбиваю и обрабатываю потенциально бесконечную строку байтов.

Я хочу обработать этот поток с помощью Akka в Scala. I думаю,streams - правильный инструмент для этого, но мне трудно найти пример, который использует потоки с различными этапами обработки. Большинство потоков потоков, похоже, повторяются снова и снова, например, prefixAndTail пример here. Это очень близко к тому, как я хочу обработать часть моего потока, но разница в том, что мне нужно только это сделать после за соединение, а затем перейти на другую «стадию» обработки.

Может ли кто-нибудь указать мне пример использования потоков Akka с различными этапами?

ответ

2

Вот GraphStage, который обрабатывает поток ByteString с:

  • Экстракт размер порции из заголовка
  • Эмит ByteString х размеров указанных порций
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} 
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} 
import akka.util.ByteString 

class PreProcessor extends GraphStage[FlowShape[ByteString, ByteString]] { 

    val in: Inlet[ByteString] = Inlet("ParseHeader.in") 
    val out: Outlet[ByteString] = Outlet("ParseHeader.out") 

    override val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 

     var buffer = ByteString.empty 
     var chunkSize: Option[Int] = None 
     private var upstreamFinished = false 

     private val headerPattern = """^\d+\{"splitEvery": (\d+)\}""".r 

     /** 
     * @param data The data to parse. 
     * @return The chunk size and header size if the header 
     * could be parsed. 
     */ 
     def parseHeader(data: ByteString): Option[(Int, Int)] = 
     headerPattern. 
     findFirstMatchIn(data.decodeString("UTF-8")). 
     map { mtch => (mtch.group(1).toInt, mtch.end) } 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      if (isClosed(in)) emit() 
      else pull(in) 
     } 
     }) 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val elem = grab(in) 
      buffer ++= elem 
      if (chunkSize.isEmpty) { 
      parseHeader(buffer) foreach { case (chunk, headerSize) => 
       chunkSize = Some(chunk) 
       buffer = buffer.drop(headerSize) 
      } 
      } 
      emit() 
     } 

     override def onUpstreamFinish(): Unit = { 
      upstreamFinished = true 
      if (chunkSize.isEmpty || buffer.isEmpty) completeStage() 
      else { 
      if (isAvailable(out)) emit() 
      } 
     } 
     }) 

     private def continue(): Unit = 
     if (isClosed(in)) completeStage() 
     else pull(in) 

     private def emit(): Unit = { 
     chunkSize match { 
      case None => continue() 
      case Some(size) => 
      if (upstreamFinished && buffer.isEmpty || 
       !upstreamFinished && buffer.size < size) { 
       continue() 
      } else { 
       val (chunk, nextBuffer) = buffer.splitAt(size) 
       buffer = nextBuffer 
       push(out, chunk) 
      } 
     } 
     } 
    } 
} 

И тестового примера, чтобы проиллюстрировать использование:

import akka.actor.ActorSystem 
import akka.stream._ 
import akka.stream.scaladsl.Source 
import akka.util.ByteString 
import org.scalatest._ 

import scala.concurrent.Await 
import scala.concurrent.duration._ 
import scala.util.Random 

class PreProcessorSpec extends FlatSpec { 

    implicit val system = ActorSystem("Test") 
    implicit val materializer = ActorMaterializer() 

    val random = new Random 

    "" should "" in { 

    def splitRandom(s: String, n: Int): List[String] = s match { 
     case "" => Nil 
     case s => 
     val (head, tail) = s splitAt random.nextInt(n) 
     head :: splitRandom(tail, n) 
    } 

    val input = """17{"splitEvery": 5}aaaaabbbbbcccccddd""" 

    val strings = splitRandom(input, 7) 
    println(strings.map(s => s"[$s]").mkString(" ") + "\n") 

    val future = Source.fromIterator(() => strings.iterator). 
     map(ByteString(_)). 
     via(new PreProcessor()). 
     map(_.decodeString("UTF-8")). 
     runForeach(println) 

    Await.result(future, 5 seconds) 
    } 

} 

Пример вывода:

[17{"] [splitE] [very"] [] [: 5}] [aaaaa] [bbb] [bbcccc] [] [cddd] 

aaaaa 
bbbbb 
ccccc 
ddd 
+0

Мне это нравится, очень чисто и ясно. Мне здесь много, чтобы пожевать ... отличный пример обучения. Благодаря! – djanderson

1

Поскольку размер куска зависит от содержимого потока, но все этапы обработки должны быть реализованы до обработки данных потока, вы не можете легко использовать удобный метод, например Source.group(chunkSize). Я бы предложил разделить метаданные с начала потока (используя другой подход, чем потоки Akka), и передать оставшуюся часть потока в Source.group(chunkSize).

В качестве альтернативы вы можете свернуть/сканировать поток, используя государственную машину, но это гораздо более громоздким:

implicit val system = ActorSystem("Test") 
implicit val materializer = ActorMaterializer() 

val input = """17{"splitEvery": 5}aaaaabbbbbccccc""" 

def getChunkSize(json: String) = 5 // dummy implementation 

sealed trait State 
case class GetLength(number: String) extends State 
case class GetJson(n: Int, json: String) extends State 
case class ProcessData(chunkSize: Int, s: String) extends State 

type Out = (State, Option[String]) 

val future = Source.fromIterator(() => input.iterator). 
    scan[Out]((GetLength(""), None)) { 
    case ((GetLength(s), _), e) if e.isDigit => (GetLength(s + e), None) 
    case ((GetLength(s), _), e) => (GetJson(s.toInt - 1, e.toString), None) 
    case ((GetJson(0, json), _), e) => (ProcessData(getChunkSize(json), e.toString), None) 
    case ((GetJson(n, json), _), e) => (GetJson(n - 1, json + e), None) 
    case ((ProcessData(chunkSize, s), _), e) if s.length == chunkSize - 1 => (ProcessData(chunkSize, ""), Some(s + e)) 
    case ((ProcessData(chunkSize, s), _), e) => (ProcessData(chunkSize, s + e), None) 
    }. 
    collect { case (_, Some(s)) => s }. 
    runForeach(println) 

println(Await.result(future, 1 second)) 

// aaaaa 
// bbbbb 
// ccccc 

Для записи, вот такой подход, который не будет работать, так как takeWhile потребляет следующий элемент итератора (когда _.isDigit терпит неудачу), который еще необходимо для последующего разбора JSON стадии:

val it = input.iterator 
def nextSource = Source.fromIterator(() => it) 

implicit class Stringify[+Out, +Mat](val source: Source[Out, Mat]) { 
    def stringify = source.runFold("")(_ + _) 
} 

val future2 = nextSource. 
    takeWhile(_.isDigit). 
    stringify. 
    map(_.toInt). 
    map { l => 
    nextSource. 
     take(l). 
     stringify. 
     map(getChunkSize). 
     map { chunkSize => 
     nextSource. 
      grouped(chunkSize). 
      map(_.mkString). 
      runForeach(println) 
     } 
    } 

println(Await.result(future2, 1 second)) 

// aaaab 
// bbbbc 
// cccc 
+0

Спасибо! Вы предложили разбить первые биты без использования «потоков», а затем передать остальные в «Источник». Я ссылаюсь на часть 'server' этого примера TcpEcho для потоковой передачи TCP (http://www.lightbend.com/activator/template/akka-stream-scala#code/src/main/scala/sample/stream/TcpEcho .scala), и этот пример для TCP с Актерами (http://doc.akka.io/docs/akka/2.4.9-RC2/scala/io-tcp.html # io-scala-tcp), но я все еще не уверен, как, скажем, начать с получения Актера, а затем передать открытое соединение с Источником. – djanderson

+0

Другими словами, без звучания неблагодарного (потому что это уже очень полезно!) Вы дали один пример «громоздкого» метода и один пример метода, который не будет работать. Не могли бы вы добавить пример вашего предложенного метода? – djanderson

+1

Спасибо за дополнительную информацию; когда вы реализуете TCP-сервер, невозможно извлечь и вырезать заголовок без буферизации. Я добавил еще один ответ с реализацией «GraphStage», который, вероятно, является самым «акка-иш». – devkat

Смежные вопросы