Хочу поделиться методом создания Tcp соединения с помощью Reactive Extensions. Основным желанием при создании данного метода было максимальное упрощение работы с соединением и декларативный подход.
Вот пример использования такого соединения:
TcpEndpoint
.Connect("localhost", 80) // подключаемся к серверу
.Write("GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") // отправляем запрос
.SelectMany(conn => conn.Reader.ToObservable()) // читаем данные с сервера
.Repeat() // в случае разрыва соединения повторяем все с начала
.Subscribe(Console.WriteLine); // выводим все полученные данные в консоль
на мой взгляд, вполне себе декларативный вариант создания соединения с сервером.
Теперь разберем, что скрывается внутри.
Метод TcpEndpoint.Connect:
public static class TcpEndpoint
{
public static IObservable<TcpEndpointConnection> Connect(string host, int port)
{
return Observable.CreateWithDisposable<TcpEndpointConnection>(
observer =>
{
var client = new TcpClient();
client.Connect(host, port);
var stream = client.GetStream();
var reader = new StreamReader(stream);
var writer = new StreamWriter(stream);
var connection = new TcpEndpointConnection(reader, writer);
return
Observable.Return(connection)
.Subscribe(observer);
});
}
}
здесь мы создаем соединение, читателя и писателя для потока и возвращаем все это хозяйство в виде обозреваемого объекта класса TcpEndpointConnection.
TcpEndpointConnection - это просто промежуточное хранилище ссылок на StreamReader и StreamWriter:
public class TcpEndpointConnection
{
private readonly StreamReader reader;
private readonly StreamWriter writer;
public TcpEndpointConnection(StreamReader reader, StreamWriter writer)
{
this.writer = writer;
this.reader = reader;
}
public StreamReader Reader
{
get { return this.reader; }
}
public StreamWriter Writer
{
get { return this.writer; }
}
}
для него имеется расширение, позволющее писать данные в исходящий поток текущего соединения:
public static class TcpEndpointEx
{
public static IObservable<TcpEndpointConnection>
Write(this IObservable<TcpEndpointConnection> connection, string toWrite)
{
return Observable.CreateWithDisposable<TcpEndpointConnection>(
observer =>
{
return connection
.Do(conn =>
{
conn.Writer.Write(toWrite);
conn.Writer.Flush();
}).Subscribe(observer);
});
}
}
и последний штрих - чтение данных из соединения с помощью расширения класса StreamReader:
public static class StreamReaderEx
{
public static IObservable<string> ToObservable(this StreamReader source)
{
return Observable.CreateWithDisposable<string>(
observer =>
{
return Observable.Generate(
source,
reader => !reader.EndOfStream,
reader => reader,
reader => reader.ReadLine())
.Subscribe(observer);
});
}
}
Удачи!
0 коммент.:
Отправить комментарий