2016-06-18 2 views
1

Я хочу передать сообщения на автобус через REST и вернуть его. Но я не могу правильно настроить приемник шины сообщений, он выбрасывает java.lang.IllegalStateException: Ответ уже написан. В реальной жизни шина сообщений должна получать сообщения из разных источников и передавать сообщение другой цели. Поэтому нам просто нужно опубликовать сообщение на автобусе. Но как правильно читать сообщения и обрабатывать их все? Например, из интерфейса REST: прочитайте эти сообщения! Мой простой запуск приложения:Vert.x как передавать/получать сообщения от REST до шины сообщений?

public static void main(String[] args) { 
     Vertx vertx = Vertx.vertx(); 
     vertx.deployVerticle(new RESTVerticle()); 
     vertx.deployVerticle(new Receiver()); 
     EventBus eventBus = vertx.eventBus(); 
     eventBus.registerDefaultCodec(MessageDTO.class, new CustomMessageCodec()); 

    } 

Оставшуюся часть

public class RESTVerticle extends AbstractVerticle { 

    private EventBus eventBus = null; 

    @Override 
    public void start() throws Exception { 
     Router router = Router.router(vertx); 
     eventBus = vertx.eventBus(); 
     router.route().handler(BodyHandler.create()); 
     router.route().handler(CorsHandler.create("*") 
       .allowedMethod(HttpMethod.GET) 
       .allowedHeader("Content-Type")); 

     router.post("/api/message").handler(this::publishToEventBus); 
     // router.get("/api/messagelist").handler(this::getMessagesFromBus); 

     router.route("/*").handler(StaticHandler.create()); 
     vertx.createHttpServer().requestHandler(router::accept).listen(9999); 
     System.out.println("Service running at 0.0.0.0:9999"); 

    } 

private void publishToEventBus(RoutingContext routingContext) { 
     System.out.println("routingContext.getBodyAsString() " + routingContext.getBodyAsString()); 
     final MessageDTO message = Json.decodeValue(routingContext.getBodyAsString(), 
       MessageDTO.class); 

     HttpServerResponse response = routingContext.response(); 
     response.setStatusCode(201) 
       .putHeader("content-type", "application/json; charset=utf-8") 
       .end(Json.encodePrettily(message)); 

     eventBus.publish("messagesBus", message); 

    } 

И приемник: Я переместить его в другой класс, но это не помогает

public class Receiver extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     EventBus eventBus = vertx.eventBus(); 
     Router router = Router.router(vertx); 

     router.route().handler(BodyHandler.create()); 
     router.route().handler(CorsHandler.create("*") 
       .allowedMethod(HttpMethod.GET) 
       .allowedHeader("Content-Type")); 

     router.get("/api/messagelist").handler(this::getMessagesFromBus); 
     router.route("/*").handler(StaticHandler.create()); 

     vertx.createHttpServer().requestHandler(router::accept).listen(9998); 
     System.out.println("Service Receiver running at 0.0.0.0:9998"); 

private void getMessagesFromBus(RoutingContext routingContext) { 
     EventBus eventBus = vertx.eventBus(); 
     eventBus.consumer("messagesBus", message -> { 
      MessageDTO customMessage = (MessageDTO) message.body(); 
      HttpServerResponse response = routingContext.response(); 
      System.out.println("Receiver ->>>>>>>> " + customMessage); 
      if (customMessage != null) { 
       response.putHeader("content-type", "application/json; charset=utf-8") 
         .end(Json.encodePrettily(customMessage)); 
      } 
      response.closed(); 

     }); 
    } 

Так что, если я отправляю сообщение REST и обработчик опубликовать его на шине, когда я нахожусь во время выполнения http://localhost:9998/api/messagelist, это возврат json, но второй раз это исключение trow

java.lang.IllegalStateException: Response has already been written 
    at io.vertx.core.http.impl.HttpServerResponseImpl.checkWritten(HttpServerResponseImpl.java:561) 
    at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:154) 
    at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:52) 
    at com.project.backend.Receiver.lambda$getMessagesFromBus$0(Receiver.java:55) 
    at io.vertx.core.eventbus.impl.HandlerRegistration.handleMessage(HandlerRegistration.java:207) 
    at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:201) 
    at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$127(EventBusImpl.java:498) 
    at io.vertx.core.impl.ContextImpl.lambda$wrapTask$18(ContextImpl.java:335) 
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112) 
    at java.lang.Thread.run(Thread.java:745) 

Receiver ->>>>>>>> Message{username=Aaaewfewf2d, message=41414wefwef2d2} 

Как правильно получить все сообщения с ресивера? Или, если автобус получил сообщения, я должен немедленно сохранить их в db? Может ли шина сообщений сохранять сообщения и не потерять их?

Благодаря

ответ

2

Каждое попадание в точку входа «/ api/messagelist» создает одного нового потребителя с контекстом маршрутизации запроса.

Первый запрос создаст потребителя и ответит на запрос. Когда второе сообщение было опубликовано, этот потребитель получит сообщение и ответит на предыдущий запрос (экземпляр), и это было закрыто.

Я думаю, что вы неправильно поняли назначение автобуса, и я действительно рекомендую вам ознакомиться с документацией. http://vertx.io/docs/vertx-core/java/#event_bus

1

Я не имел возможности проверить свой код, но он, кажется, что операция публиковать бросает исключение и VertX будет пытаться отправить обратно сообщение об ошибке. Однако вы уже ответили и закончили соединение.

Теперь ошибка может быть из вашего кодека, но из-за асинхронного характера vertx вы видите его только на более позднем этапе и искажаете внутренний обработчик ошибок.

Смежные вопросы