Я все еще изучаю RxJava. Каков наилучший способ использования потока в другом потоке? Или это противоречит принципам реактивного программирования?RxJava: Подайте один поток (наблюдаемый) как вход другого потока
Пример игрушки, который я пытаюсь написать, включает в себя TCP-клиент и сервер, который отправляет обратно заглавные буквы. Я бы хотел получить вход со стандартного ввода, отправить его на сервер и распечатать все, полученное как клиентом, так и сервером.
Ниже приведен ожидаемый результат выполнения программы:
(User input) apple
Server received: apple
Client received: APPLE
(User input) peach
Server received: peach
Client received: PEACH
мне удалось достичь этого с помощью трех наблюдаемых:
stdinStream
, который излучает строки из стандартного ввода,serverStream
который испускает строки, которые сервер получаетclientStream
, который испускает строки клиенту получает.
, а затем подписываются inputStream
изнутри создания clientStream
, например, так:
private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) {
return Observable.create(sub -> {
try (Socket socket = new Socket(host, port);
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
PrintWriter outWriter = new PrintWriter(outputStream, true);
) {
inputStream.subscribe(line -> {
outWriter.println(line);
try {
sub.onNext(inFromServer.readLine());
} catch (IOException e) {
sub.onError(e);
}
});
} catch (UnknownHostException e) {
sub.onError(e);
} catch (IOException e) {
sub.onError(e);
}
});
}
Примечание: Я не хочу, чтобы создать несколько клиентов и будет лучше держать одного клиента и работает поручить ему отправлять разные значения на сервер на основе ввода. Таким образом, подход отображения входа в новый clientStream
не требуется:
stdinStream.map(line -> createClientStream(line))
Так что мои вопросы:
- Это разумный способ использования RxJava? Есть ли лучшие альтернативы?
- Я создал клиентский сокет как часть создания
clientStream
. Я сделал это, чтобы я мог легко запускать его асинхронно с использованием планировщиков,clientStream.scheduleOn(Schedulers.newThread)
. Может быть, я должен сделать это по-другому, учитывая требования к одному клиенту?
Вот полный код: https://gist.github.com/lintonye/25af58abdfcc688ad3c3