2016-04-03 3 views
2

У меня есть кусок кода (см. Ниже), который порождает сервер, который перекликается с каждым потоком ByteString, который он получает от порта 6001. В этом примере также определяется клиент, который подключается к серверу и отправляет поток ByteString, содержащий список символов от буквы 'a' до 'z'.Akka поток объектов по http:

Мой вопрос на данный момент, . Akka предлагает способ отправки и получения потока объектов вместо ByStreams через http? Например, объекты класса Client.

Если да, то как я мог отправлять и получать такой поток объектов? Не могли бы вы предоставить мне фрагмент, который показывает, как его выполнять?

Akka документация не удобно для примера, не игрушка ...

Спасибо за вашу помощь

общественного класса TcpEcho {

/** 
* Use without parameters to start both client and server. 
* 
* Use parameters `server 0.0.0.0 6001` to start server listening on port 
* 6001. 
* 
* Use parameters `client 127.0.0.1 6001` to start client connecting to 
* server on 127.0.0.1:6001. 
* 
*/ 
public static void main(String[] args) throws IOException { 
    if (args.length == 0) { 
     ActorSystem system = ActorSystem.create("ClientAndServer"); 
     InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000); 
     server(system, serverAddress); 
     client(system, serverAddress); 
    } else { 
     InetSocketAddress serverAddress; 
     if (args.length == 3) { 
      serverAddress = new InetSocketAddress(args[1], Integer.valueOf(args[2])); 
     } else { 
      serverAddress = new InetSocketAddress("127.0.0.1", 6000); 
     } 
     if (args[0].equals("server")) { 
      ActorSystem system = ActorSystem.create("Server"); 
      server(system, serverAddress); 
     } else if (args[0].equals("client")) { 
      ActorSystem system = ActorSystem.create("Client"); 
      client(system, serverAddress); 
     } 
    } 
} 

public static void server(ActorSystem system, InetSocketAddress serverAddress) { 
    final ActorMaterializer materializer = ActorMaterializer.create(system); 

    final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> { 
     System.out.println("Client connected from: " + conn.remoteAddress()); 
     conn.handleWith(Flow.<ByteString> create(), materializer); 
    }); 

    final CompletionStage<ServerBinding> bindingFuture = Tcp.get(system) 
      .bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(materializer); 

    bindingFuture.whenComplete((binding, throwable) -> { 
     System.out.println("Server started, listening on: " + binding.localAddress()); 
    }); 

    bindingFuture.exceptionally(e -> { 
     System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage()); 
     system.terminate(); 
     return null; 
    }); 

} 

public static void client(ActorSystem system, InetSocketAddress serverAddress) { 
    final ActorMaterializer materializer = ActorMaterializer.create(system); 

    final List<ByteString> testInput = new ArrayList<>(); 
    for (char c = 'a'; c <= 'z'; c++) { 
     testInput.add(ByteString.fromString(String.valueOf(c))); 
    } 

    Source<ByteString, NotUsed> responseStream = Source.from(testInput) 
      .via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort())); 

    CompletionStage<ByteString> result = responseStream.runFold(ByteString.empty(), (acc, in) -> acc.concat(in), 
      materializer); 

    result.whenComplete((success, failure) -> { 

     if (failure != null) { 
      System.err.println("Failure: " + failure.getMessage()); 
     } else { 
      System.out.println("Result: " + success.utf8String()); 
     } 
     System.out.println("Shutting down client"); 
     system.terminate(); 

    }); 
} 

}

+0

Вы видели [этот пример] (http://doc.akka.io/docs/akka/2.4.3/java/stream/stream-graphs.html#Bidirectional_Flows) о том, как создать поток Bidi, я думаю он делает больше или меньше, о чем вы просите? – lpiepiora

+0

Я не посмотрел, но буду. В любом случае, есть ли у вас лучшая идея/предложение в виде фрагмента кода? благодаря – broga

ответ

2

akka.stream.{javadsl,scaladsl}.Framing содержит утилиты чтобы помочь вам создавать согласованные сообщения. Например, вы можете отправлять свои сообщения через Framing.simpleFramingProtocolEncoder(maxLength), чтобы автоматически добавлять к ним информацию о длине. С другой стороны, Framing.simpleFramingProtocolDecoder(maxLength) позаботится об декодировании сообщения в соответствии с его вложенной информацией о длине.

Если вы хотите манипулировать обычными объектами, вам просто нужно их сериализовать в ByteString перед отправкой их через кодировщик и десериализовать их из ByteString после получения их представления от декодера.

Смежные вопросы