2

Я пытаюсь использовать ElasticSearch REST API с Java Apache HttpAsyncClient. Я хочу использовать постоянное конвейерное соединение. Вот некоторый тестовый код (выход в комментариях):ElasticSearch и Apache HttpAsyncClient

@Test 
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException 
{ 
    testPost(HttpAsyncClients.createDefault()); 
    //201: {"_index":"test_index","_type":"test_type","_id":"AVIHYGnqdqqg_TAHm4ix","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true} 
    testPost(HttpAsyncClients.createPipelining()); 
    //400: No handler found for uri [http://127.0.0.1:9200/test_index/test_type] and method [POST] 
} 

private void testPost(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException 
{ 
    client.start(); 
    HttpPost request = new HttpPost("http://127.0.0.1:9200/test_index/test_type"); 
    request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}")); 
    Future<HttpResponse> responseFuture = client.execute(request, null); 
    HttpResponse response = responseFuture.get(); 
    System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity())); 
} 

Я не могу понять, почему он прекрасно работает с HttpAsyncClients.createDefault() клиентом, но не работает с HttpAsyncClients.createPipelining(). Также я не могу понять разницу между этими двумя методами создания.

Почему я получаю сообщение об ошибке при использовании createPipelining()?

Я попытался увидеть разницу с https://httpbin.org/post, но он показал мне тот же результат с обоими параметрами. Я использую настройки ElasticSearch по умолчанию.

Спасибо!


UPD1

Я попытался с PUT документа (PUT http://127.0.0.1/test_index/test_type/<doc id>) запроса с тем же результатом - он прекрасно работает с createDefault(), но я получил подобную ошибку, когда делают это с createPipelining() - обработчик не был найден <. ..>.

Но когда я пытаюсь выполнить запрос на создание индекса (PUT http://127.0.0.1/<index name>), есть еще одна ошибка. Смотрите код ниже:

@Test 
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException 
{ 
    testCreateIndex(HttpAsyncClients.createDefault()); 
    //200: {"acknowledged":true} 
    testCreateIndex(HttpAsyncClients.createPipelining()); 
    //400: {"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"}],"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"},"status":400} 
} 

private void testCreateIndex(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException 
{ 
    client.start(); 
    HttpPut request = new HttpPut("http://127.0.0.1:9200/" + RandomStringUtils.randomAlphabetic(8).toLowerCase()); 
    Future<HttpResponse> responseFuture = client.execute(request, null); 
    HttpResponse response = responseFuture.get(); 
    System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity())); 
} 

Как я могу видеть на this documentation page ElasticSearch поддерживает HTTP конвейерный по умолчанию. Может быть, мне нужно изменить настройки ES?


UPD2

Вот некоторые провода журналы для кода в UPD1 раздел с различными параметрами записи:

Dorg.apache.commons.logging.simplelog.log.org.apache.http=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=INFO 

http://pastebin.com/v29uvgbj

-Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.conn=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=DEBUG 

http://pastebin.com/G9ij15d6


UPD3

Я просто попытался заменить createDefault() с createMinimal(), и это вызвало ту же ошибку, что createPipelining(). Любые идеи, что в MinimalHttpAsyncClient могут вызвать эту проблему? Может быть, есть способ, которым я могу вручную создать конвейерный клиент (с классами строителей) без этой проблемы?

+0

Какую версию ES вы используете? – Val

+0

последний на сегодня, 2.1.1 – coolguy

+0

Просьба отправить проводные журналы обеих сессий – oleg

ответ

2

Сервер должен быть удушье по абсолютной URI запроса в строке запроса

[DEBUG] wire - http-outgoing-1 >> "PUT http://127.0.0.1:9200/ydiwdsid HTTP/1.1[\r][\n]" 

HttpAsyncClient в режиме конвейерного использует минимальную цепочку обработки протокола. Он не пытается переписать URI запроса объекта запроса.

Для вашего конкретного случая запрос конвейерной обработки, похоже, не имеет большого смысла. Не говоря уже о том, что, если вы не отправляете запросы в партии, вы даже не используете конвейерное исполнение.

+0

Как проверить, действительно ли это абсолютный URI запроса? О конвейерном выполнении: с клиентом 'createPipelining()' существует только одно соединение, которое используется каждый раз, когда я выполняю запрос (одному и тому же хосту) через этого клиента, правильно? И о партиях - не могли бы вы объяснить, как я могу попробовать? Моя настоящая цель - обрабатывать новые запросы снова и снова, не дожидаясь ответов (ну, я хочу их иногда получать, но снова - я не хочу, чтобы ответ ожидал блокировать новые запросы). Какой клиент мне следует использовать для этого? – coolguy

+0

curl - ваш друг – oleg

+0

Я понял, что когда я выполняю свой код с помощью 'MinimalHttpAsyncClient', он каким-то образом создает индекс, называемый' http: 'в ElasticSearch. То же самое, когда я пытаюсь выполнить следующую команду: 'nc 127.0.0.1 9200 coolguy

2

На самом деле вам просто нужно извлечь хост из URL-адреса и создать объект HttpPost только с абсолютным путем. Смотрите изменения на второй, третьей и пятой строках ниже:

client.start(); 
HttpHost targetHost = new HttpHost("127.0.0.1", 9200); 
HttpPost request = new HttpPost("/test_index/test_type"); 
request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}")); 
Future<HttpResponse> responseFuture = client.execute(targetHost, request, null); 
HttpResponse response = responseFuture.get(); 
System.out.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity())); 

Выполнение этих трех изменений и выполнения кода снова даст это:

201: {"_index":"test_index","_type":"test_type","_id":"AVISSimIZHOoPG8ibOyF","_version":1,"created":true} 
201: {"_index":"test_index","_type":"test_type","_id":"AVISSimjZHOoPG8ibOyG","_version":1,"created":true} 
+0

!!! I luv u: D Я дам этот ответ еще одной наградой, если это возможно – coolguy

+0

Рад помочь, что бы ни работало для вас. – Val

+0

кажется, что нельзя установить вторую награду в Stack Overflow :(Но в любом случае спасибо вам большое! – coolguy

Смежные вопросы