Я новичок в Rx, поэтому я, вероятно, делаю некоторые важные ошибки здесь.Наблюдаемый TcpListener заканчивается после однократного соединения
Я хотел создать очень простой сервер сокетов, который мог бы получать сообщения от клиентов с помощью Observables. Для этого я использую Rxx, который предоставляет методы расширения в пространстве имен System.Net.Sockets, а также предоставляет статический заводский класс ObserableTcpListener.
Вот что я до сих пор в значительной степени украл ее из различных источников:
IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);
TcpListener listener = new TcpListener(endpoint);
IObservable<TcpClient> clients = listener
.StartSocketObservable(1)
.SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket));
.Finally(listener.Stop)
clients.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
private static IObservable<TcpClient> SocketToTcpClient(Socket socket)
{
TcpClient client = new TcpClient();
client.Client = socket;
return Observable.Return<TcpClient>(client);
}
private static IObservable<byte[]> OnConnect(TcpClient client)
{
return client.Client.ReceiveUntilCompleted(SocketFlags.None);
}
private static void OnMessage(TcpClient client, byte[] message)
{
Console.WriteLine("Mesage Received! - {0}", Encoding.UTF8.GetString(message));
}
private static void OnCompleted(TcpClient client)
{
Console.WriteLine("Completed.");
}
private static void OnException(TcpClient client, Exception ex)
{
Console.WriteLine("Exception: {0}", ex.ToString());
}
Это работает ... до точки. Я могу создать одно клиентское соединение. Как только это соединение завершается, кажется, что последовательность Observable завершается и вызывается .Finally(listener.Stop)
. Очевидно, это не то, что я хочу.
Я пробовал использовать фабричный класс ObserableTcpListener.Start()
, но это дает мне тот же результат.
IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint);
sockets.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
Я предполагаю, что я понимаю, проблема здесь: clients
наблюдаемая последовательность просто пустой после того, как первый клиент прекращает, таким образом .Finally(listener.Stop)
называется.
Что мне нужно сделать, чтобы обойти это? Как я могу продолжать слушать входящие соединения?
https://searchcode.com/codesearch/view/14317362/ мне кажется, что код принимает только одно соединение. В любом случае, посоветуйте отказаться от этого подхода и использовать стандартные методы для запуска сокетов. Я не вижу никакого преимущества, связанного с использованием TcpListener/Client напрямую, возможно, с ожиданием. – usr
@usr Основными причинами являются то, что по большей части мне нравится писать код Rx-way, потому что он очень читабельен и четко выражает намерение. Вторая причина заключается в том, что я пишу кучу разных кодов событийного стиля, а Rx обеспечивает отличную абстракцию над всеми этими стилями, позволяя сохранять шаблоны одинаковыми. В-третьих, я просто пытаюсь изучить Rx. Спасибо за совет! –
Если я могу добавить аргумент счетчика к этому: Этот стиль разбивает нормальный последовательный код на обратные вызовы, что, как правило, является ужасным для качества кода. Например, у вас есть классическая ошибка в коде, где вы предполагаете, что TCP отправляет «сообщения». Это не так, и поэтому 'Encoding.UTF8.GetString' будет иногда возвращать мусор, если вам не повезет, и ваше« сообщение »разделяется на полпути через кодированную кодовую точку UTF8. Это трудно исправить в этом стиле. В последовательном коде вы можете использовать StreamReader или BinaryReader и вытаскивать данные. С толчком вы должны взять то, что приходит. – usr