27.03.2011

Чтение потокового XML с помощью Reactive Extensions.

Решил выложить пример реализации класса XmlFeed с помощью расширений Rx. XmlFeed представляет из себя реализацию интерфейса IConnectableDisposable<XmlReader>, поставляющий отдельный XmlReader для каждого фрагмента XML, полученного из входящего потока. Позволяет обрабатывать бесконечные потоки xml напрямую из сетевых соединений.
Логика работы простая - в конструктор передаем поток (Stream) и XName элементов, которые ожидаем из него получать. Затем подписываемся с помощью Subscribe() и запускаем с помощью Connect();


using System;
using System.Collections.Generic;
using System.Concurrency;
using System.Disposables;
using System.IO;
using System.Xml;
using System.Xml.Linq;
using System.Xml.Schema;
namespace XmlFeedReader
{
    public sealed class XmlFeed : IConnectableObservable<XmlReader>
    {
        private readonly Stream inputStream;
        private readonly XName nodeName;
        private readonly Subject<XmlReader> dataStream;
        public XmlFeed(Stream inputStream, XName nodeName)
        {
            this.inputStream = inputStream;
            this.nodeName = nodeName;
            this.dataStream = new Subject<XmlReader>();
        }
        public IDisposable Connect()
        {
            BooleanDisposable cancel = new BooleanDisposable();
            Scheduler.NewThread.Schedule(() =>
            {
                try
                {
                    XmlReader reader = XmlReader.Create(this.inputStream, GetReaderSettings());
                    while (!cancel.IsDisposed && !reader.EOF && reader.ReadToFollowing(this.nodeName.LocalName, this.nodeName.NamespaceName))
                    {
                        this.dataStream.OnNext(reader.ReadSubtree());
                    }
                    this.dataStream.OnCompleted();
                }
                catch (Exception error)
                {
                    this.dataStream.OnError(error);
                }
            });
            return cancel;
        }
        public IDisposable Subscribe(IObserver<XmlReader> observer)
        {
            return this.dataStream.Subscribe(observer);
        }
        private static XmlReaderSettings GetReaderSettings()
        {
            XmlReaderSettings settings = new XmlReaderSettings();
            settings.ConformanceLevel = ConformanceLevel.Fragment;
            settings.CloseInput = false;
            settings.CheckCharacters = false;
            settings.DtdProcessing = DtdProcessing.Ignore;
            settings.IgnoreComments = true;
            settings.IgnoreProcessingInstructions = true;
            settings.IgnoreWhitespace = true;
            settings.MaxCharactersInDocument = 0;
            settings.ValidationFlags = XmlSchemaValidationFlags.None;
            settings.ValidationType = ValidationType.None;
            return settings;
        }
    }
}


Ниже приведен пример использования данной реализации XmlFeed для получения ленты новостей из RSS сайта vesti.ru.

using System;
using System.Net;
using System.Text;
using System.Xml;
using System.Xml.Linq;
namespace XmlFeedReader.SampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            WebClient client = new WebClient();
            var stream = client.OpenRead("http://www.vesti.ru/vesti.rss");
            var xmlFeed = new XmlFeed(stream, XName.Get("item"));
            xmlFeed.Subscribe(reader =>
            {
                using (reader)
                {
                    XmlDocument doc = new XmlDocument();
                    doc.Load(reader);
                    Console.WriteLine(doc.OuterXml);
                }
            },
            error => Console.WriteLine("error: ", error.Message),
            () => Console.WriteLine("completed."));
            using (var connection = xmlFeed.Connect())
            {
                Console.ReadLine();
            }
        }
    }
}

0 коммент.:

Отправить комментарий