2012-03-16 3 views
8

Я новичок в netty и все еще изо всех сил пытаюсь найти свой путь. Я ищу создать клиент http, который работает асинхронно. Нетто-примеры http только показывают, как ждать операций ввода-вывода, а не как использовать addListener, и поэтому я пытался понять это в течение последних нескольких дней.Асинхронный HTTP-клиент с Netty

Я пытаюсь создать класс запроса, который будет обрабатывать все различные состояния запроса, от соединения, отправки данных, обработки ответа и последующего закрытия соединения. Для этого мой класс расширяет SimpleChannelUpstreamHandler и реализует ChannelFutureListener. Я использую ChannelPipelineFactory, который добавляет (этот) экземпляр класса (как SimpleChannelUpstreamHandler) к конвейеру как обработчик.

Соединение создается следующим образом:

this.state = State.Connecting; 
this.clientBootstrap.connect(this.address).addListener(this); 

Тогда operationComplete метод:

@Override 
public void operationComplete(ChannelFuture future) throws Exception { 
    State oldState = this.state; 

    if (!future.isSuccess()) { 
     this.status = Status.Failed; 
     future.getChannel().disconnect().addListener(this); 
    } 
    else if (future.isCancelled()) { 
     this.status = Status.Canceled; 
     future.getChannel().disconnect().addListener(this); 
    } 
    else switch (this.state) { 
     case Connecting: 
      this.state = State.Sending; 
      Channel channel = future.getChannel(); 
      channel.write(this.createRequest()).addListener(this); 
      break; 

     case Sending: 
      this.state = State.Disconnecting; 
      future.getChannel().disconnect().addListener(this); 
      break; 

     case Disconnecting: 
      this.state = State.Closing; 
      future.getChannel().close().addListener(this); 
      break; 

     case Closing: 
      this.state = State.Finished; 
      break; 
    } 
    System.out.println("request operationComplete start state: " + oldState + ", end state: " + this.state + ", status: " + this.status); 
} 

private HttpRequest createRequest() { 
    String url = this.url.toString(); 

    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); 
    request.setHeader(HttpHeaders.Names.HOST, this.url.getHost()); 
    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); 
    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); 

    return request; 
} 

Класс также имеет приоритет над messageReceived метод:

@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
    System.out.println("messageReceived"); 
    HttpResponse response = (HttpResponse) e.getMessage(); 

    ChannelBuffer content = response.getContent(); 
    if (content.readable()) { 
     System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8)); 
    } 
} 

Проблема заключается в том, что я получаю этот выход:

request operationComplete start state: Connecting, end state: Sending, status: Unknown 
request operationComplete start state: Sending, end state: Disconnecting, status: Unknown 
request operationComplete start state: Closing, end state: Finished, status: Unknown 
request operationComplete start state: Disconnecting, end state: Finished, status: Unknown 

Как вы можете видеть messageReceived из не выполняются по какой-то причине, даже если завод трубопровода добавляет экземпляр этого класса к трубопроводу.

Любые идеи, что мне здесь не хватает? Спасибо.


Редактировать

мне удалось, наконец, получить эту работу благодаря помощи @JestanNirojan, в случае, если кто-то будет заинтересован в решении:

public class ClientRequest extends SimpleChannelUpstreamHandler { 

    .... 

    public void connect() { 
     this.state = State.Connecting; 
     System.out.println(this.state); 
     this.clientBootstrap.connect(this.address); 
    } 

    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     this.state = State.Sending; 
     System.out.println(this.state); 
     ctx.getChannel().write(this.createRequest()); 
    } 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
     HttpResponse response = (HttpResponse) e.getMessage(); 

     ChannelBuffer content = response.getContent(); 
     if (content.readable()) { 
      System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8)); 
     } 

     this.state = State.Disconnecting; 
     System.out.println(this.state); 
    } 

    @Override 
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     this.state = State.Closing; 
     System.out.println(this.state); 
    } 

    @Override 
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     this.state = State.Finished; 
     System.out.println(this.state); 
    } 

    private HttpRequest createRequest() { 
     String url = this.url.toString(); 

     HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); 
     request.setHeader(HttpHeaders.Names.HOST, this.url.getHost()); 
     request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); 
     request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); 

     return request; 
    } 
} 
+0

HttpResponse полный HttpResponse или может быть куском?У меня есть 1000 кусков, возвращающихся назад, и вам нужно одно событие за кусок, или память взорвется, что приведет к нехватке памяти. –

+0

HttpResponse - это полный ответ, вы не можете его обрезать, насколько я знаю. Вы должны пойти ниже, чем, возможно, с помощью [HttpResponseDecoder] (http://static.netty.io/3.5/api/org/jboss/netty/handler/codec/http/HttpResponseDecoder.html). –

+0

Если вы не заинтересованы в chunking, используйте легкий http-клиент здесь @ https://github.com/arungeorge81/netty-http-client –

ответ

3

Вы используете ChannelFutureListener в все операции в канале (что плохо), а будущий слушатель будет выполнен сразу после вызова этих операций с каналом.

Проблема заключается в том, что после отправки сообщения канал немедленно отключается, и обработчик не может получить ответное сообщение, которое приходит позже.

 ........ 
    case Sending: 
     this.state = State.Disconnecting; 
     future.getChannel().disconnect().addListener(this); 
     break; 
     ........ 

Вы не должны блокировать нить канала в будущем. Наилучший подход заключается в распространении методов и реагирования на эти события в методе SimpleChannelUpstreamHandler

channelConnected(..) {} 
    messageReceived(..) {} 
    channelDisconnected(..) {} 

. вы также можете сохранить состояние в этом обработчике.

+1

Oh. Это было просто. Большое спасибо за информацию, я хочу, чтобы у Netty была лучшая документация по этому поводу. –