2015-10-24 4 views
1

Я все еще изучаю RxJava. Каков наилучший способ использования потока в другом потоке? Или это противоречит принципам реактивного программирования?RxJava: Подайте один поток (наблюдаемый) как вход другого потока

Пример игрушки, который я пытаюсь написать, включает в себя TCP-клиент и сервер, который отправляет обратно заглавные буквы. Я бы хотел получить вход со стандартного ввода, отправить его на сервер и распечатать все, полученное как клиентом, так и сервером.

Ниже приведен ожидаемый результат выполнения программы:

(User input) apple 
Server received: apple 
Client received: APPLE 
(User input) peach 
Server received: peach 
Client received: PEACH 

мне удалось достичь этого с помощью трех наблюдаемых:

  • stdinStream, который излучает строки из стандартного ввода,
  • serverStream который испускает строки, которые сервер получает
  • clientStream, который испускает строки клиенту получает.

, а затем подписываются inputStream изнутри создания clientStream, например, так:

private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) { 
    return Observable.create(sub -> { 
     try (Socket socket = new Socket(host, port); 
      BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream())); 
      DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream()); 
      PrintWriter outWriter = new PrintWriter(outputStream, true); 
     ) { 
      inputStream.subscribe(line -> { 
       outWriter.println(line); 
       try { 
        sub.onNext(inFromServer.readLine()); 
       } catch (IOException e) { 
        sub.onError(e); 
       } 
      }); 
     } catch (UnknownHostException e) { 
      sub.onError(e); 
     } catch (IOException e) { 
      sub.onError(e); 
     } 
    }); 
} 

Примечание: Я не хочу, чтобы создать несколько клиентов и будет лучше держать одного клиента и работает поручить ему отправлять разные значения на сервер на основе ввода. Таким образом, подход отображения входа в новый clientStream не требуется:

stdinStream.map(line -> createClientStream(line))

Так что мои вопросы:

  1. Это разумный способ использования RxJava? Есть ли лучшие альтернативы?
  2. Я создал клиентский сокет как часть создания clientStream. Я сделал это, чтобы я мог легко запускать его асинхронно с использованием планировщиков, clientStream.scheduleOn(Schedulers.newThread). Может быть, я должен сделать это по-другому, учитывая требования к одному клиенту?

Вот полный код: https://gist.github.com/lintonye/25af58abdfcc688ad3c3

ответ

2

Что вам нужно using. Поместите все связанные с сокетом объекты в класс Connection и задайте входную последовательность, сопоставьте ее с парой println/readLine, поддерживая одно соединение. Вот gist for a runnable example.

static class Connection { 
    Socket socket; 
    BufferedReader inFromServer; 
    DataOutputStream outputStream; 
    PrintWriter outWriter; 

    public Connection(String host, int port) { 
     try { 
      socket = new Socket(host, port); 
      inFromServer = new BufferedReader(
       new InputStreamReader(socket.getInputStream())); 
      outputStream = new DataOutputStream(socket.getOutputStream()); 
      outWriter = new PrintWriter(outputStream, true); 
     } catch (IOException ex) { 
      Exceptions.propagate(ex); 
     } 
    } 

    public void close() { 
     try { 
      outWriter.close(); 
      outputStream.close(); 
      inFromServer.close(); 
      socket.close(); 
     } catch (IOException ex) { 
      Exceptions.propagate(ex); 
     } 
    } 
} 

public static void main(String[] args) { 
    runServer(); 

    Observable<String> source = Observable.just("a", "b", "c"); 

    String host = "localhost"; 
    int port = 8080; 

    Observable.<String, Connection>using(() -> new Connection(host, port), 
    conn -> 
     source 
     .map(v -> { 
      conn.outWriter.println(v); 
      try { 
       return conn.inFromServer.readLine(); 
      } catch (IOException ex) { 
       throw Exceptions.propagate(ex); 
      } 
     }) 
    , Connection::close) 
    .subscribe(System.out::println); 
} 
Смежные вопросы