2015-06-05 2 views
2

Есть ли какой-либо способ в Akka добиться пакетного кадра, как в Erlang с {пакетом, 4}? Packet выглядит примерно так:Scala - Пакет TCP-пакетов с использованием Akka

4 bytes length in big endian | body... 

Например:

00 00 00 05 H E L L O 0 0 0 5 W O R L D 

было бы два пакета "HELLO" и "Мир", но они получили в качестве одного.

Or

00 00 00 05 H E L L 

Теперь Akka получает эти 8 байт, но еще один до сих пор отсутствует, и он будет принят в следующем вызове «получить»

Проблема заключается в том, что мой актер Получим всегда называется с частичным или полным запросом, но я бы хотел получить только часть «тела» при приеме и только когда он будет полностью получен.

Таким образом, все, что было бы необходимо, это то, что он сначала считывает эти 4 байта, а затем ждет, пока он не прочитает другие N байтов (N = длина в заголовке 4 байта), и ТОГДА он отправит сообщение моему актеру. Возможно ли это как-то?

Мой код сервера:

import java.net.InetSocketAddress 

import akka.actor.{Props, Actor} 
import akka.io.Tcp.Bind 
import akka.io.{IO, Tcp} 

class Server extends Actor{ 
    import context.system 
    import Tcp._ 
    IO(Tcp) ! Bind(self, new InetSocketAddress("0.0.0.0", 1234)) 

    def receive ={ 
     case bound @ Bound(localAddr) => 
      println("Server is bound to "+localAddr.toString()) 
     case failed @ CommandFailed(_ : Bind) => 
      context stop self 
     case connected @ Connected(remote, local) => 
      val handler = context.actorOf(Props[ClientHandler]) 
      val connection = sender() 
      println(remote.toString + "connected to "+local.toString()) 

      connection ! Register(handler) 
    } 
} 

ответ

3

Насколько мне известно, нет функции библиотеки для этого в Акку или Scala. Akka торгует ByteString для чтения и письма, поэтому я собрал черту, которая будет делать именно то, что вы запрашиваете. Вы передаете ему ByteString как отправленный вашему актеру. Затем он разбивает поток вверх по длинам пакетов в заголовках. Он не имеет статуса, поэтому возвращает кортеж, содержащий список извлеченных пакетов и любых неиспользуемых данных из потока TCP в виде ByteString. Вы объединяете новые данные TCP в неиспользуемую часть потока, возвращаемого в этой байтовой строке, как показано в примере ниже.

trait Buffering { 

    val MAX_PACKET_LEN: Short = 10000 

    /** 
    * Extracts complete packets of the specified length, preserving remainder 
    * data. If there is no complete packet, then we return an empty list. If 
    * there are multiple packets available, all packets are extracted, Any remaining data 
    * is returned to the caller for later submission 
    * @param data A list of the packets extracted from the raw data in order of receipt 
    * @return A list of ByteStrings containing extracted packets as well as any remaining buffer data not consumed 
    */ 
    def getPacket(data: ByteString): (List[ByteString], ByteString) = { 

    val headerSize = 2 

    @tailrec 
    def multiPacket(packets: List[ByteString], current: ByteString): (List[ByteString], ByteString) = { 
     if (current.length < headerSize) { 
     (packets.reverse, current) 
     } else { 
     val len = current.iterator.getShort 
     if (len > MAX_PACKET_LEN || len < 0) throw new RuntimeException(s"Invalid packet length: $len") 
     if (current.length < len + headerSize) { 
      (packets.reverse, current) 
     } else { 
      val rem = current drop headerSize // Pop off header 
      val (front, back) = rem.splitAt(len) // Front contains a completed packet, back contains the remaining data 
      // Pull of the packet and recurse to see if there is another packet available 
      multiPacket(front :: packets, back) 
     } 
     } 
    } 
    multiPacket(List[ByteString](), data) 
    } 

Использование от актера как folllows:

def receive = buffer(CompactByteString()) 

def buffer(buf: ByteString): Receive = { 
    // Messages inbound from the network 
    case Received(data) => 
    val (pkt, remainder) = getPacket(buf ++ data) 
    // Do something with your packet 
    context become buffer(remainder) 
    case Other Stuff => // Etc 
} 
Смежные вопросы