09.04.2011

Reactive TcpClient

Хочу поделиться методом создания Tcp соединения с помощью Reactive Extensions. Основным желанием при создании данного метода было максимальное упрощение работы с соединением и декларативный подход.



Вот пример использования такого соединения:

TcpEndpoint
    .Connect("localhost", 80) // подключаемся к серверу
    .Write("GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") // отправляем запрос
    .SelectMany(conn => conn.Reader.ToObservable()) // читаем данные с сервера
    .Repeat() // в случае разрыва соединения повторяем все с начала
    .Subscribe(Console.WriteLine); // выводим все полученные данные в консоль

на мой взгляд, вполне себе декларативный вариант создания соединения с сервером.
Теперь разберем, что скрывается внутри.

Метод TcpEndpoint.Connect:

    public static class TcpEndpoint
    {
        public static IObservable<TcpEndpointConnection> Connect(string host, int port)
        {
            return Observable.CreateWithDisposable<TcpEndpointConnection>(
                observer =>
                {
                    var client = new TcpClient();
                    client.Connect(host, port);
                    var stream = client.GetStream();
                    var reader = new StreamReader(stream);
                    var writer = new StreamWriter(stream);
                    var connection = new TcpEndpointConnection(reader, writer);
                    return
                        Observable.Return(connection)
                        .Subscribe(observer);
                });
        }
    }

здесь мы создаем соединение, читателя и писателя для потока и возвращаем все это хозяйство в виде обозреваемого объекта класса TcpEndpointConnection.

TcpEndpointConnection - это просто промежуточное хранилище ссылок на StreamReader и StreamWriter:

    public class TcpEndpointConnection
    {
        private readonly StreamReader reader;
        private readonly StreamWriter writer;
        public TcpEndpointConnection(StreamReader reader, StreamWriter writer)
        {
            this.writer = writer;
            this.reader = reader;
        }
        public StreamReader Reader
        {
            get { return this.reader; }
        }
        public StreamWriter Writer
        {
            get { return this.writer; }
        }
    }

для него имеется расширение, позволющее писать данные в исходящий поток текущего соединения:

    public static class TcpEndpointEx
    {
        public static IObservable<TcpEndpointConnection>
            Write(this IObservable<TcpEndpointConnection> connection, string toWrite)
        {
            return Observable.CreateWithDisposable<TcpEndpointConnection>(
                observer =>
                {
                    return connection
                        .Do(conn =>
                        {
                            conn.Writer.Write(toWrite);
                            conn.Writer.Flush();
                        }).Subscribe(observer);
                });
        }
    }

и последний штрих - чтение данных из соединения с помощью расширения класса StreamReader:

    public static class StreamReaderEx
    {
        public static IObservable<string> ToObservable(this StreamReader source)
        {
            return Observable.CreateWithDisposable<string>(
                observer =>
                {
                    return Observable.Generate(
                        source,
                        reader => !reader.EndOfStream,
                        reader => reader,
                        reader => reader.ReadLine())
                    .Subscribe(observer);
                });
        }
    }

Удачи!

0 коммент.:

Отправить комментарий