Reactive Extensions “for the rest of us” (DevDays, Holland, Part 2)

I wanted to follow up on the previous post, which had the video of the Reactive Extensions talk from DevDays, with a blow-by-blow account of the code that I ran through.

As an aside, since the talk I’ve updated my Rx libraries to the very latest version and I’d recommend that you do the same before following along with this blog post. I downloaded v1.0.10425 from the web site, which is the latest release there.

Another quick aside – if you’re looking for the assemblies after installing, you need to go to your equivalent of c:\program files\microsoft reactive extensions sdk\ and not get stuck looking inside c:\program files\microsoft cloud programmability, which is where the previous versions resided.

As a reminder, you can watch the video or download it in various formats from here;

http://channel9.msdn.com/Events/DevDays/DevDays-2011-Netherlands/Devdays014

and here’s a “brain dump” of everything that goes on in the session.

Demo 1 – the blocked enumerable

This is just a simple example with some WPF code which attempts to enumerate through all of the PNG files on my computer and display them in a ListBox. Because it uses a single thread and an enumerable it all gets blocked up and the UI grinds to a halt.

I’m not claiming that this is a problem that can’t be fixed without Rx; it was just a place where I could easily demonstrate how Rx might help.

And so initially, I used this enumeration to populate my ListBox;

      foreach (var pngFile in this.GetPngSearchResults())
      {
        this.Images.Add(pngFile);
      } 

and then subsequently I used this observable to populate my ListBox;

      var observable =
        this.GetPngSearchResults()
        .ToObservable(Scheduler.TaskPool);

      var interval =
        Observable.Interval(TimeSpan.FromMilliseconds(100));

      var subscription =
        observable
        .Zip(interval, (d,i) => d)
        .ObserveOn(SynchronizationContext.Current)
        .Subscribe(
          image => this.Images.Add(image));

and here’s the download for that code. You can define the symbol USE_RX to use the Rx version.
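In the Rx version, the Zip against Observable.Interval pairs each search result with a timer tick, so results reach the UI no faster than one every 100ms. Here is a minimal console sketch of just that pacing step. It assumes a recent System.Reactive NuGet package rather than the 10425 build, and the PacingSketch class and the Range of integers standing in for the PNG search results are hypothetical names for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PacingSketch
{
  // Pairs each source item with a timer tick so that items are released
  // no faster than one per interval. Blocks until the sequence completes.
  public static IList<int> Run()
  {
    // Stand-in for the PNG search results (assumption, not the demo code).
    var items = Observable.Range(1, 5);

    var ticks = Observable.Interval(TimeSpan.FromMilliseconds(100));

    return items
      .Zip(ticks, (item, tick) => item)  // keep the item, discard the tick
      .ToList()                          // gather everything into one list
      .Wait();                           // block until the list is ready
  }
}
```

Calling PacingSketch.Run() from a Main method takes roughly half a second and returns the five items in their original order.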

Demo 2 – Constructing, Subscribing, Disposing

In the second demo segment of the talk, I ran through a bunch of different ways in which you can create observables in Rx, which I’ll reproduce below. They were run in a .NET 4.0 console application referencing System.Reactive.dll from the new 10425 build, and they generally make use of this little implementation of IObserver<T>;

class MyObserver<T> : IObserver<T>
  {
    ConsoleColor mainColor;

    public MyObserver(ConsoleColor mainColor = ConsoleColor.Cyan)
    {
      this.mainColor = mainColor;
    }
    public void OnCompleted()
    {
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.WriteLine("\tObservable Completed");
      Console.ResetColor();
    }

    public void OnError(Exception error)
    {
      Console.ForegroundColor = ConsoleColor.Green;
      Console.WriteLine("\tObservable Exception! Message {0}", error.Message);
      Console.ResetColor();
    }

    public void OnNext(T value)
    {
      Console.ForegroundColor = mainColor;
      Console.WriteLine("\tObservable Value {0}", value);
      Console.ResetColor();
    }
  }
Empty Sequence

Just showing that an empty sequence completes quickly :-)

class Program
  {
    static void Main(string[] args)
    {
      // Empty sequence
      var observable = Observable.Empty<int>();

      var subscription =
        observable.Subscribe(new MyObserver<int>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();
    }
  }
Endless Sequence

Showing that an endless sequence never produces a value and never completes (!);

 class Program
  {
    static void Main(string[] args)
    {
      // Endless sequence
      var observable = Observable.Never<int>();

      var subscription =
        observable.Subscribe(new MyObserver<int>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();
    }
  }
Single Value Sequence

Showing that we can easily construct a single valued sequence;

  class Program
  {
    static void Main(string[] args)
    {
      // Single value sequence
      var observable = Observable.Return<int>(122);

      var subscription =
        observable.Subscribe(new MyObserver<int>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();
    }
  }
A Sequence which Errors

Showing that we can cause OnError to fire;

class Program
  {
    static void Main(string[] args)
    {
      // Sequence which errors
      var observable = Observable.Throw<int>(new Exception("Failed!!"));

      var subscription =
        observable.Subscribe(new MyObserver<int>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();
    }
  }
Constructing a Range
  class Program
  {
    static void Main(string[] args)
    {
      // Range of 20 integers starting at 1
      var observable = Observable.Range(1, 20);

      var subscription =
        observable.Subscribe(new MyObserver<int>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();
    }
  }
Constructing via Generate
    static void Main(string[] args)
    {
      // Generates "a", "aa", "aaa", ... while the length is under 20
      var observable =
        Observable.Generate(
          "a",
          str => str.Length < 20,
          str => str + "a",
          str => str);

      var subscription =
        observable.Subscribe(new MyObserver<string>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();
    }
Turning an Enumerable into an Observable
 class Program
  {
    static void Main(string[] args)
    {
      // Enumerable turned into an observable
      var observable = (new string[] { "abc", "def", "ghi" }).ToObservable();

      var subscription =
        observable.Subscribe(new MyObserver<string>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();
    }
  }
Constructing via Create – turning the Console characters into an Observable
    static void Main(string[] args)
    {
      // Console key presses as a sequence; pressing X completes it
      var observable =
        Observable.Create<char>(
          observer =>
          {
            while (true)
            {
              ConsoleKeyInfo keyInfo = Console.ReadKey(true);

              if (keyInfo.Key == ConsoleKey.X)
              {
                observer.OnCompleted();
                break;
              }
              observer.OnNext(keyInfo.KeyChar);
            }

            return (() => { });
          });

      var subscription =
        observable
        .SubscribeOn(Scheduler.TaskPool)
        .Subscribe(new MyObserver<char>());

      Console.WriteLine("Waiting...");
      (new AutoResetEvent(false)).WaitOne();
    }
Constructing a Sequence on an Interval
class Program
  {
    static void Main(string[] args)
    {
      var observable =
        Observable.Interval(TimeSpan.FromSeconds(1));

      var subscription =
        observable.Subscribe(new MyObserver<long>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subscription.Dispose();
    }
  }
Noting That We Have a Cold Observable – Second Subscription gets a new Sequence
    static void Main(string[] args)
    {
      var observable =
        Observable.Interval(TimeSpan.FromSeconds(1));

      var subscription =
        observable.Subscribe(new MyObserver<long>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      var subscription2 =
        observable.Subscribe(new MyObserver<long>(ConsoleColor.Green));

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subscription2.Dispose();

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subscription.Dispose();
    }

Demo 3 – Cold Observable to Hot Observable via Publish()

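This was to show the difference between the last example in the previous section and this example. Once the observable is published and connected, both subscribers share the one underlying sequence rather than each getting a new one;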

  class Program
  {
    static void Main(string[] args)
    {
      IConnectableObservable<long> observable =
        Observable.Publish(
          Observable.Interval(TimeSpan.FromSeconds(1)));

      var connection =
        observable.Connect();

      var subscription =
        observable.Subscribe(new MyObserver<long>());

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      var subscription2 =
        observable.Subscribe(new MyObserver<long>(ConsoleColor.Green));

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      connection.Dispose();

      Console.WriteLine("Waiting...");
      Console.ReadLine();
    }
  }
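The same difference can be seen without any timing involved. The sketch below is an illustration under assumptions rather than the demo code: it swaps Interval for Observable.Range so that it runs to completion straight away, it assumes a recent System.Reactive NuGet package, and HotColdSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class HotColdSketch
{
  // Returns what a late subscriber sees from a cold sequence and from
  // the same sequence after it has been published and connected.
  public static (List<int> ColdLate, List<int> HotLate) Run()
  {
    var coldLate = new List<int>();
    var hotLate = new List<int>();

    // Cold: every subscription runs the range again from the start.
    var cold = Observable.Range(1, 3);
    cold.Subscribe(_ => { });                    // first subscriber
    cold.Subscribe(x => coldLate.Add(x));        // later subscriber still sees 1, 2, 3

    // Hot: Publish shares a single subscription to the source.
    var hot = cold.Publish();
    hot.Subscribe(_ => { });                     // subscribed before Connect
    using (hot.Connect())                        // the range runs once, here
    {
      hot.Subscribe(x => hotLate.Add(x));        // too late, the values have gone
    }

    return (coldLate, hotLate);
  }
}
```

With the default schedulers the range runs synchronously inside Connect(), so ColdLate ends up holding 1, 2, 3 while HotLate stays empty.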

Demo 4 – LINQ Operators

This was to show that I can use standard LINQ operators like Where, Skip, Take and Select with my IObservable<T>, so there were just some simple examples built around the console character reading code that I had previously used;

    static void Main(string[] args)
    {
      // Console key presses as a sequence; pressing X completes it
      var observable =
        Observable.Create<char>(
          observer =>
          {
            while (true)
            {
              ConsoleKeyInfo keyInfo = Console.ReadKey(true);

              if (keyInfo.Key == ConsoleKey.X)
              {
                observer.OnCompleted();
                break;
              }
              observer.OnNext(keyInfo.KeyChar);
            }

            return (() => { });
          });

      var subscription =
        observable        
        .Where(ch => !("aeiou".Contains(ch)))
        .Skip(3)
        .Take(5)
        .Select(ch => Char.ToUpper(ch))
        .SubscribeOn(Scheduler.TaskPool)
        .Subscribe(new MyObserver<char>());

      Console.WriteLine("Waiting...");
      (new AutoResetEvent(false)).WaitOne();
    }

and use new operators like Timestamp and Timeout;

    static void Main(string[] args)
    {
      // Console key presses as a sequence; pressing X completes it
      var observable =
        Observable.Create<char>(
          observer =>
          {
            while (true)
            {
              ConsoleKeyInfo keyInfo = Console.ReadKey(true);

              if (keyInfo.Key == ConsoleKey.X)
              {
                observer.OnCompleted();
                break;
              }
              observer.OnNext(keyInfo.KeyChar);
            }

            return (() => { });
          });

      var subscription =
        observable        
        .Timestamp()
        .Do(tch => Console.WriteLine("\tobserved value at {0}", tch.Timestamp))
        .Select(tch => tch.Value)
        .Timeout(TimeSpan.FromSeconds(5))
        .Select(ch => Char.ToUpper(ch))
        .SubscribeOn(Scheduler.TaskPool)
        .Subscribe(new MyObserver<char>());

      Console.WriteLine("Waiting...");
      (new AutoResetEvent(false)).WaitOne();
    }

Demo 5 – Errors

I attempted to show that I could turn the Error that the Timeout below causes into a regular sequence completion by using Catch, although I managed to make a mess of it in the live session :-)

    static void Main(string[] args)
    {
      // Console key presses as a sequence; X completes it, E errors it
      var observable =
        Observable.Create<char>(
          observer =>
          {
            while (true)
            {
              ConsoleKeyInfo keyInfo = Console.ReadKey(true);

              if (keyInfo.Key == ConsoleKey.X)
              {
                observer.OnCompleted();
                break;
              }
              else if (keyInfo.Key == ConsoleKey.E)
              {
                observer.OnError(new Exception("Failed!"));
                break;
              }
              observer.OnNext(keyInfo.KeyChar);
            }

            return (() => { });
          });

      var subscription =
        observable        
        .Timeout(TimeSpan.FromSeconds(5))
        .Select(ch => Char.ToUpper(ch))        
        .SubscribeOn(Scheduler.TaskPool)
        .Catch("sequence has ended".ToObservable())
        .Subscribe(new MyObserver<char>());

      Console.WriteLine("Waiting...");
      (new AutoResetEvent(false)).WaitOne();
    }

I could equally have used Observable.Empty<char>() in my call to Catch() in order to simply complete the sequence with no additional sequence elements.
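A minimal sketch of that Observable.Empty<char>() variation follows. It assumes a recent System.Reactive NuGet package, and to keep it self-contained it swaps the console key-press sequence for two characters followed by an error; CatchSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class CatchSketch
{
  // Records what an observer sees when an error is replaced by an empty sequence.
  public static List<string> Run()
  {
    var log = new List<string>();

    // Two values followed by an error, standing in for the key-press sequence.
    var failing =
      "ab".ToObservable()
      .Concat(Observable.Throw<char>(new Exception("Failed!")));

    failing
      .Catch(Observable.Empty<char>())   // swap the error for a plain completion
      .Subscribe(
        ch => log.Add(ch.ToString()),
        ex => log.Add("error: " + ex.Message),
        () => log.Add("completed"));

    return log;
  }
}
```

The observer sees "a", "b" and then a completion; the OnError handler is never called.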

I also illustrated Retry;

    static void Main(string[] args)
    {
      // Console key presses as a sequence; X completes it, E errors it
      var observable =
        Observable.Create<char>(
          observer =>
          {
            while (true)
            {
              ConsoleKeyInfo keyInfo = Console.ReadKey(true);

              if (keyInfo.Key == ConsoleKey.X)
              {
                observer.OnCompleted();
                break;
              }
              else if (keyInfo.Key == ConsoleKey.E)
              {
                observer.OnError(new Exception("Failed!"));
                break;
              }
              observer.OnNext(keyInfo.KeyChar);
            }

            return (() => { });
          });

      var subscription =
        observable        
        .Retry(3)
        .Select(ch => Char.ToUpper(ch))        
        .SubscribeOn(Scheduler.TaskPool)
        .Catch("sequence has ended".ToObservable())
        .Subscribe(new MyObserver<char>());

      Console.WriteLine("Waiting...");
      (new AutoResetEvent(false)).WaitOne();
    }

Demo 6 – Subject<T>

I illustrated that I can easily “make a sequence” using Subject<T>;

    static void Main(string[] args)
    {
      Subject<long> subject = new Subject<long>();

      var subscription =
        subject.Subscribe(
          value => Console.WriteLine("Value produced {0}", value),
          () => Console.WriteLine("Completed"));

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subject.OnNext(10);

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subject.OnNext(20);

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subject.OnCompleted();

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subscription.Dispose();
    }

and that a Subject<T>, being an observer as well as an observable, can itself be subscribed to another sequence;

    static void Main(string[] args)
    {
      var observable =
        Observable.Interval(TimeSpan.FromSeconds(1));

      Subject<long> subject = new Subject<long>();

      var subscriptionA = 
        observable.Subscribe(subject);

      var subscriptionB =
        subject.Subscribe(
          value => Console.WriteLine("Value produced {0}", value),
          () => Console.WriteLine("Completed"));

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subject.OnNext(10);

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subject.OnNext(20);

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subject.OnCompleted();

      Console.WriteLine("Waiting...");
      Console.ReadLine();

      subscriptionA.Dispose();
    }
“The Kettle Example”

I also tried to illustrate using Subject<T> by showing my simple “fill the kettle” example;

You can grab the source code for that example here and the particular file of interest is called KettleViewModel.show.cs.

Demo 6 – The Mouse Events Example

This attempts to capture mouse events. It’s a very simple example where you drag some images out onto a WPF window and then capture mouse events to drag those images around.

The code for that sample is here for download.
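The sample itself isn't reproduced in this post, so the following is not its code. It is a minimal sketch of one common way to compose a drag from mouse events in Rx: for each mouse-down, follow the mouse-moves until the mouse-up. To keep it runnable as plain console code, Subject<T> instances stand in for the WPF mouse events, and the DragSketch class and the tuple shapes are hypothetical. It assumes a recent System.Reactive NuGet package and C# 7 tuples.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DragSketch
{
  // Returns the offsets reported while a simulated drag is in progress.
  public static List<(int Dx, int Dy)> Run()
  {
    // Subjects stand in for the MouseDown / MouseMove / MouseUp events.
    var mouseDown = new Subject<(int X, int Y)>();
    var mouseMove = new Subject<(int X, int Y)>();
    var mouseUp = new Subject<Unit>();

    var deltas = new List<(int Dx, int Dy)>();

    // For each mouse-down, follow the moves until the button is released
    // and report how far the pointer is from where the drag started.
    var drags =
      mouseDown.SelectMany(down =>
        mouseMove
          .TakeUntil(mouseUp)
          .Select(move => (Dx: move.X - down.X, Dy: move.Y - down.Y)));

    using (drags.Subscribe(d => deltas.Add(d)))
    {
      mouseMove.OnNext((0, 0));      // ignored: no button down yet
      mouseDown.OnNext((10, 10));
      mouseMove.OnNext((15, 12));    // offset (5, 2)
      mouseMove.OnNext((20, 20));    // offset (10, 10)
      mouseUp.OnNext(Unit.Default);
      mouseMove.OnNext((30, 30));    // ignored: the drag has ended
    }

    return deltas;
  }
}
```

In a real WPF window the three subjects would be replaced by observables built from the actual mouse events, and each offset would be applied to the position of the image being dragged.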

Demo 7 – The Flickr Search Example

The final example that I showed was making a Flickr search from a WPF application;

and the source code for that sample is here for download.
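The sample's source isn't reproduced in this post, so what follows is not that code. It is a minimal sketch of the shape a search-as-you-type pipeline often takes in Rx: ignore repeated queries, start a search for each new one, and only listen to the most recent search. Everything in it is an assumption for illustration: the SearchSketch class, the fake search function, and the Subject<T> instances standing in for the text box and the web calls. A real pipeline would typically also put a Throttle in front, which is left out here so that the sketch has no timing in it. It assumes a recent System.Reactive NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SearchSketch
{
  // Returns the results that would have been shown to the user.
  public static List<string> Run()
  {
    var queries = new Subject<string>();                      // stand-in for text box changes
    var pending = new Dictionary<string, Subject<string>>();  // one fake "web call" per query
    var shown = new List<string>();

    // Hypothetical search function: hands back a sequence that the
    // test code below feeds by hand, as if a response had arrived.
    Func<string, IObservable<string>> search = query =>
    {
      var response = new Subject<string>();
      pending[query] = response;
      return response;
    };

    using (queries
      .DistinctUntilChanged()   // skip a query identical to the previous one
      .Select(search)           // start a search per query
      .Switch()                 // only listen to the most recent search
      .Subscribe(result => shown.Add(result)))
    {
      queries.OnNext("rx");
      queries.OnNext("rx");                          // duplicate, no new search
      queries.OnNext("rx talk");
      pending["rx"].OnNext("stale: rx");             // dropped by Switch
      pending["rx talk"].OnNext("fresh: rx talk");   // shown
    }

    return shown;
  }
}
```

Only the result for the latest query ends up in the list; the late response to the earlier query is discarded because Switch has already unsubscribed from it.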