Kinect for Windows V2 SDK: Treating Skeletal Data as a Sequence of Sequences

In experimenting with the Kinect for Windows V2 SDK, something that I keep returning to is trying to come up with a decent way of handling the scenarios where bodies enter/leave the sensor’s tracking.

The SDK team has thought about this and made all the right bits to deal with it but I’ve found myself more than once trying to figure out a way to handle scenarios like;

  1. Body A enters the sensor’s view
  2. Body B enters the sensor’s view
  3. Body A leaves the sensor’s view
  4. Body A returns

and so on – the general idea is that while the sensor is up and running it’s going to see a lot of bodies entering/leaving its field of vision and it’d be nice to come up with a good way of handling that.

When tracking skeletal data, the SDK bits deliver 30 frames per second into a developer’s code, with each frame containing an array of Body instances for 6 bodies. If there are fewer than 6 bodies being watched by the sensor, a number of those array entries will flag themselves as “not being tracked”.

What’s a bit of a struggle there is that at step 1 above the body may show up in any array element – e.g. slot index 2. Then at step 2 that second body could show up in e.g. slot index 4. Then in step 4 the original body could come back in slot index 0. That means that, as a developer, you have to watch all these array entries to see if there’s body data present in them and then act appropriately.

I wondered whether it would be good to think of all the frames that are acquired from when a body arrives in front of the sensor (step 1) through to when it departs (step 3) as a sequence of Body entries. That is – as an IObservable<Body>.

That seems like a small leap and I made that leap before in an earlier post.

What I hadn’t done in that post though was to take that one level “up” and to think of the sensor as providing a sequence of sequences of body entries. That is – the sensor can be viewed as producing an IObservable<IObservable<Body>>.

I played with this a little with the class below, which essentially manages a Dictionary<ulong, Subject<Body>> where the ulong is the tracking ID given to a body by the SDK.

The idea of this is that when the SDK reports a new uniquely tracked body, this code creates a new Subject<Body> for it, slots it into the dictionary and then “publishes” all subsequent frames for that tracking ID via that Subject<Body>.

Should the tracking ID in question disappear from a BodyFrame then the code will mark that particular sequence as being completed and remove it from the dictionary.

namespace RecognitionK
{
  using Microsoft.Kinect;
  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Reactive.Subjects;
  using System.Threading;

  public class ObservableBodySource : IDisposable
  {
    public ObservableBodySource(bool trackEntireBodies = true)
    {
      this.trackEntireBodies = trackEntireBodies;
      this.lockObject = new object();
    }
    public IObservable<IObservable<Body>> Open()
    {
      if (this.sensor != null)
      {
        throw new InvalidOperationException("DataSource is already open");
      }
      this.subBodies = new Subject<IObservable<Body>>();
      this.observables = new Dictionary<ulong, Subject<Body>>();

      this.sensor = KinectSensor.GetDefault();
      this.sensor.Open();

      this.bodies = new Body[this.sensor.BodyFrameSource.BodyCount];
      this.bodyReader = this.sensor.BodyFrameSource.OpenReader();
      this.bodyReader.FrameArrived += this.OnFrameArrived;

      return (this.subBodies);
    }
    void OnFrameArrived(object sender, BodyFrameArrivedEventArgs e)
    {
      // skip this frame if another thread (e.g. Dispose) currently holds the lock
      if ((e.FrameReference != null) &&
        Monitor.TryEnter(this.lockObject))
      {
        try
        {
          using (var frame = e.FrameReference.AcquireFrame())
          {
            // AcquireFrame() can return null if the frame has already expired
            if (frame != null)
            {
              frame.GetAndRefreshBodyData(this.bodies);

              // we need bodies that are tracked and, depending on the flag, we
              // might want every single joint to report itself as tracked.
              // ToList() so that the filter runs once rather than once per step.
              var trackedBodies = this.bodies.Where(
                b =>
                  (b.IsTracked) &&
                  ((!this.trackEntireBodies) ||
                  (b.Joints.All(j => j.Value.TrackingState == TrackingState.Tracked))))
                .ToList();

              this.ProcessNewlyArrivedBodies(trackedBodies);

              this.ProcessNewlyDepartedBodies(trackedBodies);

              this.PublishFramesForTrackedBodies(trackedBodies);
            }
          }
        }
        finally
        {
          // always release the lock, even if processing the frame throws
          Monitor.Exit(this.lockObject);
        }
      }
    }
    void PublishFramesForTrackedBodies(IEnumerable<Body> trackedBodies)
    {
      foreach (var body in trackedBodies)
      {
        this.observables[body.TrackingId].OnNext(body);
      }
    }
    void ProcessNewlyArrivedBodies(IEnumerable<Body> trackedBodies)
    {
      var newBodies = trackedBodies.Where(
        b => !this.observables.ContainsKey(b.TrackingId));

      foreach (var newBody in newBodies)
      {
        var newSubject = new Subject<Body>();
        this.observables[newBody.TrackingId] = newSubject;
        this.subBodies.OnNext(newSubject);
      }
    }
    void ProcessNewlyDepartedBodies(IEnumerable<Body> trackedBodies)
    {
      var oldBodies = this.observables.Keys
        .Where(
          trackingId => !trackedBodies.Any(b => b.TrackingId == trackingId))
        .ToList();

      foreach (var oldBody in oldBodies)
      {
        this.observables[oldBody].OnCompleted();
        this.observables.Remove(oldBody);
      }
    }
    public void Dispose()
    {
      this.Dispose(true);
      GC.SuppressFinalize(this);
    }
    ~ObservableBodySource()
    {
      this.Dispose(false);
    }
    void Dispose(bool disposing)
    {
      if (disposing)
      {
        lock (this.lockObject)
        {
          // NB: not attempting to dispose the sequence that was returned
          // from Open() nor any sequence that has been returned as part
          // of it - ownership of that sequence has to live with calling
          // code.
          if (this.bodyReader != null)
          {
            this.bodyReader.FrameArrived -= this.OnFrameArrived;
            this.bodyReader.Dispose();
            this.bodyReader = null;
          }
          if (this.sensor != null)
          {
            this.sensor.Close();
            this.sensor = null;
          }
        }
      }
    }
    Subject<IObservable<Body>> subBodies;
    Dictionary<ulong, Subject<Body>> observables;
    Body[] bodies;
    KinectSensor sensor;
    BodyFrameReader bodyReader;
    bool trackEntireBodies;
    object lockObject;
  }
}

I’m not sure how generally useful that could be or whether it just happens to suit the particular purpose that I’m currently trying to put it to but I thought I’d share it either way.

I can then consume this code from something like a console app by doing;

namespace TestApp
{
  using RecognitionK;
  using System;
  using System.Collections.Generic;

  class Program
  {
    static void Main(string[] args)
    {
      ObservableBodySource source = new ObservableBodySource(true);

      var bodiesSequence = source.Open();
      var bodySubscriptions = new List<IDisposable>();

      // frames may arrive on a thread other than Main, so guard the list
      // that both the frame callbacks and Main touch.
      var listLock = new object();

      var bodiesSubscription = bodiesSequence.Subscribe(
        bodySequence =>
        {
          Console.WriteLine("Gained body");
          IDisposable innerSub = null;

          innerSub = bodySequence.Subscribe(
            body =>
            {
              Console.WriteLine("Frame from body {0}", body.TrackingId);
            },
            () =>
            {
              lock (listLock)
              {
                bodySubscriptions.Remove(innerSub);
              }
              innerSub.Dispose();
              Console.WriteLine("Lost body");
            });

          lock (listLock)
          {
            bodySubscriptions.Add(innerSub);
          }
        });

      Console.ReadLine();

      lock (listLock)
      {
        foreach (var sub in bodySubscriptions)
        {
          sub.Dispose();
        }
        bodySubscriptions.Clear();
      }
      bodiesSubscription.Dispose();
      
      source.Dispose();
    }
  }
}

and that seems to work nicely enough although, naturally, there might be a few gremlins lurking in there somewhere.
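The arrival/departure bookkeeping that ObservableBodySource does can also be tried out without a sensor. The sketch below is a synchronous, in-memory analogue rather than the class itself: plain lists stand in for IObservable, arrays of tracking IDs stand in for BodyFrame data, and BodyEpisodes and Split are hypothetical names. A tracking ID that first appears opens a new inner sequence (the new Subject<Body>), each frame it appears in is appended (OnNext), and the first frame in which it is missing closes it (OnCompleted).

```csharp
using System.Collections.Generic;
using System.Linq;

public static class BodyEpisodes
{
  // Each input frame is the set of tracking IDs seen in it. Returns one
  // (TrackingId, frame indices) episode per continuous stretch of tracking.
  public static List<(ulong TrackingId, List<int> Frames)> Split(
    IEnumerable<ulong[]> frames)
  {
    var finished = new List<(ulong TrackingId, List<int> Frames)>();
    var open = new Dictionary<ulong, List<int>>();
    int frameIndex = 0;

    foreach (var trackedIds in frames)
    {
      // IDs missing from this frame have departed: close their episodes
      // (the equivalent of OnCompleted in ObservableBodySource).
      foreach (var id in open.Keys.Where(k => !trackedIds.Contains(k)).ToList())
      {
        finished.Add((id, open[id]));
        open.Remove(id);
      }

      // New IDs start an episode (a new Subject<Body>); every tracked ID
      // then records this frame (the equivalent of OnNext).
      foreach (var id in trackedIds)
      {
        if (!open.TryGetValue(id, out var episode))
        {
          episode = new List<int>();
          open[id] = episode;
        }
        episode.Add(frameIndex);
      }
      frameIndex++;
    }

    // Close anything still tracked when the input runs out, ordered by the
    // frame in which it first appeared.
    finished.AddRange(open.OrderBy(kv => kv.Value[0]).Select(kv => (kv.Key, kv.Value)));
    return finished;
  }
}
```

As with the class above, an ID that drops out for even one frame and then comes back produces two separate episodes rather than one.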

Kinect for Windows V2 SDK: Hello (BodyIndex, Infra Red) World for the .NET WPF Developer

I’ve been writing these posts about my beginner’s experiments with the Kinect for Windows V2 SDK;

with reference to the official materials up on Channel 9;

Programming-Kinect-for-Windows-v2

I realised that I hadn’t done anything with;

  • The Infra Red data source that comes from the sensor – this one probably speaks for itself and is documented here.
  • The Body Index data source that comes from the sensor – for the (up to) 6 bodies that a sensor is tracking, this source gives you frames that contain a 2D grid where each co-ordinate holds a simple 0-5 integer representing which body (if any) the sensor associates with that co-ordinate, with the value 255 meaning that no body is there.
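To make that grid description concrete, the sketch below treats a body index frame as a flat byte[] laid out row by row (the same layout the bitmap code later in this post assumes when it walks the arrays with a single index). Values 0-5 mean a tracked body and anything else means no body. BodyIndexGrid, At and PixelsPerBody are hypothetical names; in practice BodyIndexFrame.CopyFrameDataToArray is what fills such an array.

```csharp
public static class BodyIndexGrid
{
  // Values 0..5 identify one of the (up to) 6 tracked bodies; any other
  // value means "no body at this pixel".
  public const int BodyCount = 6;

  // Look up the body index at (x, y), assuming row-major layout.
  public static byte At(byte[] data, int width, int x, int y)
  {
    return data[y * width + x];
  }

  // Count how many pixels belong to each body.
  public static int[] PixelsPerBody(byte[] data)
  {
    var counts = new int[BodyCount];
    foreach (byte value in data)
    {
      if (value < BodyCount)
      {
        counts[value]++;
      }
    }
    return counts;
  }
}
```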

I also realised that I hadn’t tried to do anything where I link together multiple data sources to produce some kind of combined view.

I figured that I could take the IR data and display it, but correlate it with the body index data to remove any values that didn’t relate to a tracked body (this is far from an original idea – there are samples like this everywhere, although they sometimes use the colour video frames rather than the infra-red frames).

I stuck with using a bit of the reactive extensions as per my previous post and started off with a little WPF UI to display 2 images;

[image: the WPF UI layout]

Simple enough stuff – a wallpaper image underneath another image that I have named myImage. From there, I wrote some code to run when the UI has finished loading;

    void OnLoaded(object sender, RoutedEventArgs e)
    {
      this.OpenSensor();
      this.CaptureFrameDimensions();
      this.CreateBitmap();
      this.CreateObservable();

      this.obsFrameData
        .SubscribeOn(TaskPoolScheduler.Default)
        .ObserveOn(SynchronizationContext.Current)
        .Subscribe(
          fd =>
          {
            this.CopyFrameDataToBitmap(fd);
          }
      );
    } 

I’m hoping that this is a fairly “logical” structure. First off, I open up the sensor which I keep around in a member variable;

    void OpenSensor()
    {
      // get the sensor
      this.sensor = KinectSensor.GetDefault();
      sensor.Open();
    }

and then I figure out the dimensions of the frames that I’m planning to deal with (the body index frames and the infra-red frames), again keeping a few things around in member variables;

    void CaptureFrameDimensions()
    {
      this.irFrameDesc = this.sensor.InfraredFrameSource.FrameDescription;
      this.biFrameDesc = this.sensor.BodyIndexFrameSource.FrameDescription;

      this.frameRect = new Int32Rect(
        0,
        0,
        this.irFrameDesc.Width,
        this.irFrameDesc.Height);
    }

and then I create a WriteableBitmap which can be the source for the Image named myImage in my UI;

    void CreateBitmap()
    {
      this.bitmap = new WriteableBitmap(
        this.irFrameDesc.Width,
        this.irFrameDesc.Height,
        96,
        96,
        PixelFormats.Bgra32,
        null);

      this.myImage.Source = this.bitmap;
    }

and then finally I attempt to create an observable sequence of frames of data which contain both the infra-red frame and the body index frame. To do that, I made a simple class to hold both arrays of data;

    class FrameData
    {
      public ushort[] IrBits { get; set; }
      public byte[] BiBits { get; set; }

      public bool IsValid
      {
        get
        {
          return (this.IrBits != null && this.BiBits != null);
        }
      }
    }

and then I attempted to make an observable sequence of these;

    void CreateObservable()
    {
      this.indexFrameReader = sensor.OpenMultiSourceFrameReader(
        FrameSourceTypes.Infrared | FrameSourceTypes.BodyIndex);

      var events = Observable.FromEventPattern<MultiSourceFrameArrivedEventArgs>(
        handler => this.indexFrameReader.MultiSourceFrameArrived += handler,
        handler => this.indexFrameReader.MultiSourceFrameArrived -= handler);

      // lots of allocations here, going for a simple approach and hoping that 
      // the GC digs me out of the hole I'm making for myself 🙂
      this.obsFrameData = events
         .Select(
           ev => ev.EventArgs.FrameReference.AcquireFrame())
         .Where(
           frame => frame != null)
         .Select(
           frame =>
           {
             ushort[] irBits = null;

             byte[] biBits = null;
             using (InfraredFrame ir = frame.InfraredFrameReference.AcquireFrame())
             {
               using (BodyIndexFrame bi = frame.BodyIndexFrameReference.AcquireFrame())
               {
                 irBits = ir == null ? null : new ushort[this.irFrameDesc.LengthInPixels];
                 biBits = ((bi == null) || (irBits == null)) ? null : new byte[this.biFrameDesc.LengthInPixels];

                 if ((irBits != null) && (biBits != null))
                 {
                   ir.CopyFrameDataToArray(irBits);
                   bi.CopyFrameDataToArray(biBits);
                 }
               }
             }
             return (
               new FrameData
               {
                 IrBits = irBits,
                 BiBits = biBits
               }
             );
           }
         )
         .Where(
          fd => fd.IsValid);
    }

That’s quite a long bit of code. As in my previous post, I’ve taken the decision to allocate arrays for each frame that I get off the sensor and to “live with the consequences”, which (so far) has worked out fine running on my meaty i7 laptop with lots of RAM. I’m actually hoping that the GC mostly deals with it for me, given how short-lived these arrays are going to be.

So, effectively, this is just trying to use a MultiSourceFrameReader to bring back frames from both the Infrared and BodyIndex data sources at the same time. Where both of those frames can be acquired, this code produces a new instance of my FrameData class with the members IrBits and BiBits containing copies of the data that was present in those frames.

It looks a bit “wordy”, but most of the code is really there just to make sure that both of the frames I’m asking the sensor for are actually acquired, so it’s a couple of AcquireFrame() calls plus some null reference checks.

Once that observable sequence is set up, it gets stored into a member variable and then I consume it from that OnLoaded() method (in the first code sample above) and that, essentially, is routing the captured frame data through to a method called CopyFrameDataToBitmap which looks like;

    void CopyFrameDataToBitmap(FrameData frameData)
    {
      this.bitmap.Lock();

      unsafe
      {
        var pBackBuffer = this.bitmap.BackBuffer;

        for (int i = 0; i < frameData.IrBits.Length; i++)
        {
          // pixels not related to a body disappear...
          UInt32 colourValue = 0x00000000;

          int bodyIndex = frameData.BiBits[i];
          
          if (bodyIndex < BODY_COUNT)
          {
            // throwing away the lower 8 bits to give a 0..FF 'magnitude'
            UInt32 irTopByte = (UInt32)(frameData.IrBits[i] >> 8);

            // copy that value into red, green, blue slots with FF alpha
            colourValue = 0xFF000000 + irTopByte;
            colourValue |= irTopByte << 8;
            colourValue |= irTopByte << 16;

            // apply a mask to pickup the colour for the particular body.
            colourValue &= colourMasks[bodyIndex];
          }
          UInt32* pBufferValue = (UInt32*)(pBackBuffer + i * 4);
          *pBufferValue = colourValue;
        }
      }
      this.bitmap.AddDirtyRect(this.frameRect);

      this.bitmap.Unlock();
    }

this is making reference to a little static array of colour-based bitmasks;

    static UInt32[] colourMasks = 
    {
      0xFFFF0000, // red
      0xFF00FF00, // green
      0xFF0000FF, // blue
      0xFFFFFF00, // yellow
      0xFF00FFFF, // cyan
      0xFFFF00FF  // purple
    };
    const int BODY_COUNT = 6;

and, essentially, this code builds up the 2D image from the captured frame data on a per-pixel basis by;

  • checking the value from the body index frame to see whether the “pixel” should be included in the image – i.e. does it correspond to a body being tracked by the sensor?
  • taking the high byte of the Infra Red data and using it as a value 0..255 and then turning that value into a shade of a colour such that each body gets a different colour and such that the ‘depth’ of the colour represents the value from the IR sensor.
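The per-pixel rule in those two bullets can be pulled out into a pure function, which makes it easy to check without a sensor or a WriteableBitmap. This is a refactoring sketch of the loop body in CopyFrameDataToBitmap rather than code from the download; IrBodyColouring and ToBgra are hypothetical names, and the masks are copied from colourMasks above.

```csharp
public static class IrBodyColouring
{
  const int BodyCount = 6;

  // Same masks as colourMasks: red, green, blue, yellow, cyan, purple.
  static readonly uint[] ColourMasks =
  {
    0xFFFF0000, 0xFF00FF00, 0xFF0000FF, 0xFFFFFF00, 0xFF00FFFF, 0xFFFF00FF
  };

  // Returns a 0xAARRGGBB value; written little-endian into the back buffer
  // this gives the B, G, R, A byte order that PixelFormats.Bgra32 expects.
  public static uint ToBgra(ushort irValue, byte bodyIndex)
  {
    if (bodyIndex >= BodyCount)
    {
      return 0x00000000; // fully transparent: not part of a tracked body
    }
    uint grey = (uint)(irValue >> 8);                       // top byte, 0..255
    uint colour = 0xFF000000 | grey | (grey << 8) | (grey << 16);
    return colour & ColourMasks[bodyIndex];                 // tint per body
  }
}
```

With that split out, the unsafe loop only has to write ToBgra(frameData.IrBits[i], frameData.BiBits[i]) into each 32-bit slot of the back buffer.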

and that’s pretty much it. What I really like about this code is that the SDK makes it easy to acquire data from multiple sources using the same pattern as acquiring data from one source – i.e. via the MultiSourceFrameReader – so I don’t have to do the work of pairing up the frames myself.

The sort of output that I get from this code looks something like;

[image: sample output when further from the sensor]

when I’m a little more distant from the sensor and perhaps a bit more like;

[image: sample output when nearer to the sensor, drawn in yellow]

for a case where I’m nearer to the sensor and it looks like in this particular case the infrared data is being drawn in yellow.

Again, I’m impressed with the SDK in terms of how it makes this kind of capture relatively easy – if you want the code from this post then it’s here for download. For me, I’ve taken brief looks at getting hold of;

  • video
  • depth
  • infra-red
  • body index
  • skeletal

data from the sensor. I also want to dig into some other bits and pieces and my next step would be to look into some of the controls that the Kinect SDK comes with and how they can be used to “Kinectify” a user interface…

Rx and Getting Paged Data from the MovieDb API

I haven’t written any code with the Reactive Extensions for a little while, so much so that I’d forgotten how much fun it can be.

I got a ping from Javanie on Twitter about some code that he was a bit blocked on. It needed to call the MovieDb API, and the particular problem was the way that the API returns results to the caller in pages.

What was nice about this was that the code he sent me was using RestSharp and he’d already done the work to create the right classes to deserialize the results which come back from the service, so I had only to do a bit of work to try and get those pieces to plug nicely into Rx.

What the code was intending to do was to query the movie db API for its list of movie genres. This can be found by hitting the URL;

http://api.themoviedb.org/3/genre/list?api_key=MY_API_KEY

although I think you’ll need an API key to get proper results, which come back something like this;

{"genres":[{"id":28,"name":"Action"},{"id":12,"name":"Adventure"},{"id":16,"name":"Animation"},{"id":35,"name":"Comedy"},{"id":80,"name":"Crime"},{"id":105,"name":"Disaster"},{"id":99,"name":"Documentary"},{"id":18,"name":"Drama"},{"id":82,"name":"Eastern"},{"id":2916,"name":"Erotic"},{"id":10751,"name":"Family"},{"id":10750,"name":"Fan Film"},{"id":14,"name":"Fantasy"},{"id":10753,"name":"Film Noir"},{"id":10769,"name":"Foreign"},{"id":36,"name":"History"},{"id":10595,"name":"Holiday"},{"id":27,"name":"Horror"},{"id":10756,"name":"Indie"},{"id":10402,"name":"Music"},{"id":22,"name":"Musical"},{"id":9648,"name":"Mystery"},{"id":10754,"name":"Neo-noir"},{"id":1115,"name":"Road Movie"},{"id":10749,"name":"Romance"},{"id":878,"name":"Science Fiction"},{"id":10755,"name":"Short"},{"id":9805,"name":"Sport"},{"id":10758,"name":"Sporting Event"},{"id":10757,"name":"Sports Film"},{"id":10748,"name":"Suspense"},{"id":10770,"name":"TV movie"},{"id":53,"name":"Thriller"},{"id":10752,"name":"War"},{"id":37,"name":"Western"}]}

from there, the code wants to get hold of the “Action” genre, grab its Id and use it to form a query to get all the movies of that genre with a URL like;

http://api.themoviedb.org/3/genre/28/movies?api_key=MY_API_KEY

and that returns the first page of results for that particular genre;

{"id":28,"page":1,"results":[{"adult":false,"backdrop_path":"/hXOWCT2HjZSK7qM4Se7TBjbktZF.jpg","id":26842,"original_title":"The Message: The Story of Islam","release_date":"1977-03-09","poster_path":"/6zOWn1mtcSPfWekydQKbxnCyaTw.jpg","popularity":1.510080781875,"title":"The Message: The Story of Islam","vote_average":8.5,"vote_count":13},{"adult":false,"backdrop_path":"/6I3GKrpNGGFEXx7VhUZGMgpCqaV.jpg","id":3482,"original_title":"The Train","release_date":"1964-09-22","poster_path":"/idpWIwrJK5I8eiMkNYBddXvBlbW.jpg","popularity":1.05039008694455,"title":"The Train","vote_average":8.4,"vote_count":10},{"adult":false,"backdrop_path":"/dfSaFmcQBTu68JcTaNhaaSB0If1.jpg","id":11712,"original_title":"Tsubaki Sanjûrô","release_date":"1962-01-01","poster_path":"/fOP13eRzTGAvWo7fEiuWfrYlihC.jpg","popularity":0.625777660667093,"title":"Sanjuro","vote_average":8.4,"vote_count":15},{"adult":false,"backdrop_path":"/14qGZyC10dSWadsNuctFEM1QqHZ.jpg","id":832,"original_title":"M","release_date":"1931-05-11","poster_path":"/5jAaU3LStpQNlaBnZYsExMlvsBQ.jpg","popularity":3.83229663224416,"title":"M","vote_average":8.4,"vote_count":33},{"adult":false,"backdrop_path":"/jxIscmrDkgTEcYpKD9FePOoSLWk.jpg","id":118406,"original_title":"Gekijōban Naruto: Rōdo tu Ninja","release_date":"2012-07-28","poster_path":"/hELlIPO1u294ICcderb4PLG3NCk.jpg","popularity":1.731211250625,"title":"Naruto Shippuden the Movie: Road to Ninja","vote_average":8.3,"vote_count":12},{"adult":false,"backdrop_path":"/6XxsHC0Ty35lyL7nUJicE7vHMTP.jpg","id":11016,"original_title":"Key Largo","release_date":"1948-07-16","poster_path":"/iU9j8ACFKjTB1b7nTfVfLjMVGAe.jpg","popularity":2.07485774914062,"title":"Key Largo","vote_average":8.3,"vote_count":10},{"adult":false,"backdrop_path":"/z9qC0tssrzo88UKy0V8RCuZ12fj.jpg","id":15003,"original_title":"ช็อคโกแลต","release_date":"2008-02-06","poster_path":"/daFKjI9gZKcglJhmn5i2lz4PLA.jpg","popularity":1.578905725,"title":"Chocolate","vote_average":8.3,"vote_count":14},{"adult":false,"backdrop_path":"/qtxiemVTpPjDoueStK6fRliU46Z.jpg","id":13855,"original_title":"Chugyeogja","release_date":"2008-02-14","poster_path":"/6Zqbrb7y8FdwOS5zLTEKbEZyXxz.jpg","popularity":0.873446149678136,"title":"The Chaser","vote_average":8.2,"vote_count":19}],"total_pages":74,"total_results":1477}

I snipped out most of that JSON, but the important part is that, as well as an array of results, it contains total_pages and total_results values. That makes it possible to figure out how many pages there are and then, by appending a page parameter to the query string, to request each of those pages.
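As a sketch of that paging arithmetic, assuming the genre/{id}/movies?api_key=KEY&page=N shape described above, one relative URL per page can be produced from Enumerable.Range. MovieDbPaging and PageUrls are hypothetical names, and the relative paths would still need resolving against http://api.themoviedb.org/3/.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MovieDbPaging
{
  // Builds one relative URL per page, numbering pages from 1 to totalPages.
  // A totalPages of 0 yields an empty sequence.
  public static IEnumerable<string> PageUrls(int genreId, string apiKey, int totalPages)
  {
    return Enumerable.Range(1, totalPages).Select(page =>
      string.Format("genre/{0}/movies?api_key={1}&page={2}",
        genreId, Uri.EscapeDataString(apiKey), page));
  }
}
```

For the JSON above, PageUrls(28, key, 74) would give 74 URLs, one per page.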

What Javanie wanted from his code here was to bring back some details of each movie in this genre (dealing with the paged nature of the data). He handed me a bunch of really useful classes;

  public class Genre
  {
    public int Id { get; set; }
    public String Name { get; set; }
  }

  public class GenreCollection
  {
    public List<Genre> Genres { get; set; }
  }

  public class MovieCollection
  {
    public int Id { get; set; }
    public int Page { get; set; }
    public List<MovieResult> Results { get; set; }
    public int TotalPages { get; set; }
    public int TotalResults { get; set; }
  }
  public class MovieResult
  {
    public int Id { get; set; }
    public String BackdropPath { get; set; }
    public String OriginalTitle { get; set; }
    public String PosterPath { get; set; }
    public String ReleaseDate { get; set; }
    public String Title { get; set; }
    public Double VoteAverage { get; set; }
    public int VoteCount { get; set; }
  }


and so the task became one of how we might use a bit of Rx in order to make a web request to get back the GenreCollection from the service and then find the “Action” genre and then make some more web requests in order to get back all the movies.

Something a little like;

  1. Web request for the genre list
  2. Extract the ID of the “Action” genre
  3. Web request for the first page of the movies in the “Action” genre
  4. Extract the total page count
  5. For each page, make a web request for that page
  6. For each movie on that page, extract some detail of that movie.

It’s very procedural when we hit the “loop” parts of that logic and the trick (if there is a trick) is to perhaps think of that looping in terms of sequences rather than loops.
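Before bringing Rx into it, the same “loops as sequences” idea can be seen with plain LINQ over IEnumerable. In the sketch below, FetchPage is a hypothetical in-memory stand-in for the web service (five titles, two per page) and PagedQuery is likewise hypothetical. The pipeline fetches the first page to learn the page count, turns that count into page numbers 1..N, fetches each page and flattens the results. Like the Rx version later in this post, it ends up requesting page 1 twice.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class PagedQuery
{
  static readonly string[] AllTitles = { "A", "B", "C", "D", "E" };
  const int PageSize = 2;

  // Hypothetical stand-in for one HTTP request: a page of results plus the
  // total page count, loosely shaped like MovieCollection.
  public static (List<string> Results, int TotalPages) FetchPage(int page)
  {
    int totalPages = (AllTitles.Length + PageSize - 1) / PageSize;
    var results = AllTitles.Skip((page - 1) * PageSize).Take(PageSize).ToList();
    return (results, totalPages);
  }

  // The nested "for each page, for each result" loops written as a pipeline.
  public static IEnumerable<string> AllResults()
  {
    return new[] { 1 }
      .Select(firstPage => FetchPage(firstPage))                  // first page
      .SelectMany(first => Enumerable.Range(1, first.TotalPages)) // page numbers
      .Select(page => FetchPage(page))                            // one fetch per page
      .SelectMany(pageData => pageData.Results);                  // flatten results
  }
}
```

Because LINQ over IEnumerable is pulled one item at a time, this version fetches pages strictly in order; the Rx version pushes values and does not have that guarantee.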

As is often the case on this blog, I wrote a bit of code to experiment with this. I didn’t spend a lot of time on it. It might not be quite right. “Buyer beware”, as they say.

Step 1 – Making a Web Request in an Observable Way

The first thing I wanted to do was to try and use the RestClient from RestSharp in an observable way. Because it has methods that return Task, it’s fairly natural to link it up with Rx and produce something IObservable. I wanted a method that would hit a URL, deserialize the results into some <T> and then return a sequence of those values, and so I came up with;

    static readonly string baseUrl = @"http://api.themoviedb.org/3/";
    static readonly string apiKey = "get your own key :-)";

    static string MakeUrl(string insert)
    {
      return (string.Format("{0}?api_key={1}", insert, apiKey));
    }

    static IObservable<T> MakeObservableMovieDbRequest<T>(string url,
      params KeyValuePair<string,string>[] additionalParameters)
    {
      var observable = 
        Observable.FromAsync<IRestResponse<T>>(
          () =>
          {
            RestClient client = new RestClient(baseUrl);
          
            StringBuilder parameterisedUrl = new StringBuilder(MakeUrl(url));

            foreach (var keyValuePair in additionalParameters)
            {
              parameterisedUrl.AppendFormat(@"&{0}={1}",
                keyValuePair.Key, keyValuePair.Value);
            }

            RestRequest request = new RestRequest(parameterisedUrl.ToString());

            return (client.ExecuteGetTaskAsync<T>(request));
          }
        );

      var observableData = observable.Select(o => o.Data);

      return(observableData);
    }

And then I could make use of this with something along the lines of;

      var genresCollection = MakeObservableMovieDbRequest<GenreCollection>(
        @"genre/list");

      // this observable feels like a sequence but it really only ever returns one
      // entry which then has a number of genres within it. we can then flatten 
      // that out here.
      var genres = genresCollection.SelectMany(collection => collection.Genres);

      genres.Subscribe(
        genre =>
        {
          Console.WriteLine(genre.Name);
        });

      Console.ReadLine();

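One of the “interesting” parts of this piece of code is the question of when the HTTP request is sent to the service. Is it sent when the first statement above (the call to MakeObservableMovieDbRequest) executes, or only when the subscription starts with the call to genres.Subscribe? In Rx terminology that’s the difference between a “hot” observable and a “cold” observable.

In this case, without that call to Subscribe, no HTTP request is sent to the service: Observable.FromAsync doesn’t invoke its function until something subscribes, and it invokes it again for each new subscription. So the request is produced “on demand”, which is probably what I want, although it also means that subscribing twice would send the request twice.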
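To illustrate what “cold” means here without pulling in Rx, the sketch below implements the BCL’s IObservable<T> by hand (ColdObservable and RecordingObserver are hypothetical names). The work function, standing in for the HTTP request, runs inside Subscribe, so nothing happens when the observable is created and every subscription repeats the work. Unlike Rx’s Observable.FromAsync, this sketch delivers its value synchronously and does not route exceptions from the work function to OnError.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal "cold" observable: the work runs inside Subscribe,
// once per subscriber, and never when the observable is constructed.
public sealed class ColdObservable<T> : IObservable<T>
{
  readonly Func<T> work;

  public ColdObservable(Func<T> work)
  {
    this.work = work;
  }

  public IDisposable Subscribe(IObserver<T> observer)
  {
    observer.OnNext(this.work());   // the "HTTP request" happens here
    observer.OnCompleted();
    return NoOpDisposable.Instance; // nothing left to cancel
  }

  sealed class NoOpDisposable : IDisposable
  {
    public static readonly NoOpDisposable Instance = new NoOpDisposable();
    public void Dispose() { }
  }
}

// Records what it receives so the behaviour can be checked.
public sealed class RecordingObserver<T> : IObserver<T>
{
  public List<T> Values { get; } = new List<T>();
  public bool Completed { get; private set; }
  public Exception Error { get; private set; }

  public void OnNext(T value) => this.Values.Add(value);
  public void OnError(Exception error) => this.Error = error;
  public void OnCompleted() => this.Completed = true;
}
```

Counting calls to the work function shows the behaviour: zero after construction, one after the first Subscribe, two after the second.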

Step 2 – Getting to the “Action” Genre and Figuring out Page Counts

Having got the ability to make a web request, I can now call the API to get hold of the id of the genre relating to “Action” movies and I can then grab the first page of data back from that web request to figure out how many pages of data there are in total.

That is, I can write code such as;

      var genresCollection = MakeObservableMovieDbRequest<GenreCollection>(
        @"genre/list");

      // this observable feels like a sequence but it really only ever returns one
      // entry which then has a number of genres within it. we can then flatten 
      // that out here.
      var genres = genresCollection.SelectMany(collection => collection.Genres);

      // likely to only be one of these in reality.
      var actionGenres = genres.Where(genre => genre.Name == "Action");

      // this should represent the first page of data for any genre that
      // identifies itself as "Action"
      var actionGenreMovies = actionGenres.SelectMany(
        actionGenre =>
        {
          string url = string.Format(@"genre/{0}/movies", actionGenre.Id);

          return (MakeObservableMovieDbRequest<MovieCollection>(url));
        }
      );

      actionGenreMovies.Subscribe(
        movies =>
        {
          Console.WriteLine(movies.TotalPages);
        });

and so now we’ve managed to produce a sequence of MovieCollection values (probably just one), each carrying a TotalPages count that tells us how many pages of data are available for a genre that identifies itself as “Action”.

Step 3 – Producing a Sequence of Page Numbers

It takes a bit of head-scratching to conjure up a way to go from this sequence containing the total number of pages to a sequence of HTTP calls which return pages of data, but it’s not too bad. We have functions like Enumerable.Range() (or Observable.Range()) which can produce a sequence of integers for us, so one of those could be used to produce page numbers 1…N where N is the total number of pages available.

If we work that way then we can produce all the pages of the result set by going back to the service again as below;

      var genresCollection = MakeObservableMovieDbRequest<GenreCollection>(
        @"genre/list");

      // this observable feels like a sequence but it really only ever returns one
      // entry which then has a number of genres within it. we can then flatten 
      // that out here.
      var genres = genresCollection.SelectMany(collection => collection.Genres);

      // likely to only be one of these in reality.
      var actionGenres = genres.Where(genre => genre.Name == "Action");

      // this should represent the first page of data for any genre that
      // identifies itself as "Action"
      var actionGenreMovies = actionGenres.SelectMany(
        actionGenre =>
        {
          string url = string.Format(@"genre/{0}/movies", actionGenre.Id);

          return (MakeObservableMovieDbRequest<MovieCollection>(url));
        }
      );

      // produce a sequence of page numbers 1..N but don't lose the genre
      // data, pass it along as part of the sequence.
      var genrePagesToRequest = actionGenreMovies.SelectMany(
        genreData =>
        {
          var numberSequence = Enumerable.Range(1, genreData.TotalPages).ToObservable();

          return (numberSequence.Select(
            n => new 
              { 
                Page = n, 
                GenreInfo = genreData 
              }
            ));
        }
      );

and, once subscribed to, that produces a nice output to the console of (1, N), (2, N), (3, N) … (N, N). The next step is to produce a web request for each of those pages in order to get the actual movie data.

Step 4 – Producing a Sequence of Page HTTP Requests

Now to try and turn that previous sequence of page numbers into a sequence of results from HTTP requests for each of the pages of data in question. I don’t think that’s too difficult; it’s just a matter of adding another step to the pipeline;

      var genresCollection = MakeObservableMovieDbRequest<GenreCollection>(
        @"genre/list");

      // this observable feels like a sequence but it really only ever returns one
      // entry which then has a number of genres within it. we can then flatten 
      // that out here.
      var genres = genresCollection.SelectMany(collection => collection.Genres);

      // likely to only be one of these in reality.
      var actionGenres = genres.Where(genre => genre.Name == "Action");

      // this should represent the first page of data for any genre that
      // identifies itself as "Action"
      var actionGenreMovies = actionGenres.SelectMany(
        actionGenre =>
        {
          string url = string.Format(@"genre/{0}/movies", actionGenre.Id);

          return (MakeObservableMovieDbRequest<MovieCollection>(url));
        }
      );

      // produce a sequence of page numbers 1..N but don't lose the genre
      // data, pass it along as part of the sequence.
      var genrePagesToRequest = actionGenreMovies.SelectMany(
        genreData =>
        {
          var numberSequence = Enumerable.Range(1, genreData.TotalPages).ToObservable();

          return (numberSequence.Select(
            n => new 
              { 
                Page = n, 
                GenreInfo = genreData 
              }
            ));
        }
      );

      // take each one of those entries and produce an HTTP request which goes and
      // gets the data for that particular page's movies.
      var movieDetails = genrePagesToRequest.SelectMany(
          pageInfo =>
          {
            string url = string.Format(@"genre/{0}/movies", pageInfo.GenreInfo.Id);

            var parameter = new KeyValuePair<string, string>("page",
              pageInfo.Page.ToString());

            var request = MakeObservableMovieDbRequest<MovieCollection>(url,
              parameter);

            return (request.SelectMany(collection => collection.Results));
          }
        );

      movieDetails.Subscribe(
          movie =>
          {
            Console.WriteLine("Movie {0}", movie.Title);
          }
        );

and now we end up with a sequence of MovieResult values pulled from each web request that we make to the service, with each request bringing back multiple movie results.

The downside is that it’s a bit haphazard, with a lot of concurrent work going on. For instance, if I set a breakpoint on the Console.WriteLine call inside the movieDetails.Subscribe lambda and then take a look at what’s going on in Fiddler, I see:

[Screenshot: Fiddler session list showing the HTTP requests made by this code]

That’s quite a lot of HTTP requests! The order in which the requests complete is also non-deterministic, so each time I run this code a different movie may be returned first, which might not be what I’m looking for.

Step 5 – Keeping the Pages in Sequence

Back in that previous code snippet, the statement that builds movieDetails uses the SelectMany operator to go from a sequence of anonymous types (each containing a page number and a MovieCollection) to a sequence of MovieResults. It does that by producing an IObservable<MovieResult> for each item in the input sequence, and each of those IObservable<MovieResult> is produced by an asynchronous web request that fetches that particular page of details from the web service.

The order in which those web requests complete is not deterministic, so it’s possible for the second web request to complete before the first one, in which case movies from the second page of results are produced before movies from the first page.

That’s the nature of the SelectMany operator. It takes an IObservable<T> and, for each T, runs some function to produce an IObservable<U>. It subscribes to all of those inner IObservable<U> and merges them, so values of type U are produced as and when each inner observable makes them available, in whatever order that turns out to be.
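To see that behaviour in isolation, here is a minimal sketch (an illustration, not the code from this post) that replaces the web service with a hypothetical FakePageRequest helper whose simulated latency is shorter for later pages. It assumes a .NET console app using top-level statements with a reference to the System.Reactive NuGet package, and it runs SelectMany alongside the equivalent Select followed by Merge.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical stand-in for a web request: after a delay it emits two "movie
// titles" for the page. Later pages respond faster, so they complete first.
IObservable<string> FakePageRequest(int page, int totalPages) =>
    Observable.Timer(TimeSpan.FromMilliseconds(100 * (totalPages - page + 1)))
        .SelectMany(_ => new[] { $"Page {page} Movie 1", $"Page {page} Movie 2" });

const int TotalPages = 3;
var pages = Observable.Range(1, TotalPages);

// SelectMany subscribes to every inner "request" as soon as its page number
// arrives and merges the results, so titles come out in completion order.
IList<string> viaSelectMany = pages
    .SelectMany(page => FakePageRequest(page, TotalPages))
    .ToList()
    .Wait();

// The same pipeline spelled out as Select (a sequence of sequences) + Merge.
IList<string> viaSelectThenMerge = pages
    .Select(page => FakePageRequest(page, TotalPages))
    .Merge()
    .ToList()
    .Wait();

foreach (var title in viaSelectMany)
{
    Console.WriteLine(title);
}
```

On a typical run the titles for page 3 print first because its simulated request finishes first; the exact order depends on timing, which is the same haphazard ordering seen with the real requests.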

If I replace the SelectMany call that builds movieDetails with a call to Select, things are a little different:

      var movieDetails = genrePagesToRequest.Select(
          pageInfo =>
          {
            string url = string.Format(@"genre/{0}/movies", pageInfo.GenreInfo.Id);

            var parameter = new KeyValuePair<string, string>("page",
              pageInfo.Page.ToString());

            var request = MakeObservableMovieDbRequest<MovieCollection>(url,
              parameter);

            return (request.SelectMany(collection => collection.Results));
          }
        );

and movieDetails is now of type IObservable<IObservable<MovieResult>>: a sequence of sequences, because SelectMany is no longer flattening it for me.

That means I can use some other operator to combine these sequences. For instance, the Concat operator preserves ordering because it only subscribes to the next inner sequence once the previous one has completed. So if I write something like the code below (including the entire code again):

      var genresCollection = MakeObservableMovieDbRequest<GenreCollection>(
        @"genre/list");

      // this observable feels like a sequence but it really only ever returns one
      // entry which then has a number of genres within it. we can then flatten 
      // that out here.
      var genres = genresCollection.SelectMany(collection => collection.Genres);

      // likely to only be one of these in reality.
      var actionGenres = genres.Where(genre => genre.Name == "Action");

      // this should represent the first page of data for any genre that
      // identifies itself as "Action"
      var actionGenreMovies = actionGenres.SelectMany(
        actionGenre =>
        {
          string url = string.Format(@"genre/{0}/movies", actionGenre.Id);

          return (MakeObservableMovieDbRequest<MovieCollection>(url));
        }
      );

      // produce a sequence of page numbers 1..N but don't lose the genre
      // data, pass it along as part of the sequence that comes out of
      // this operator.
      var genrePagesToRequest = actionGenreMovies.SelectMany(
        genreData =>
        {
          var numberSequence = Observable.Range(1, genreData.TotalPages);

          return (numberSequence.Select(
            n => new 
              { 
                Page = n, 
                GenreInfo = genreData 
              }
            ));
        }
      );

      // take each one of those entries and produce an HTTP request which goes and
      // gets the data for that particular page's movies.
      var movieDetails = genrePagesToRequest.Select(
          pageInfo =>
          {
            string url = string.Format(@"genre/{0}/movies", pageInfo.GenreInfo.Id);

            var parameter = new KeyValuePair<string, string>("page",
              pageInfo.Page.ToString());

            var request = MakeObservableMovieDbRequest<MovieCollection>(url,
              parameter);

            return (request.SelectMany(collection => collection.Results));
          }
        );

      var concatenatedMovieDetails = Observable.Concat(movieDetails);

      concatenatedMovieDetails.Subscribe(
          movie =>
          {
            Console.WriteLine("Movie {0}", movie.Title);
          }
        );


      Console.ReadLine();

then the movies are consumed in the order in which they appear in the web service results, starting at Page 1, Movie 1, and only moving on to Page 2 once Page 1 has been exhausted.
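The ordering guarantee can be sketched with a similar hypothetical FakePageRequest stand-in, again assuming top-level statements and a reference to System.Reactive. Even though later pages respond sooner here, Concat emits every title from page 1 before any from page 2, because it doesn’t subscribe to the next inner sequence until the current one completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical stand-in for one page request; later pages respond faster.
IObservable<string> FakePageRequest(int page, int totalPages) =>
    Observable.Timer(TimeSpan.FromMilliseconds(50 * (totalPages - page + 1)))
        .SelectMany(_ => new[] { $"Page {page} Movie 1", $"Page {page} Movie 2" });

const int TotalPages = 3;

// Select (rather than SelectMany) leaves us with a sequence of sequences.
IObservable<IObservable<string>> pageRequests = Observable
    .Range(1, TotalPages)
    .Select(page => FakePageRequest(page, TotalPages));

// Concat drains each inner sequence completely before subscribing to the next,
// so titles come out in page order even though later pages finish sooner.
IList<string> inOrder = Observable.Concat(pageRequests).ToList().Wait();

foreach (var title in inOrder)
{
    Console.WriteLine(title);
}
```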

Switching to Concat also changes the pattern of HTTP requests. Rather than a large number of requests going out to the web concurrently, requests now go out as they are needed. If I put a breakpoint on the Console.WriteLine call inside the concatenatedMovieDetails.Subscribe lambda, this is what I see in Fiddler:

[Screenshot: Fiddler session list at the breakpoint]

and the request for page 2 goes out once page 1 has been consumed.
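Why the requests now go out one at a time can be sketched with Observable.Defer, which only runs its factory when something subscribes. The LazyPageRequest helper and the log are hypothetical illustrations, and the sketch assumes MakeObservableMovieDbRequest is similarly cold (it doesn’t start its HTTP call until subscribed); that is an inference from the Fiddler trace rather than something the code above shows. It also assumes top-level statements and a reference to System.Reactive.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

var log = new List<string>();
var gate = new object();

// Timer callbacks run on the thread pool, so guard the shared log.
void Log(string entry)
{
    lock (gate) { log.Add(entry); }
}

// Hypothetical lazy request: nothing is "sent" until someone subscribes,
// at which point Defer runs its factory and logs the start of the request.
IObservable<string> LazyPageRequest(int page) =>
    Observable.Defer(() =>
    {
        Log($"request page {page}");
        return Observable.Timer(TimeSpan.FromMilliseconds(50))
            .Select(_ => $"Page {page} results")
            // Do's completion action runs before OnCompleted is passed on.
            .Do(_ => { }, () => Log($"page {page} done"));
    });

var pages = Observable.Range(1, 3).Select(page => LazyPageRequest(page));

// Concat subscribes to page n + 1 only after page n has completed.
Observable.Concat(pages).ToList().Wait();

Console.WriteLine(string.Join(Environment.NewLine, log));
```

With SelectMany (or Select followed by Merge) in place of Concat, all three "request" entries would typically be logged before any "done" entry, mirroring the burst of concurrent requests seen earlier in Fiddler.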

Summary

I’m sure that I did some things wrong, but I had a lot of fun putting together that little bit of code. Rx is a great framework to work with, and I wish it were built into more of the .NET Framework pieces. I’d love to see it show up in app development for Windows and Windows Phone: rather than firing an event from the UI, why not just have a sequence of data flowing from the UI? Naturally, you can always plug Rx in yourself, but it’d be great to see it natively part of more framework pieces.