Mike Taulty's Blog
Bits and Bytes from Microsoft UK
Reactive Extensions for .NET ( “stuff happens” )

I’ve been taking a look at the Reactive Extensions for .NET. It’s early days for me at this point but I’m finding what I’m seeing to be really interesting.

The code is available from DevLabs, and there are versions for .NET 3.5 SP1, .NET 4.0, Silverlight 3, Silverlight 4 and also for JavaScript.

The essence of the library is around consuming asynchronous and event-based work using observable collections that “push” items of interest at a composable set of observers.

Update to post – I highly recommend the Hands On Labs for Rx, which should be renamed “the instruction manual” ;-)

I started with this video where Wes Dyer does a great job of drawing you in to the work that they’re doing;

I found that there’s a point in this video where there’s a leap from 0 to 60 in 3-4 seconds as a couple of base concepts are suddenly expanded into the “drag and drop example” and it’s a little on the mind-bending side.

I followed that up with a quick subscription to the team blog and then watched a bunch of other videos;

I also found the Rx Wiki;

Reactive Framework (Rx) Wiki

and I particularly like its categorisation page for the “operators” because that was something that I really needed and this provides a good starting point for it.

I also watched the PDC 09 session;

That was enough to encourage me to do a download for Silverlight 4. I had a quick poke around in System.Observable.dll;

( which I found in c:\program files\microsoft cloud programmability\reactive extensions )

[image: the contents of System.Observable.dll]

That seems so simple and yet so much has already been built on top of it in the reactive extensions libraries.

An IObserver<T> can subscribe to something happening on an IObservable<T> and get notified when something happens (OnNext), when something fails (OnError) and when something ends (OnCompleted).
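
A minimal sketch of that contract, using only the IObservable<T> and IObserver<T> interfaces that ship in the System namespace from .NET 4.0 onwards. ArrayObservable and RecordingObserver are hypothetical names made up for this illustration and are not part of Rx; the observable here simply pushes a fixed set of values synchronously and then completes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer: records every notification it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

// Hypothetical observable: pushes each array element, then signals completion.
public sealed class ArrayObservable<T> : IObservable<T>
{
    private readonly T[] items;

    public ArrayObservable(params T[] items) { this.items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (T item in items)
        {
            observer.OnNext(item);      // "something happened"
        }
        observer.OnCompleted();         // "nothing more will happen"
        return new NoOpDisposable();    // nothing left to cancel by this point
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

public static class ObserverContractDemo
{
    public static List<string> Run()
    {
        var observer = new RecordingObserver<int>();
        using (new ArrayObservable<int>(1, 2, 3).Subscribe(observer)) { }
        return observer.Log;
    }

    public static void Main()
    {
        // Prints OnNext(1), OnNext(2), OnNext(3), OnCompleted on separate lines.
        foreach (string line in Run()) Console.WriteLine(line);
    }
}
```

The IDisposable returned from Subscribe is the handle a consumer would use to stop listening early; in this synchronous sketch everything has already been delivered by the time it is returned.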

It’s interesting to contrast that with IEnumerable<T> and IEnumerator<T>;

[image: the IEnumerable<T> and IEnumerator<T> interfaces]

and that made me think of the two pairs of interfaces as being very closely related, rather like a “push me, pull you”.

If I have an Enumerable I can ask it for an Enumerator and I manually pull values from that Enumerator until it tells me that there are no values left.
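
As a small sketch of that “pull” model, this is roughly the loop that a foreach statement performs on my behalf. PullDemo and PullAll are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;

public static class PullDemo
{
    // Manually pulls every value out of the source, one MoveNext() at a time.
    public static List<int> PullAll(IEnumerable<int> source)
    {
        var pulled = new List<int>();

        // Ask the enumerable for an enumerator...
        using (IEnumerator<int> enumerator = source.GetEnumerator())
        {
            // ...and keep pulling until it reports that nothing is left.
            while (enumerator.MoveNext())
            {
                pulled.Add(enumerator.Current);
            }
        }
        return pulled;
    }

    public static void Main()
    {
        // Prints 10,20,30
        Console.WriteLine(string.Join(",", PullAll(new[] { 10, 20, 30 })));
    }
}
```

The consumer drives the pace here: nothing is produced until MoveNext() is called, and the call does not return until a value (or the end of the sequence) is available.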

Where that’s more “challenging” is if the values being enumerated aren’t “ready” at the time that you’d like to enumerate them. There’s no AsyncEnumerator so the best thing that the producer of the sequence could do is to have their implementation of MoveNext() block until a value is ready. Not-so-pleasant.

IObserver<T> feels like the inverse of IEnumerator<T> in that rather than the consumer pulling values from the producer via MoveNext() the consumer registers a callback with the producer and the producer can then produce values whenever it suits them. That’s likely to work much better for asynchronous production of sequences.

I needed to start simple and so I referenced System.Reactive.dll and System.CoreEx.dll and I wrote a tiny bit of code;

  public partial class MainPage : IObserver<int>
  {
    IObservable<int> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable = Observable.Empty<int>();

        IDisposable subscription = observable.Subscribe(this);

        // who calls Dispose() and when? 
        //   .  can't call it here because my subscription would die
        //   .  would naturally call it in OnCompleted() and OnError() but
        //      appears to be unnecessary as it's called for me after
        //      OnCompleted() in some "auto-cleanup when done" mode.
      };
    }
    public void OnCompleted()
    {
      Dispatcher.BeginInvoke(() =>
        {
          MessageBox.Show("Done");
        });
    }
    public void OnError(Exception exception)
    {
      Dispatcher.BeginInvoke(() =>
        {
          MessageBox.Show(string.Format("Problemo! {0}", exception.Message));
        });
    }
    public void OnNext(int value)
    {
      Dispatcher.BeginInvoke(() =>
        {
          MessageBox.Show(string.Format("Received value {0}", value));
        });
    }
  }

OK, so I’m just subscribing to an empty sequence of integers and displaying a message when that sequence has been exhausted ( which is likely to be immediately ). There are ways to shorten the code a lot, but I found it “interesting” to think about the lifetime of the observable and the subscription that is observing it.

Two ways to shorten the code – I don’t have to implement IObserver<T> as I can just provide lambdas that execute the OnNext/OnError/OnCompleted actions. I can also avoid Dispatcher.BeginInvoke by composing the call to Subscribe() with another call that moves the delivery of the notifications to the UI synchronisation context as in;

  public partial class MainPage
  {
    IObservable<int> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable = Observable.Empty<int>();

        IDisposable subscription =
          observable.
            ObserveOnDispatcher().
            Subscribe(
              x => MessageBox.Show(string.Format("Received value {0}", x)),
              err => MessageBox.Show(string.Format("Problemo {0}", err.Message)),
              () => MessageBox.Show("Done"));
      };
    }
  }

Watching the “empty sequence” is not so exciting, though. There’s a pre-canned method called Interval() which will generate values for me on an interval, so I figured I might take those values, square them and display them ( here represented by using MessageBox );

  public partial class MainPage 
  {
    IObservable<long> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable =
          Observable.Interval(new TimeSpan(0, 0, 3));

        observable.
          ObserveOnDispatcher().
          Select(x => new { Value = x + 1, Square = (x + 1) * (x + 1) }).
          Subscribe(
            x => MessageBox.Show(string.Format("Received value {0} square {1}",
              x.Value, x.Square)));
      };
    }
  }

“Interesting” but I might not want to do that forever, so maybe I’d like 10 seconds worth of that data;

  public partial class MainPage
  {
    IObservable<long> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable =
          Observable.Interval(new TimeSpan(0, 0, 3));

        observable.          
          Select(x => new { Value = x + 1, Square = (x + 1) * (x + 1) }).
          TakeUntil(Observable.Interval(new TimeSpan(0, 0, 10))).
          ObserveOnDispatcher().
          Subscribe(
            x => MessageBox.Show(string.Format("Received value {0} square {1}",
              x.Value, x.Square)),
            () => MessageBox.Show("Done"));
      };
    }
  }

That TakeUntil() is interesting in the sense that it takes another IObservable and continues its observation until that second IObservable produces some “value” which, in this case, will be after 10 seconds. I used Observable.Interval but I could have used Observable.Timer as well, I think.
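
A minimal sketch of the “forward values until the other observable produces one” behaviour, written without Rx so that it can be run on its own. ManualSource, ActionDisposable, CallbackObserver and TakeUntilSketch are all hypothetical names, values are pushed by hand rather than by a timer, and the sketch deliberately ignores errors and completion of the source; it is not how the real TakeUntil is implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-driven source: values arrive only when Push is called.
public sealed class ManualSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new ActionDisposable(() => observers.Remove(observer));
    }

    public void Push(T value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (IObserver<T> o in observers.ToArray()) o.OnNext(value);
    }
}

public sealed class ActionDisposable : IDisposable
{
    private readonly Action action;
    public ActionDisposable(Action action) { this.action = action; }
    public void Dispose() { action(); }
}

public sealed class CallbackObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public CallbackObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class TakeUntilSketch
{
    // Forwards values from source until stop produces its first value,
    // then drops both subscriptions and reports completion once.
    public static void Subscribe<T, TStop>(
        ManualSource<T> source, ManualSource<TStop> stop,
        Action<T> onNext, Action onCompleted)
    {
        IDisposable sourceSub = null;
        IDisposable stopSub = null;
        sourceSub = source.Subscribe(new CallbackObserver<T>(onNext));
        stopSub = stop.Subscribe(new CallbackObserver<TStop>(ignored =>
        {
            sourceSub.Dispose();
            stopSub.Dispose();
            onCompleted();
        }));
    }

    public static string Run()
    {
        var log = new List<string>();
        var squares = new ManualSource<long>();
        var cutoff = new ManualSource<long>();
        Subscribe(squares, cutoff, x => log.Add(x.ToString()), () => log.Add("done"));

        squares.Push(1);
        squares.Push(4);
        cutoff.Push(0);   // the "other" observable fires: the subscription ends
        squares.Push(9);  // nobody is listening any more
        return string.Join(",", log);
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 1,4,done
    }
}
```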

That then had me thinking about two things – the old “if a tree falls in the forest…” thing;

in the sense of “is my observable producing values if no-one is listening?” and also what happens if I’m dynamically creating subscriptions here? Are those subscriptions seeing the same values? So I added a means via which a UI button could dynamically make another subscription…

 

  public partial class MainPage 
  {
    IObservable<long> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable =
          Observable.Interval(new TimeSpan(0, 0, 3));
      };
    }
    void OnUISubscribeButtonClick(object sender, RoutedEventArgs e)
    {
      int subNumber = ++this.subscriptionNumber;

      observable.
        Select(x => new { Value = x + 1, Square = (x + 1) * (x + 1) }).
        TakeUntil(Observable.Interval(new TimeSpan(0, 0, 10))).
        ObserveOnDispatcher().
        Subscribe(
          x => MessageBox.Show(string.Format("Sub {2} received value {0} square {1}",
          x.Value, x.Square, subNumber)),
          () => MessageBox.Show(string.Format("Sub {0} done", subNumber)));
    }
    int subscriptionNumber;
  }

and I found that “very interesting”, specifically because each subscription displayed (approx) the values;

 

  • 1, 1
  • 2, 4
  • 3, 9

before ending. But why did they each re-start at 1,1? This led me back to the “Hot and Cold Observables” video;

which then made more sense to me – I’m producing a cold observable here that gets produced ( quoting Wes on that video );

  • “Only when someone subscribes and each time someone subscribes”
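
The difference can be sketched without Rx at all. In the sketch below, which uses hypothetical names throughout (ColdCounter, HotSource, Unsubscriber, ListObserver), the cold source does its producing inside Subscribe, so every subscriber gets its own run from 1, while the hot source produces whenever Publish is called and a late subscriber simply misses what went before.

```csharp
using System;
using System.Collections.Generic;

// Cold: production happens inside Subscribe, once per subscriber.
public sealed class ColdCounter : IObservable<int>
{
    private readonly int count;
    public ColdCounter(int count) { this.count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 1; i <= count; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new Unsubscriber(() => { });
    }
}

// Hot: production happens in Publish, whether or not anyone is subscribed.
public sealed class HotSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    public void Publish(int value)
    {
        foreach (IObserver<int> o in observers.ToArray()) o.OnNext(value);
    }
}

public sealed class Unsubscriber : IDisposable
{
    private readonly Action onDispose;
    public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
    public void Dispose() { onDispose(); }
}

public sealed class ListObserver : IObserver<int>
{
    public readonly List<int> Values = new List<int>();
    public void OnNext(int value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class HotColdDemo
{
    public static string RunCold()
    {
        var cold = new ColdCounter(3);
        var first = new ListObserver();
        var second = new ListObserver();
        cold.Subscribe(first);
        cold.Subscribe(second);   // restarts from 1 for this subscriber
        return string.Join(",", first.Values) + "|" + string.Join(",", second.Values);
    }

    public static string RunHot()
    {
        var hot = new HotSource();
        var early = new ListObserver();
        var late = new ListObserver();
        hot.Subscribe(early);
        hot.Publish(1);
        hot.Subscribe(late);      // joins after the first value has gone
        hot.Publish(2);
        return string.Join(",", early.Values) + "|" + string.Join(",", late.Values);
    }

    public static void Main()
    {
        Console.WriteLine(RunCold()); // prints 1,2,3|1,2,3
        Console.WriteLine(RunHot());  // prints 1,2|2
    }
}
```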

I pondered for quite a while as to how I’d produce a hot observable and built something that was using a DispatcherTimer;

  public partial class MainPage 
  {
    DispatcherTimer timer;
    IObservable<IEvent<EventArgs>> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        timer = new DispatcherTimer();
        timer.Interval = new TimeSpan(0, 0, 2);
        timer.Start();

        observable =
          Observable.FromEvent(
              (EventHandler<EventArgs> handler) => new EventHandler(handler),
              ev => timer.Tick += ev,
              ev => timer.Tick -= ev).
            SubscribeOnDispatcher();
      };
    }
    void OnUISubscribeButtonClick(object sender, RoutedEventArgs args)
    {
      int subNumber = ++this.subscriptionNumber;

      observable.
        TakeUntil(Observable.Timer(new TimeSpan(0, 0, 10))).
        ObserveOnDispatcher().
        Subscribe(
          e => MessageBox.Show(string.Format("Sub {0} notified", subNumber)),
          () => MessageBox.Show(string.Format("Sub {0} done", subNumber)));
    }
    int subscriptionNumber;
  }

Now, DispatcherTimer.Tick is a weird event to subscribe to because it doesn’t produce any data, but it highlighted the need for the SubscribeOnDispatcher() call at the end of the FromEvent expression above. Without it, I found that the lambda used to unsubscribe from the Tick event was being run on some worker thread when the TakeUntil clause caused the subscription to stop automatically. That caused the DispatcherTimer to blow up, so SubscribeOnDispatcher() comes into play to ensure that both subscribing and unsubscribing are done in the right synchronisation context and the thread affinity of the DispatcherTimer isn’t broken.

But I think that’s a hot observable. A more realistic one might be to monitor keypresses;

  public partial class MainPage 
  {
    IObservable<IEvent<KeyEventArgs>> observable;

    public MainPage()
    {
      InitializeComponent();

      this.Loaded += (s, e) =>
      {
        observable =
          Observable.FromEvent<KeyEventArgs>(this, "KeyDown").
            SubscribeOnDispatcher();
      };
    }
    void OnUISubscribeButtonClick(object sender, RoutedEventArgs args)
    {
      int subNumber = ++this.subscriptionNumber;

      observable.
        TakeUntil(Observable.Timer(new TimeSpan(0, 0, 30))).
        ObserveOnDispatcher().
        Subscribe(
          e => MessageBox.Show(string.Format("Sub {0} notified of key code {1}", 
            subNumber, e.EventArgs.PlatformKeyCode)),
          () => MessageBox.Show(string.Format("Sub {0} done", subNumber)));
    }
    int subscriptionNumber;
  }
and that’s definitely a hot observable.

Looking at the wiki page again, I wanted to dig a little more into some of these “operators” – there’s tonnes of them and I found that the RxSandbox was a nice app for letting me get an explanation, a code sample and a “marble diagram” for each operator – very neat. I downloaded it, ran it (but didn’t compile the source as I wasn’t 100% sure if it was compatible with the current version of Rx).

There’s a lot of “operators” here that are already built and it feels like a task to figure out what they all are and what they do – they seem to fall into 3 categories;

  1. Familiar ‘operators’ from LINQ IEnumerable<T> – example here would be Count
  2. Operators new on IObservable<T> which are also applicable to IEnumerable<T> and so are implemented (by a class called EnumerableEx) in System.Interactive.dll – example here would be BufferWithCount.
  3. Operators new on IObservable<T> – example here would be BufferWithTime.

I started by playing a little with category (2) above on enumerables, writing various bits of code to try and figure some of these operators out.

Things like using BufferWithCount to chunk up enumerables and Generate to create one in the first place;

 

   foreach (var item in 
        EnumerableEx.Generate(0, i => i < 10, i => i, i => i + 1).
        BufferWithCount(3))
      {
        Debug.Assert(item.Count <= 3);
        foreach (var innerItem in item)
        {
          //
        }
      }
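
To make the chunking concrete, here is a hand-written iterator that behaves the way I understand BufferWithCount(3) to behave on an enumerable. BufferSketch and InChunksOf are hypothetical names, and the sketch assumes that a final, partially filled chunk is still emitted, which is what the `<= 3` assertion above suggests.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class BufferSketch
{
    // Groups the source into lists of at most 'size' items, in order.
    public static IEnumerable<IList<T>> InChunksOf<T>(IEnumerable<T> source, int size)
    {
        var buffer = new List<T>(size);
        foreach (T item in source)
        {
            buffer.Add(item);
            if (buffer.Count == size)
            {
                yield return buffer;
                buffer = new List<T>(size);   // start a fresh chunk
            }
        }
        if (buffer.Count > 0)
        {
            yield return buffer;              // assumed: emit the leftover items
        }
    }

    public static string Describe()
    {
        var numbers = new List<int>();
        for (int i = 0; i < 10; i++) numbers.Add(i);

        var text = new StringBuilder();
        foreach (IList<int> chunk in InChunksOf(numbers, 3))
        {
            text.Append("[" + string.Join(",", chunk) + "]");
        }
        return text.ToString();
    }

    public static void Main()
    {
        Console.WriteLine(Describe()); // prints [0,1,2][3,4,5][6,7,8][9]
    }
}
```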

 

and things like Throw/Catch/Finally for handling an exception in the production of an enumerable and dealing with it by returning an alternate enumerable ( also using Generate in there as well );

 

      foreach (var item in
        EnumerableEx.
          Generate(0, (int i) => i < 10, i => i, i => i + 1).
          Concat(
            EnumerableEx.Throw<int>(new ApplicationException())).
          Catch(
            (ApplicationException ex) => EnumerableEx.Return(11)).
          Finally(
            () => MessageBox.Show("Done")))
      {
        // ...
      }

 

and OnErrorResumeNext feels similar and also reminds me of VB6 :-S and, again, allows for the continuation of an enumerable from another enumerable in a situation where an error occurs in the production of the first enumerable;

 

      foreach (var item in
        EnumerableEx.
          Generate(0, 
            (int i) => i < 10, 
            i => i, 
            i => 
              {
                if (i == 5)
                {
                  throw new ApplicationException();
                }
                return(i + 1);
              })
          .OnErrorResumeNext(
            EnumerableEx.Return(101)))
      {
        // ... 
      }
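
A rough sketch of the idea behind OnErrorResumeNext on enumerables, written as a plain iterator rather than with EnumerableEx. ResumeSketch, ResumeWith and CountThenFail are hypothetical names; the sketch moves on to the second sequence whether the first one ends normally or by throwing, which is my understanding of the operator rather than something taken from its source.

```csharp
using System;
using System.Collections.Generic;

public static class ResumeSketch
{
    // Yields items from 'first' until it ends or throws, then yields 'second'.
    public static IEnumerable<T> ResumeWith<T>(IEnumerable<T> first, IEnumerable<T> second)
    {
        using (IEnumerator<T> e = first.GetEnumerator())
        {
            while (true)
            {
                T current;
                try
                {
                    if (!e.MoveNext()) break;   // first sequence finished normally
                    current = e.Current;
                }
                catch (Exception)
                {
                    break;                      // swallow the failure and move on
                }
                // C# does not allow yield inside a try block that has a catch
                // clause, so the value is yielded out here instead.
                yield return current;
            }
        }

        foreach (T item in second) yield return item;
    }

    // A sequence that produces 0, 1, 2 and then fails.
    private static IEnumerable<int> CountThenFail()
    {
        yield return 0;
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("production failed");
    }

    public static string Run()
    {
        return string.Join(",", ResumeWith(CountThenFail(), new[] { 101 }));
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 0,1,2,101
    }
}
```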

and I can do some control-flow style things with Case, If, IfEmpty and While;

 

 

      foreach (var item in EnumerableEx.If(
        () => true,
        new string[] { "this", "array", "gets", "chosen" },
        new string[] { "this", "array", "does", "not" }))
      {
        //  
      }

      foreach (var item in
        EnumerableEx.Case(
        () => 3,
        new Dictionary<int, IEnumerable<char>>()
        {
          { 1, "abc" },
          { 2, "def" },
          { 3, "ghi" }
        }))
      {
        //
      }

      int count = 0;

      foreach (var item in 
        EnumerableEx.While(
          () => ++count < 10, Enumerable.Range(1, 100)))
      {
        // 
      }

 

but then I started to need a little more explanation for some of the less intuitive operators (e.g. Memoize?) and that’s when I found;

Bart’s Blog Posts on Rx

which walk through these EnumerableEx additions and are really, really helpful.

Firstly, there’s a great categorisation;

and secondly there are posts that go through most if not all of these operators in category (2) in detail.

Fantastic :-) Here’s the breakdown;

There’s some pretty heavy-going material in there and some of it takes a second- or third- reading to try and figure out and I suspect I’ll be going back to it again to try and cement a bunch of what I read.

That leaves category (3) up above to explore - I’m going to wander through the Hands On Labs and pore over the questions being asked in the Rx Forums and I’m off to do a bit of experimentation.


Posted Wed, Aug 18 2010 3:17 PM by mtaulty

Comments

Luciano Evaristo Guerche (Gorše) wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Wed, Aug 18 2010 8:21 PM

Mike,

I am wondering whether there could be a concurrent Observable<T>, so that the observable collection could be fed in parallel (PLINQ, TPL, etc). Have you undertaken any research on a topic like that?

Cheers,

Luciano Evaristo Guerche (Gorše) wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Wed, Aug 18 2010 8:28 PM

Really wondering how Parallelism + RX would fit together (parallel task pushing data into Observable<T> or Observable<T> selecting from PLINQ or whatever scenario I am not able to foresee right now)

Jafar Husain wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Wed, Aug 18 2010 9:01 PM

Welcome to the initiated.  I think you'll find Rx highly applicable to Silverlight programming.  It's become an indispensable tool and I'm now using Rx for JavaScript to tame AJAX.

Steve Strong wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Wed, Aug 18 2010 10:37 PM

Mike,  

Push Me - Pull You,  great reference from Dr. Dolittle..

Thanks for putting this together.  Many times I have wanted to dig into Rx, because I believe it can simplify the async coding model for Silverlight and phone applications, but I could never think of a set of cases to try.  You have given us a good place to start.  Thanks again.

Paul Betts wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Thu, Aug 19 2010 5:55 AM

Great post! This really sums up a lot of good resources on Rx (especially the Rx Sandbox, that app is great and not well-known). Since you're hacking on Silverlight, you should also check out my Rx+MVVM library that I've been working on for awhile now, it works for SL4, .NET 4 and .NET 3.5 (just like Rx):

github.com/.../ReactiveXaml

My blog is also a great place to learn about Rx and RxXaml, check out the Reactive Extensions category.

mtaulty wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Thu, Aug 19 2010 9:11 AM

Luciano,

On the question of concurrency and Rx - these two topics are very much linked as you suggest.

The Rx framework is using the Parallel Extensions - there's some info on that over here channel9.msdn.com/.../Wes-Dyer-and-Stephen-Toub-Rx-and-Px-Working-Together

Mike.

Joe wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Thu, Aug 19 2010 12:43 PM

I'm wondering how far Rx will go?  Will it end up performing evaluations from changing resultsets (live resultsets?).  Like CLINQ does now?

Will IObservable replace INotifyPropertyChanged?

Luciano Evaristo Guerche (Gorše) wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Thu, Aug 19 2010 1:37 PM

Mike,

Thanks for the link to "Wes Dyer and Stephen Toub: Rx and Px - Working Together" channel9.msdn.com/.../Wes-Dyer-and-Stephen-Toub-Rx-and-Px-Working-Together

I'll watch for sure.

WayneBee wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Sat, Aug 21 2010 6:02 PM

I have been trying to learn Rx so that I can do publish/subscribe scenarios with it, but I haven't seen one example of this.

I think this is a common request, can you perhaps do an example of this for us Mike? Here's an example of what I'm looking for:

social.msdn.microsoft.com/.../74d41079-64d3-4a95-a174-fb7ab71f009d

MarcinNajder wrote re: Reactive Extensions for .NET ( “stuff happens” )
on Thu, Aug 26 2010 8:03 AM

New version of RxSandbox

mnajder.blogspot.com/.../rxsandbox-v1.html

Changes: solution converted to VS2010, Net 4.0; 56 operators, grouping operators on the tree control; zoom

Mike Taulty's Blog wrote Reactive FlickR Searching…
on Tue, Feb 1 2011 9:59 AM

A bit of an experiment… I was tinkering with some code the other day to pull down some images from FlickR

Mike Taulty's Blog wrote Silverlight and NESL Redux–Touch
on Wed, Feb 2 2011 12:04 AM

Stephen pointed out last week that there’s a preview update to the “Native Extensions to Silverlight

Mike Taulty's Blog wrote Reactive Extensions “for the rest of us” (DevDays, Holland, Part 1)
on Tue, May 3 2011 9:01 AM

I was lucky to be able to spend part of last week at the ever impressive DevDays conference out at The

Mike Taulty's Blog wrote Reactive Extensions RTW
on Thu, Jun 30 2011 2:52 PM

I’m a big fan of the Reactive Extensions (Rx) and I’ve put together a post or two in the past and also

Chaves wrote Programação Reactiva (RX)
on Sun, Jul 3 2011 7:25 PM

Accustomed as we are to programming in an imperative style, where we structure actions in the same way that we “think
