2016-10-22 2 views
0

Я пытаюсь использовать Vertx для реализации TCP-сервера, принимая входящие соединения, а затем обрабатывая разные сокеты. Поскольку каждый сокет можно обрабатывать независимо, обработчики, принадлежащие разным сокетам, должны выполняться в разных потоках цикла событий одновременно.Как отправлять входящие обработчики NetSocket в разные потоки цикла событий?

Согласно Vert.x document,

Стандартным verticles назначаются в потоке событий цикла, когда они созданы, и способ запуска вызываются с этим циклом событий. Когда вы вызываете любые другие методы, которые берут обработчик в базовом API из цикла событий, Vert.x гарантирует, что эти обработчики при вызове будут выполняться в одном цикле событий.

Я думаю, этот фрагмент кода может напечатать различные имена потоков:

Vertx vertx = Vertx.vertx(); // The number of event loop threads is 2*core. 
vertx.createNetServer().connectHandler(socket -> { 
    vertx.deployVerticle(new AbstractVerticle() { 
     @Override 
     public void start() throws Exception { 
      socket.handler(buffer -> { 
       log.trace(socket.toString() + ": Socket Message"); 
       socket.close(); 
      }); 
     } 
    }); 
}).listen(port); 

Но, к сожалению, все обработчики были расположены в одной и той же теме.

23:59:42.359 [vert.x-eventloop-thread-1] TRACE Server - [email protected]: Socket Message 
23:59:42.364 [vert.x-eventloop-thread-1] TRACE Server - [email protected]: Socket Message 
23:59:42.365 [vert.x-eventloop-thread-1] TRACE Server - [email protected]: Socket Message 
23:59:42.366 [vert.x-eventloop-thread-1] TRACE Server - [email protected]: Socket Message 
23:59:42.367 [vert.x-eventloop-thread-1] TRACE Server - [email protected]: Socket Message 
23:59:42.368 [vert.x-eventloop-thread-1] TRACE Server - [email protected]: Socket Message 
23:59:42.369 [vert.x-eventloop-thread-1] TRACE Server - [email protected]: Socket Message 
23:59:42.370 [vert.x-eventloop-thread-1] TRACE Server - [email protected]: Socket Message 
... more than 100+ lines ... 

Обратный пример аналогичен this echo server written in BOOST.ASIO. Обработчики выполняются в разных потоках цикла событий, если пул потоков используется для выполнения io_service::run().

Итак, мой вопрос заключается в том, как запускать эти обработчики одновременно?

ответ

0

Фактически, вы делаете что-то совершенно иное, чем вы намерены.

Каждый раз, когда вы получаете соединение на сокет, вы запускаете новый актер,

Простейший способ доказать, что:

Vertx vertx = Vertx.vertx(); // The number of event loop threads is 2*core. 
vertx.createHttpServer().requestHandler(request -> { 

    vertx.deployVerticle(new AbstractVerticle() { 

     String uuid = UUID.randomUUID().toString(); // Some random unique number 

     @Override 
     public void start() throws Exception { 
      request.response().end(uuid + " " + Thread.currentThread().getName()); 
     } 

    }); 
}).listen(8888); 



vertx.setPeriodic(1000, r -> { 
    System.out.println(vertx.deploymentIDs().size()); // Print verticles count every second 
}); 

Я использую HTTPServer только потому, что легче проверить в браузере.
Как неправильно, как это может быть, вы будете видеть, что вы должны получить различные темы:

fe931b18-89cc-4c6a-9d6a-8565bb1f1c12 vert.x-eventloop-thread-9 
277330da-4df8-4e91-bd8f-82c0f62156d0 vert.x-eventloop-thread-11 
bbd3207c-80a4-41d8-9be5-b40727badc84 vert.x-eventloop-thread-13 

Теперь, как вы должны это сделать:

// We create 10 workers 
for (int i = 0; i < 10; i++) { 
    vertx.deployVerticle(new AbstractVerticle() { 

     @Override 
     public void start() { 

      vertx.eventBus().consumer("processMessage", (request) -> { 
       // Do something smart 

       // Reply 
       request.reply("I'm on thread " + Thread.currentThread().getName()); 
      }); 
     } 
    }); 
} 

// This is your handler 
vertx.createHttpServer().requestHandler(request -> { 
    // Only one server, that should dispatch events to workers as quickly as possible 
    vertx.eventBus().send("processMessage", null, (response) -> { 
     if (response.succeeded()) { 
      request.response().end("Request :" + response.result().body().toString()); 
     } 
     // Handle errors 
    }); 
}).listen(8888); 

vertx.setPeriodic(1000, r -> { 
    System.out.println(vertx.deploymentIDs().size()); // Notice that number of workers doesn't change 
}); 
+0

Я еще не понял. Разница между вашим кодом и моим - это местоположение для развертывания вертикальных рабочих элементов. Вы развертываете их снаружи, а я их развертываю в обработчике. Означает ли это, что вертикали, развернутые в некотором обработчике, наследуют поток развертывания обработчика? И что это действительно задумано, если да? –

+0

Вы создаете вертицы для каждого запроса. Вы не должны этого делать. Вертикаль не наследует нить. Если вы запустите мой пример, вы увидите, что, хотя я создаю вертикулы так же, как и вы, я получаю разные потоки каждый раз. Причина, по которой вы думаете, что вы в одном потоке, вероятно, связана с вашим клиентом Net. Но это еще одна тема. –

+0

Почему это не должно быть? Какая разница? Я не могу найти его в документах vert.x или, возможно, мне нужно прочитать его реализацию. :) –

0

Это не представляется возможным определить, какое событие loop Vert.x присваивает каждой из ваших вершин без дополнительной информации (например, количество ядер ваших тестовых машин).

В любом случае, не рекомендуется развертывать вертикальную линию для каждого входящего соединения. Вертикаль - это единицы развертывания в Vert.x. Обычно вы создаете один за «функциональность».

Обратно к случаю использования, цель программирования, управляемая событиями, заключается в том, чтобы избежать использования нити для каждого соединения. Вы можете обрабатывать много одновременных соединений с одним циклом события. Если на вашем компьютере имеется несколько ядер, вы можете развернуть несколько экземпляров вашей вертицы, чтобы использовать их все (1 цикл событий на ядро).

int processors = Runtime.getRuntime().availableProcessors(); 
Vertx vertx = Vertx.vertx(); 
vertx.deployVerticle(TCPServerVerticle.class.getName(), new DeploymentOptions().setInstances(processors)); 

public class TCPServerVerticle extends AbstractVerticle { 

    @Override 
    public void start(Future<Void> startFuture) throws Exception { 
    vertx.createNetServer().connectHandler(socket -> { 
     socket.handler(buffer -> { 
     log.trace(socket.toString() + ": Socket Message"); 
     socket.close(); 
     }); 
    }).listen(port, ar -> { 
     if (ar.succeeded()) { 
     startFuture.complete(); 
     } else { 
     startFuture.fail(ar.cause()); 
     } 
    }); 
    } 
} 

С Vertx TCP server sharing в Connect обработчики будет вызываться на циклическом режиме.

Смежные вопросы