Reactive Extensions for .NET ( “stuff happens” )

I’ve been taking a look at the Reactive Extensions for .NET. It’s early days for me at this point but I’m finding what I’m seeing to be really interesting.

This is code that’s available from the DevLabs;


and there are versions for .NET 3.5 SP1, .NET 4.0, Silverlight 3, Silverlight 4 and also for JavaScript.

The essence of the library is around consuming asynchronous and event-based work using observable collections that “push” items of interest at a composable set of observers.

Update to post – I highly recommend the Hands On Labs for Rx, which should be renamed “the instruction manual” ;-)

I started with this video where Wes Dyer does a great job of drawing you in to the work that they’re doing;


I found that there’s a point in this video where there’s a leap from 0 to 60 in 3-4 seconds as a couple of base concepts are suddenly expanded into the “drag and drop example” and it’s a little on the mind-bending side.

I followed that up with a quick subscription to the team blog and then watched a bunch of other videos;

I also found the Rx Wiki;

Reactive Framework (Rx) Wiki

and I particularly like its categorisation page for the “operators” because that was something that I really needed and this provides a good starting point for it.

I also watched the PDC 09 session;


That was enough to encourage me to do a download for Silverlight 4. I had a quick poke around in System.Observable.dll;

( which I found in c:\program files\microsoft cloud programmability\reactive extensions )

[image: the IObservable<T> and IObserver<T> interface definitions]

That seems so simple and yet so much has already been built on top of it in the reactive extensions libraries.

An IObserver<T> can subscribe to something happening on an IObservable<T> and get notified when something happens (OnNext), when something fails (OnError) and when something ends (OnCompleted).
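To make that contract a bit more concrete, here is a minimal sketch that uses only the IObservable<T> and IObserver<T> interfaces themselves (they live in the System namespace from .NET 4.0 onwards). The RecordingObserver and FixedValues classes are names I've made up for illustration and are not part of Rx; the observable simply pushes a fixed set of values synchronously and then signals completion.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

// Hypothetical observable that pushes a fixed set of values during
// Subscribe() and then tells the observer that the sequence has ended.
public sealed class FixedValues : IObservable<int>
{
    private readonly int[] values;

    public FixedValues(params int[] values) { this.values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (int value in values)
        {
            observer.OnNext(value);
        }
        observer.OnCompleted();

        // Everything has already been delivered, so there is nothing to cancel.
        return new NothingToDispose();
    }

    private sealed class NothingToDispose : IDisposable
    {
        public void Dispose() { }
    }
}
```

Subscribing a RecordingObserver to new FixedValues(1, 2, 3) leaves three OnNext entries followed by a single OnCompleted entry in its Log.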

It’s interesting to contrast that with IEnumerable<T> and IEnumerator<T>;

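[image: the IEnumerable<T> and IEnumerator<T> interface definitions]

and that made me think of IObservable<T> and IObserver<T> as being paired in much the same way as IEnumerable<T> and IEnumerator<T>;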

If I have an Enumerable I can ask it for an Enumerator and I manually pull values from that Enumerator until it tells me that there are no values left.
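Written out by hand, that "pull" loop looks roughly like the sketch below, which is more or less what a foreach statement does for you. PullDemo and PullAll are names invented for this example.

```csharp
using System;
using System.Collections.Generic;

public static class PullDemo
{
    // Consumes a sequence by asking the enumerator for one value at a time.
    public static List<int> PullAll(IEnumerable<int> source)
    {
        var pulled = new List<int>();

        using (IEnumerator<int> enumerator = source.GetEnumerator())
        {
            // MoveNext() returns false once the producer has no values left.
            while (enumerator.MoveNext())
            {
                pulled.Add(enumerator.Current);
            }
        }

        return pulled;
    }
}
```

The consumer is in charge of the pace here: nothing happens until it calls MoveNext(), and that call cannot return until the producer has an answer.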

Where that’s more “challenging” is if the values being enumerated aren’t “ready” at the time that you’d like to enumerate them. There’s no AsyncEnumerator so the best thing that the producer of the sequence could do is to have their iteration of MoveNext() block until a value is ready. Not-so-pleasant.

IObserver<T> feels like the inverse of IEnumerator<T> in that rather than the consumer pulling values from the producer via MoveNext() the consumer registers a callback with the producer and the producer can then produce values whenever it suits them. That’s likely to work much better for asynchronous production of sequences.

I needed to start simple and so I referenced System.Reactive.dll and System.CoreEx.dll and I wrote a tiny bit of code;

  public partial class MainPage : IObserver<int>
  {
    IObservable<int> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable = Observable.Empty<int>();

        IDisposable subscription = observable.Subscribe(this);

        // who calls Dispose() and when? 
        //   .  can't call it here because my subscription would die
        //   .  would naturally call it in OnCompleted() and OnError() but
        //      appears to be unnecessary as it's called for me after
        //      OnCompleted() in some "auto-cleanup when done" mode.
      };
    }
    public void OnCompleted()
    {
      Dispatcher.BeginInvoke(() =>
        {
          MessageBox.Show("Done");
        });
    }
    public void OnError(Exception exception)
    {
      Dispatcher.BeginInvoke(() =>
        {
          MessageBox.Show(string.Format("Problemo! {0}", exception.Message));
        });
    }
    public void OnNext(int value)
    {
      Dispatcher.BeginInvoke(() =>
        {
          MessageBox.Show(string.Format("Received value {0}", value));
        });
    }
  }

OK, so I’m just subscribing to an empty sequence of integers and displaying a message when that sequence has been exhausted ( which is likely to be immediately ). There are ways to shorten the code a lot, but I found it “interesting” to think about the lifetime of the observable and of the subscription that is observing it.

Two ways to shorten the code – I don’t have to implement IObserver<T> as I can just provide lambdas that execute the OnNext/OnError/OnCompleted actions. I can also avoid Dispatcher.BeginInvoke by composing the call to Subscribe() with another call that moves the delivery of the notifications to the UI synchronisation context as in;

  public partial class MainPage
  {
    IObservable<int> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable = Observable.Empty<int>();

        IDisposable subscription =
          observable.
            ObserveOnDispatcher().
            Subscribe(
              x => MessageBox.Show(string.Format("Received value {0}", x)),
              err => MessageBox.Show(string.Format("Problemo {0}", err.Message)),
              () => MessageBox.Show("Done"));
      };
    }
  }

Watching the “empty sequence” is not so exciting though, and there’s a pre-canned method called Interval() which will generate values for me on an interval, so I figured I might take those values, square them and display them ( here represented by using MessageBox );

  public partial class MainPage 
  {
    IObservable<long> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable =
          Observable.Interval(new TimeSpan(0, 0, 3));

        observable.
          ObserveOnDispatcher().
          Select(x => new { Value = x + 1, Square = (x + 1) * (x + 1) }).
          Subscribe(
            x => MessageBox.Show(string.Format("Received value {0} square {1}",
              x.Value, x.Square)));
      };
    }
  }

“Interesting” but I might not want to do that forever, so maybe I’d like 10 seconds worth of that data;

  public partial class MainPage
  {
    IObservable<long> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable =
          Observable.Interval(new TimeSpan(0, 0, 3));

        observable.          
          Select(x => new { Value = x + 1, Square = (x + 1) * (x + 1) }).
          TakeUntil(Observable.Interval(new TimeSpan(0, 0, 10))).
          ObserveOnDispatcher().
          Subscribe(
            x => MessageBox.Show(string.Format("Received value {0} square {1}",
              x.Value, x.Square)),
            () => MessageBox.Show("Done"));
      };
    }
  }

That TakeUntil() is interesting in the sense that it takes another IObservable and continues its observation until that second IObservable produces some “value” which, in this case, will be after 10 seconds. I used Observable.Interval but I could have used Observable.Timer as well I think.
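To convince myself of what "until that second IObservable produces a value" means, here is a rough, single-threaded sketch of the idea using only the BCL interfaces. It is not how Rx implements TakeUntil(); ManualSource, ActionDisposable, CallbackObserver and SubscribeUntil are all hypothetical names, and error handling and thread safety are left out on purpose.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-driven source: a value is pushed whenever Push() is called.
public sealed class ManualSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new ActionDisposable(() => observers.Remove(observer));
    }

    public void Push(T value)
    {
        // Iterate over a copy so that observers can unsubscribe while being notified.
        foreach (IObserver<T> observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }
}

// Runs the supplied action the first time Dispose() is called.
public sealed class ActionDisposable : IDisposable
{
    private Action action;
    public ActionDisposable(Action action) { this.action = action; }
    public void Dispose()
    {
        Action toRun = action;
        action = null;
        if (toRun != null) toRun();
    }
}

// Adapts a single callback to IObserver<T>; errors and completion are ignored.
public sealed class CallbackObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public CallbackObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class TakeUntilSketch
{
    // Forwards values from 'source' until 'other' produces its first value,
    // then reports completion and drops both subscriptions.
    public static IDisposable SubscribeUntil<T, TOther>(
        IObservable<T> source,
        IObservable<TOther> other,
        Action<T> onNext,
        Action onCompleted)
    {
        bool stopped = false;
        IDisposable sourceSubscription = null;
        IDisposable otherSubscription = null;

        Action unsubscribeBoth = () =>
        {
            stopped = true;
            if (sourceSubscription != null) sourceSubscription.Dispose();
            if (otherSubscription != null) otherSubscription.Dispose();
        };

        otherSubscription = other.Subscribe(new CallbackObserver<TOther>(ignored =>
        {
            if (stopped) return;
            unsubscribeBoth();
            onCompleted();
        }));

        sourceSubscription = source.Subscribe(new CallbackObserver<T>(value =>
        {
            if (!stopped) onNext(value);
        }));

        return new ActionDisposable(unsubscribeBoth);
    }
}
```

With two ManualSource instances, pushing 1 and 2 on the first, then anything on the second, then 3 on the first delivers 1 and 2, then the completion callback, and the 3 is never seen. The value pushed by the second source is ignored; only the fact that it arrived matters.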

That then had me thinking about two things – the old “if a tree falls in the forest…” thing;

in the sense of “is my observable producing values if no-one is listening?” and also what happens if I’m dynamically creating subscriptions here? Are those subscriptions seeing the same values? So I added a means via which a UI button could dynamically make another subscription…

 

  public partial class MainPage 
  {
    IObservable<long> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable =
          Observable.Interval(new TimeSpan(0, 0, 3));
      };
    }
    void OnUISubscribeButtonClick(object sender, RoutedEventArgs e)
    {
      int subNumber = ++this.subscriptionNumber;

      observable.
        Select(x => new { Value = x + 1, Square = (x + 1) * (x + 1) }).
        TakeUntil(Observable.Interval(new TimeSpan(0, 0, 10))).
        ObserveOnDispatcher().
        Subscribe(
          x => MessageBox.Show(string.Format("Sub {2} received value {0} square {1}",
          x.Value, x.Square, subNumber)),
          () => MessageBox.Show(string.Format("Sub {0} done", subNumber)));
    }
    int subscriptionNumber;
  }

and I found that “very interesting”, specifically because each subscription displayed (approx) the values;

 

  • 1, 1
  • 2, 4
  • 3, 9

before ending. But why did they each re-start at 1,1? This led me back to the “Hot and Cold Observables” video;

which then made more sense to me – I’m producing a cold observable here that gets produced ( quoting Wes on that video );

  • “Only when someone subscribes and each time someone subscribes”

I pondered for quite a while as to how I’d produce a hot observable and built something that was using a DispatcherTimer;

  public partial class MainPage 
  {
    DispatcherTimer timer;
    IObservable<IEvent<EventArgs>> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        timer = new DispatcherTimer();
        timer.Interval = new TimeSpan(0, 0, 2);
        timer.Start();

        observable =
          Observable.FromEvent(
              (EventHandler<EventArgs> handler) => new EventHandler(handler),
              ev => timer.Tick += ev,
              ev => timer.Tick -= ev).
            SubscribeOnDispatcher();
      };
    }
    void OnUISubscribeButtonClick(object sender, RoutedEventArgs args)
    {
      int subNumber = ++this.subscriptionNumber;

      observable.
        TakeUntil(Observable.Timer(new TimeSpan(0, 0, 10))).
        ObserveOnDispatcher().
        Subscribe(
          e => MessageBox.Show(string.Format("Sub {0} notified", subNumber)),
          () => MessageBox.Show(string.Format("Sub {0} done", subNumber)));
    }
    int subscriptionNumber;
  }

Now, DispatcherTimer.Tick is a weird event to subscribe to because it doesn’t produce any data, but it highlighted the need for the SubscribeOnDispatcher() call at the end of the FromEvent() expression above. Without it, I found that the lambda used to unsubscribe from the Tick event was being run on some worker thread when the TakeUntil clause caused the subscription to automatically stop. This caused the DispatcherTimer to blow up, so SubscribeOnDispatcher() comes into play to ensure that both subscribing and unsubscribing are done in the right synchronisation context and the thread affinity of the DispatcherTimer isn’t broken.

But I think that’s a hot observable. A more realistic one might be to monitor keypresses;

  public partial class MainPage 
  {
    IObservable<IEvent<KeyEventArgs>> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable =
          Observable.FromEvent<KeyEventArgs>(this, "KeyDown").
            SubscribeOnDispatcher();
      };
    }
    void OnUISubscribeButtonClick(object sender, RoutedEventArgs args)
    {
      int subNumber = ++this.subscriptionNumber;

      observable.
        TakeUntil(Observable.Timer(new TimeSpan(0, 0, 30))).
        ObserveOnDispatcher().
        Subscribe(
          e => MessageBox.Show(string.Format("Sub {0} notified of key code {1}", 
            subNumber, e.EventArgs.PlatformKeyCode)),
          () => MessageBox.Show(string.Format("Sub {0} done", subNumber)));
    }
    int subscriptionNumber;
  }

and that’s definitely a hot observable.
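To pin the hot/cold distinction down for myself, here is a small sketch that avoids timers and UI events altogether and uses only the BCL interfaces. ColdCounter, HotCounter, ListObserver and EmptyDisposable are hypothetical names; Tick() stands in for whatever really drives a hot source (a timer firing, a key being pressed), and unsubscription is left out to keep it short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that just remembers the values it was given.
public sealed class ListObserver : IObserver<long>
{
    public readonly List<long> Received = new List<long>();
    public void OnNext(long value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public sealed class EmptyDisposable : IDisposable
{
    public void Dispose() { }
}

// "Cold": the values are produced inside Subscribe(), so every
// subscriber gets its own fresh run starting at 1.
public sealed class ColdCounter : IObservable<long>
{
    private readonly int count;
    public ColdCounter(int count) { this.count = count; }

    public IDisposable Subscribe(IObserver<long> observer)
    {
        for (long i = 1; i <= count; i++)
        {
            observer.OnNext(i);
        }
        observer.OnCompleted();
        return new EmptyDisposable();
    }
}

// "Hot": the values are produced by Tick() whether or not anyone is
// subscribed, and all current subscribers see the same value.
public sealed class HotCounter : IObservable<long>
{
    private readonly List<IObserver<long>> observers = new List<IObserver<long>>();
    private long next = 1;

    public IDisposable Subscribe(IObserver<long> observer)
    {
        observers.Add(observer);
        return new EmptyDisposable(); // unsubscription omitted in this sketch
    }

    public void Tick()
    {
        long value = next++;
        foreach (IObserver<long> observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }
}
```

Two observers subscribed to the same ColdCounter(3) each receive 1, 2, 3. With HotCounter, an observer that subscribes after the first Tick() never sees the value 1, and one that subscribes after the second Tick() only sees values from 3 onwards, which is the "tree falls in the forest" behaviour.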

Looking at the wiki page again, I wanted to dig a little more into some of these “operators” – there’s tonnes of them and I found that the RxSandbox was a nice app for letting me get an explanation, a code sample and a “marble diagram” for each operator – very neat. I downloaded it and ran it (but didn’t compile the source, as I wasn’t 100% sure whether it was compatible with the current version of Rx).


There’s a lot of “operators” here that are already built and it feels like a task to figure out what they all are and what they do – they seem to fall into 3 categories;

  1. Familiar ‘operators’ from LINQ IEnumerable<T> – example here would be Count
  2. Operators new on IObservable<T> which are also applicable to IEnumerable<T> and so are implemented (by a class called EnumerableEx) in System.Interactive.dll – example here would be BufferWithCount.
  3. Operators new on IObservable<T> – example here would be BufferWithTime.

I started by playing a little with (2) above on enumerables, writing various bits of code to try and figure some of these operators out.

Things like using BufferWithCount to chunk up enumerables and Generate to create one in the first place;

 

      foreach (var item in
        EnumerableEx.Generate(0, i => i < 10, i => i, i => i + 1).
          BufferWithCount(3))
      {
        Debug.Assert(item.Count <= 3);
        foreach (var innerItem in item)
        {
          //
        }
      }
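For anyone without the library to hand, the chunking idea can be sketched with a plain iterator as below. This is my own approximation and not the EnumerableEx implementation; ChunkSketch and InChunksOf are made-up names, and I'm assuming (as the assert above suggests) that a trailing partial chunk is still returned.

```csharp
using System;
using System.Collections.Generic;

public static class ChunkSketch
{
    // Groups consecutive items into lists of at most 'size' elements.
    public static IEnumerable<IList<T>> InChunksOf<T>(IEnumerable<T> source, int size)
    {
        var chunk = new List<T>(size);

        foreach (T item in source)
        {
            chunk.Add(item);
            if (chunk.Count == size)
            {
                yield return chunk;
                chunk = new List<T>(size);
            }
        }

        // Hand back whatever is left over as a final, shorter chunk.
        if (chunk.Count > 0)
        {
            yield return chunk;
        }
    }
}
```

Applied to the values 0 to 9 with a size of 3, this yields three chunks of three items followed by a final chunk that holds only the 9.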

 

Then there are things like Throw/Catch/Finally for handling an exception in the production of an enumerable and dealing with it by returning an alternate enumerable ( also using Generate in there as well );

 

      foreach (var item in
        EnumerableEx.
          Generate(0, (int i) => i < 10, i => i, i => i + 1).
          Concat(
            EnumerableEx.Throw<int>(new ApplicationException())).
          Catch(
            (ApplicationException ex) => EnumerableEx.Return(11)).
          Finally(
            () => MessageBox.Show("Done")))
      {
        // ...
      }

 

and OnErrorResumeNext feels similar and also reminds me of VB6 :-S and, again, allows for the continuation of an enumerable from another enumerable in a situation where an error occurs in the production of the first enumerable;

 

      foreach (var item in
        EnumerableEx.
          Generate(0, 
            (int i) => i < 10, 
            i => i, 
            i => 
              {
                if (i == 5)
                {
                  throw new ApplicationException();
                }
                return(i + 1);
              })
          .OnErrorResumeNext(
            EnumerableEx.Return(101)))
      {
        // ... 
      }
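My understanding of that behaviour can be sketched with a hand-written iterator, shown below. This is an approximation for illustration and not the library's code: ResumeSketch, ResumeWith and FailsAfterThree are hypothetical names. As far as I can tell the second sequence is appended whether the first one ends normally or by throwing, and the sketch assumes that.

```csharp
using System;
using System.Collections.Generic;

public static class ResumeSketch
{
    // Yields items from 'first' until it ends or throws, then carries on with 'second'.
    public static IEnumerable<T> ResumeWith<T>(IEnumerable<T> first, IEnumerable<T> second)
    {
        using (IEnumerator<T> enumerator = first.GetEnumerator())
        {
            while (true)
            {
                T current = default(T);
                try
                {
                    if (!enumerator.MoveNext()) break;
                    current = enumerator.Current;
                }
                catch (Exception)
                {
                    // Swallow the failure and move on to 'second'.
                    break;
                }

                // C# does not allow yield inside a try block that has a catch,
                // so the value is handed out here instead.
                yield return current;
            }
        }

        foreach (T item in second)
        {
            yield return item;
        }
    }

    // Test source: produces 0, 1, 2 and then fails.
    public static IEnumerable<int> FailsAfterThree()
    {
        yield return 0;
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("boom");
    }
}
```

ResumeWith(FailsAfterThree(), new[] { 101 }) produces 0, 1, 2, 101 and the exception never reaches the consumer, which is exactly the "On Error Resume Next" flavour that makes it feel like VB6.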

I can also do some control-flow style things with Case, If, IfEmpty and While;

 

 

      foreach (var item in EnumerableEx.If(
        () => true,
        new string[] { "this", "array", "gets", "chosen" },
        new string[] { "this", "array", "does", "not" }))
      {
        //  
      }

      foreach (var item in
        EnumerableEx.Case(
        () => 3,
        new Dictionary<int, IEnumerable<char>>()
        {
          { 1, "abc" },
          { 2, "def" },
          { 3, "ghi" }
        }))
      {
        //
      }

      int count = 0;

      foreach (var item in 
        EnumerableEx.While(
          () => ++count < 10, Enumerable.Range(1, 100)))
      {
        // 
      }

 

but then I started to need a little more explanation for some of the less intuitive operators (e.g. Memoize?) and that’s when I found;

Bart’s Blog Posts on Rx

which walk through these EnumerableEx additions and are really, really helpful.

Firstly, there’s a great categorisation;

and secondly there are posts that go through most if not all of these operators in category (2) in detail.

Fantastic :-) Here’s the breakdown;

There’s some pretty heavy-going material in there and some of it takes a second or third reading to figure out, so I suspect I’ll be going back to it again to try and cement a bunch of what I read.

That leaves category (3) up above to explore – I’m going to wander through the Hands On Labs and pore over the questions being asked in the Rx Forums and I’m off to do a bit of experimentation.