Rx and Schedulers

I got caught out by something simple about scheduling in Rx, so I thought I’d share it. I started with something fairly complicated and boiled it down to a simple repro, which helped me figure it out. When I asked around, I was helpfully pointed to this forum post, where someone had explored the same situation.

Here’s the simplest thing in the world which fooled me;

      var observable =
        Observable
          .Return(101)
          .Repeat()
          .Take(5);

      observable.ForEach(
        value => Console.WriteLine("Value produced is {0}", value));

      Console.WriteLine("Done");

and where it fooled me is that I expected it to output the value 101 5 times and then complete and output the text “Done”.

It doesn’t. It never outputs the text “Done”. Intriguingly, a similar example;

    var observable =
      Observable
        .Return(101)
        .Repeat()
        .Take(5);

    observable.Subscribe(
      i => Console.WriteLine(i),
      () => Console.WriteLine("Completed"));

prints the value 101 5 times and prints “Completed” but, from Task Manager, I can see it burning a CPU after it seems to have finished.

By contrast, a very similar looking piece of code;

      var observable =
        Observable
          .Range(101, 1)
          .Repeat()
          .Take(5);

      observable.ForEach(
        value => Console.WriteLine("Value produced is {0}", value));

      Console.WriteLine("Done");

does indeed output the value 101 5 times and then completes and outputs the text “Done”.

How can two things so similar be so different? I think it’s all down to schedulers.

Schedulers

There’s a good video over on Channel 9 about schedulers in Rx;

which provides great coverage. The basics are that Rx introduces a common interface for scheduling some work to be done. It’s a pretty simple idea that hides a lot of possible complexity;


  public interface IScheduler
  {
    DateTimeOffset Now { get; }

    IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action);
  }

and so a scheduler has a notion of the current time, and it can schedule some function to run either “now” or at some point in the future, expressed either as a relative TimeSpan or as an absolute DateTimeOffset.

The idea of the IDisposable return value here is that disposing of the disposable can provide a way to attempt to cancel the work if that is possible.
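To make the cancellation idea concrete, here’s a minimal sketch in plain C#, without Rx. ToyQueueScheduler and its Drain method are hypothetical names made up for this illustration, and the sketch only models the “now” case. Work waits in a queue until Drain is called, so disposing the returned token before that point stops the item from running;

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy scheduler (plain C#, not Rx): work waits in a queue until
// Drain() is called, so disposing the returned token first cancels it.
public sealed class ToyQueueScheduler
{
    private sealed class WorkItem : IDisposable
    {
        public readonly Action Work;
        public bool Cancelled;
        public WorkItem(Action work) { Work = work; }

        // Disposing the token is how a caller attempts to cancel the work.
        public void Dispose() { Cancelled = true; }
    }

    private readonly Queue<WorkItem> _queue = new Queue<WorkItem>();

    public IDisposable Schedule(Action action)
    {
        var item = new WorkItem(action);
        _queue.Enqueue(item);
        return item;
    }

    // Runs every queued item that hasn't been cancelled, in FIFO order.
    public void Drain()
    {
        while (_queue.Count > 0)
        {
            WorkItem item = _queue.Dequeue();
            if (!item.Cancelled) item.Work();
        }
    }
}

public static class CancellationDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var scheduler = new ToyQueueScheduler();
        scheduler.Schedule(() => log.Add("first"));
        IDisposable second = scheduler.Schedule(() => log.Add("second"));
        scheduler.Schedule(() => log.Add("third"));

        second.Dispose();   // cancel before the work has had a chance to run
        scheduler.Drain();
        return log;         // "first", "third"
    }
}
```

Once an item has started running, disposing its token does nothing in this toy. That fits the post’s point that disposal is only an attempt to cancel.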

I don’t think there’s any way to walk up to a scheduler as modelled here and determine how much outstanding work it has to do or how busy it is or anything like that.

Then there’s a whole bunch of extension methods for IScheduler living in the class Scheduler, including the simpler Schedule(Action) overload used in the code below. That class also provides static properties that expose concrete implementations of the interface;

  • CurrentThread
  • Immediate
  • NewThread
  • TaskPool
  • ThreadPool

and there are also testing implementations, such as TestScheduler, in Microsoft.Reactive.Testing.dll. A couple more live in System.Reactive.Windows.Forms.dll and System.Reactive.Windows.Threading.dll: ControlScheduler for Windows Forms and DispatcherScheduler for WPF/Silverlight.

I think that some of these schedulers are fairly obvious and some of them have quite a degree of subtlety to them.

Let’s say that I’ve got some work to do;

using System;
using System.Reactive.Concurrency;
using System.Threading;
using System.Threading.Tasks;

class Program
{
  static void WriteMessage(string message)
  {
    Console.WriteLine("Thread [{0}], Task [{1}], Message [{2}]",
      Thread.CurrentThread.ManagedThreadId,
      Task.CurrentId ?? -1,
      message);
  }
  static void Work(IScheduler scheduler)
  {
    scheduler.Schedule(
      () => WriteMessage("Doing the pre-work"));

    for (int i = 0; i < 5; i++)
    {
      int j = i;
      scheduler.Schedule(
        () => WriteMessage(string.Format("Doing work item {0}", j)));
    }

    scheduler.Schedule(
      () => WriteMessage("Doing the post-work"));
  }
  static void Main(string[] args)
  {
    Work(Scheduler.NewThread);

    Console.ReadLine();
  }
}

and I’ve chosen to run this on the NewThread scheduler. I think it’s a fairly easy one to understand because (as far as I know) it does exactly what it says on the tin: it creates a new thread for each piece of work, and the work runs on that thread before the thread exits.

There are a few implications of that. One is that there’s no guarantee about ordering. Another is that ( without extra code ) there’s no way of knowing when all the work is done. That’s why I have a Console.ReadLine at the end of the program, so that the user manually controls the end of the program’s lifetime.

On my system, one run of this program displays;

    Thread [3], Task [-1], Message [Doing the pre-work]
    Thread [6], Task [-1], Message [Doing work item 2]
    Thread [4], Task [-1], Message [Doing work item 0]
    Thread [9], Task [-1], Message [Doing the post-work]
    Thread [7], Task [-1], Message [Doing work item 3]
    Thread [5], Task [-1], Message [Doing work item 1]
    Thread [8], Task [-1], Message [Doing work item 4]

and on another it displays;

    Thread [3], Task [-1], Message [Doing the pre-work]
    Thread [6], Task [-1], Message [Doing work item 2]
    Thread [9], Task [-1], Message [Doing the post-work]
    Thread [5], Task [-1], Message [Doing work item 1]
    Thread [8], Task [-1], Message [Doing work item 4]
    Thread [4], Task [-1], Message [Doing work item 0]
    Thread [7], Task [-1], Message [Doing work item 3]
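The “extra code” needed to know when NewThread-style work has finished can be as small as a CountdownEvent. Here’s a sketch under that assumption. NewThreadPerItem is a hypothetical stand-in that simply starts one background thread per item; it is not Rx’s own NewThread implementation;

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical toy, not Rx's NewThread scheduler: one new thread per work item,
// plus a CountdownEvent as the "extra code" that tells us when all work is done.
public static class NewThreadPerItem
{
    public static void Schedule(Action action)
    {
        // Background thread so a stuck item can't keep the process alive.
        var thread = new Thread(() => action()) { IsBackground = true };
        thread.Start();
    }

    // Schedules `count` items and blocks until every one has run.
    // Returns item number -> managed thread id; completion order is not guaranteed.
    public static ConcurrentDictionary<int, int> RunAndWait(int count)
    {
        var results = new ConcurrentDictionary<int, int>();
        using (var done = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                int j = i; // copy the loop variable for the closure, as the post does
                Schedule(() =>
                {
                    results[j] = Thread.CurrentThread.ManagedThreadId;
                    done.Signal();
                });
            }
            done.Wait(); // takes the place of the Console.ReadLine() trick
        }
        return results;
    }
}
```

Every item is guaranteed to have run by the time RunAndWait returns, but the order in which the items ran is still unknown.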

If I switch from the NewThread scheduler to the ThreadPool scheduler then not much really changes, except that each piece of work goes to the ThreadPool rather than to a new thread. That should be cheaper in terms of thread creation. However, a particular piece of work may be delayed a little longer, because it may sit in a ThreadPool queue rather than executing immediately on a new thread. There’s still no guarantee about ordering, and one run of this program displays;

    Thread [3], Task [-1], Message [Doing the pre-work]
    Thread [6], Task [-1], Message [Doing work item 1]
    Thread [4], Task [-1], Message [Doing work item 0]
    Thread [9], Task [-1], Message [Doing the post-work]
    Thread [7], Task [-1], Message [Doing work item 3]
    Thread [8], Task [-1], Message [Doing work item 4]
    Thread [5], Task [-1], Message [Doing work item 2]

on my system. I can also replace the NewThread scheduler with the TaskPool scheduler, so that Tasks are used to run the work items. That can have various implications for how they get scheduled, but I’m not really likely to notice them here;

    Thread [4], Task [2], Message [Doing work item 0]
    Thread [8], Task [4], Message [Doing work item 4]
    Thread [3], Task [1], Message [Doing the pre-work]
    Thread [10], Task [5], Message [Doing the post-work]
    Thread [5], Task [3], Message [Doing work item 1]
    Thread [7], Task [7], Message [Doing work item 3]
    Thread [6], Task [6], Message [Doing work item 2]

but my code does spot that there’s now a current task ( Task.CurrentId has a value ), whereas there never was before.
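The reason only the TaskPool run reports a task is that Task.CurrentId is non-null only while code is executing inside a Task. The WriteMessage helper turns that null into -1. This sketch shows the difference using ThreadPool.QueueUserWorkItem and Task.Run directly, rather than the Rx schedulers. CurrentIdDemo is a hypothetical name;

```csharp
using System.Threading;
using System.Threading.Tasks;

// Task.CurrentId is only non-null while code runs inside a Task, which is why
// WriteMessage ( Task.CurrentId ?? -1 ) only shows a task id for TaskPool.
public static class CurrentIdDemo
{
    // Work queued straight to the thread pool: no Task involved.
    public static int? OnThreadPool()
    {
        int? seen = -1; // sentinel, overwritten by the work item
        using (var done = new ManualResetEventSlim(false))
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                seen = Task.CurrentId;
                done.Set();
            });
            done.Wait();
        }
        return seen;
    }

    // Work run as a Task: Task.CurrentId reports that task's id.
    public static int? InsideTask()
    {
        return Task.Run(() => Task.CurrentId).Result;
    }
}
```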

So far, so good but in some ways I think that these schedulers are simpler than the ones that don’t necessarily involve a separate thread doing work on our behalf.

What about the CurrentThread and Immediate schedulers? What do they do? If I switch my code to make use of them then I see the exact same results in both cases;

    Thread [1], Task [-1], Message [Doing the pre-work]
    Thread [1], Task [-1], Message [Doing work item 0]
    Thread [1], Task [-1], Message [Doing work item 1]
    Thread [1], Task [-1], Message [Doing work item 2]
    Thread [1], Task [-1], Message [Doing work item 3]
    Thread [1], Task [-1], Message [Doing work item 4]
    Thread [1], Task [-1], Message [Doing the post-work]

so I’ve now got ordering preserved and I no longer need my Console.ReadLine() call to keep my program alive because it won’t end before the work has all been completed.

So these 2 schedulers are identical? No. The workload just needs changing to demonstrate the difference. Changing that Work function for a moment;

  static void Work(IScheduler scheduler)
  {
    WriteMessage("Doing the pre-work");

    for (int i = 0; i < 5; i++)
    {
      int j = i;
      scheduler.Schedule(
        () => WriteMessage(string.Format("Doing work item {0}", j)));
    }

    WriteMessage("Doing the post-work");
  }

Now, if I run this with the Immediate scheduler then what I see as output is;

    Thread [1], Task [-1], Message [Doing the pre-work]
    Thread [1], Task [-1], Message [Doing work item 0]
    Thread [1], Task [-1], Message [Doing work item 1]
    Thread [1], Task [-1], Message [Doing work item 2]
    Thread [1], Task [-1], Message [Doing work item 3]
    Thread [1], Task [-1], Message [Doing work item 4]
    Thread [1], Task [-1], Message [Doing the post-work]

but if I run this with the CurrentThread scheduler then what I see as output is;

    Thread [1], Task [-1], Message [Doing the pre-work]
    Thread [1], Task [-1], Message [Doing the post-work]
    Thread [1], Task [-1], Message [Doing work item 0]
    Thread [1], Task [-1], Message [Doing work item 1]
    Thread [1], Task [-1], Message [Doing work item 2]
    Thread [1], Task [-1], Message [Doing work item 3]
    Thread [1], Task [-1], Message [Doing work item 4]

and so the difference ( to me at least ) is that the Immediate scheduler runs work “right now on the current thread” whereas the CurrentThread scheduler queues work up such that it will execute when the current work is done.
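One way to see what “right now” means is to have a work item schedule more work in the middle of its own body. The sketch below uses a hypothetical ToyImmediate class that simply invokes each action inline. That is an assumption about the essence of the Immediate scheduler, not its actual source. Because nested work runs before its parent finishes, the order comes out as 1, 3, 5, 4, 2;

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy with "run it right now" semantics, in the spirit of the
// Immediate scheduler (not Rx's implementation).
public static class ToyImmediate
{
    public static void Schedule(Action action) => action(); // no queue: runs inline

    // Each item schedules another item part-way through its own body.
    public static List<string> Nested()
    {
        var log = new List<string>();
        Schedule(() =>
        {
            log.Add("1");
            Schedule(() =>
            {
                log.Add("3");
                Schedule(() => log.Add("5"));
                log.Add("4");
            });
            log.Add("2");
        });
        return log; // 1, 3, 5, 4, 2: nested work interrupts its parent
    }
}
```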

As a secondary example;

   Scheduler.CurrentThread.Schedule(
      () =>
      {
        Console.WriteLine("1");

        Scheduler.CurrentThread.Schedule(
          () =>
          {
            Console.WriteLine("3");

            Scheduler.CurrentThread.Schedule(
              () =>
              {
                Console.WriteLine("5");
              });

            Console.WriteLine("4");
          });

        Console.WriteLine("2");

      });

will write out “1” to “5” in order on the CurrentThread scheduler.

This feels similar to the sort of scheduling that we’re used to from Windows UI models ( or COM components ) that use a message loop, where we post messages back into the loop in order to schedule something to happen later. In Windows Forms this showed up as;

Control.[Begin]Invoke

and in WPF/Silverlight it showed up as;

Dispatcher.[Begin]Invoke

but there is no message loop running on this particular thread. Rather, the scheduler itself is presumably managing some kind of per-thread queue of the work items that get scheduled while the current item is executing, and it runs them when the current item completes.
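That guess can be sketched as a “trampoline”: a per-thread queue where the first Schedule call on a thread starts draining the queue, and any Schedule calls made while it drains are simply enqueued. ToyTrampoline is hypothetical. As far as I know, Rx’s CurrentThread scheduler works along similar lines, but this isn’t its source. Running the nested “1” to “5” example through the sketch gives the numbers in order;

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-thread trampoline (a sketch of the idea, not Rx's code).
public static class ToyTrampoline
{
    [ThreadStatic] private static Queue<Action> _queue;

    public static void Schedule(Action action)
    {
        if (_queue != null)
        {
            // Already draining on this thread: run after the current item.
            _queue.Enqueue(action);
            return;
        }

        // Outermost call on this thread: start a queue and drain it.
        _queue = new Queue<Action>();
        _queue.Enqueue(action);
        try
        {
            while (_queue.Count > 0)
            {
                Action next = _queue.Dequeue();
                next();
            }
        }
        finally
        {
            _queue = null;
        }
    }

    public static List<string> Nested()
    {
        var log = new List<string>();
        Schedule(() =>
        {
            log.Add("1");
            Schedule(() =>
            {
                log.Add("3");
                Schedule(() => log.Add("5"));
                log.Add("4");
            });
            log.Add("2");
        });
        return log; // 1, 2, 3, 4, 5
    }
}
```

In this sketch the outermost call runs straight away, because it is the call that starts draining the queue. Only work scheduled from inside a running item waits its turn.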

It’s worth saying that in an environment with a message loop, like Windows Forms, the CurrentThread scheduler does not magically become “message loop aware” and start making use of that loop. So, in an example where I have a UI ListBox called listBox1, this code;

      // this will execute when we get back to the message loop.
      this.BeginInvoke(new Action(() =>
        {
          this.listBox1.Items.Add("Control Invoke Item");
        }));

      // this will execute now
      Scheduler.CurrentThread.Schedule(
        () =>
        {
          this.listBox1.Items.Add("Scheduled Item");
        });

will produce the result;

[Screenshot: listBox1 showing “Scheduled Item” above “Control Invoke Item”]

whereas this code using the ControlScheduler which works with Windows Forms will indeed piggy back onto that message loop;

      // this will execute when we get back to the message loop.
      this.BeginInvoke(new Action(() =>
        {
          this.listBox1.Items.Add("Control Invoke Item");
        }));

      ControlScheduler scheduler = new ControlScheduler(this);

      // this will also execute when we get back to the message loop.
      scheduler.Schedule(
        () =>
        {
          this.listBox1.Items.Add("Scheduled Item");
        });

and so produces the result;

[Screenshot: listBox1 showing “Control Invoke Item” above “Scheduled Item”]
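The difference can be modelled without Windows Forms at all. This sketch uses a hypothetical ToyMessageLoop in which Post plays the role of BeginInvoke. A simulated event handler either runs the “scheduled” item inline, as the CurrentThread scheduler did here, or posts it to the loop, which is how ControlScheduler piggy backs onto the message loop;

```csharp
using System;
using System.Collections.Generic;

// Toy single-threaded "message loop": posted actions are drained in order.
// Post stands in for Control.BeginInvoke; this is a sketch, not Windows Forms.
public sealed class ToyMessageLoop
{
    private readonly Queue<Action> _messages = new Queue<Action>();

    public void Post(Action action) => _messages.Enqueue(action);

    public void Pump()
    {
        while (_messages.Count > 0)
        {
            Action next = _messages.Dequeue();
            next();
        }
    }
}

public static class MessageLoopDemo
{
    // useLoop = false: the scheduled item runs inline (the CurrentThread case).
    // useLoop = true: it is posted to the loop (the ControlScheduler case).
    public static List<string> Run(bool useLoop)
    {
        var items = new List<string>(); // stands in for listBox1.Items
        var loop = new ToyMessageLoop();

        loop.Post(() =>
        {
            // Simulated event handler running on the "UI thread".
            loop.Post(() => items.Add("Control Invoke Item"));

            Action scheduled = () => items.Add("Scheduled Item");
            if (useLoop) loop.Post(scheduled);
            else scheduled();
        });

        loop.Pump();
        return items;
    }
}
```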

There’s more about the CurrentThread and Immediate schedulers in this forum post if you want to dig deeper and get into trampolines!

But we never specified any scheduler

Having spent a little time talking about schedulers, it’s important to say that in my original code snippets I never used a scheduler. I never specified one. So how did a scheduler come to bite me?

Defaults.

The Rx operators that need to introduce concurrency use a scheduler to determine how to introduce it. Where they do, there is typically an overload of the operator that takes an IScheduler. In my original code I was using Observable.Return;

    var obs = Observable.Return(101);

I could have been explicit and gone with the overload that took an IScheduler;

    var obs = Observable.Return(101, Scheduler.CurrentThread);

and if I’d done that then my original code based around Return would work just the same way as the code based around Range.

If I choose not to be explicit about which scheduler I want then I’m going to get the default that the author of the operator ( i.e. Return() in this case ) decided to go with.

Now, this isn’t random. The author of the operator will have made a sensible choice of default and ( according to MSDN ) that choice is based on the principle;

“If you do not use the overload which takes a scheduler as an argument, Rx will pick a default scheduler by using the principle of least concurrency. This means that the scheduler which introduces the least amount of concurrency that satisfies the needs of the operator is chosen. For example, for operators returning an observable with a finite and small number of messages, Rx calls Immediate. For operators returning a potentially large or infinite number of messages, CurrentThread is called. For operators which use timers, ThreadPool is used.”

Based on that, Return ( a single value ) uses the Immediate scheduler, whereas Range ( potentially a large number of values ) uses the CurrentThread scheduler.

Coming back to the original snippets

At this point I can revisit the original snippet ( or something very similar ) and think about the difference in behaviour that I got bogged down in.

This example;

    var obs =
      Observable
        .Return(1)
        .Do(value => Console.WriteLine("Still running"))
        .Repeat()
        .TakeWhile(x => false);

    obs.Subscribe(
      value => Console.WriteLine("Value is {0}", value),
      () => Console.WriteLine("Completed"));

does complete but it then continues to print the message “Still running” for ever. Why?

It’s worth pointing out that this code looks pretty simple, but I think it turns out to be quite complex. If I put a breakpoint on the line of code above that prints the “Completed” message, then when I arrive there I see a call-stack that is around 55 calls deep, and a lot of it looks like;

>    ConsoleApplication12.exe!Program.Main.AnonymousMethod__3() Line 23    C#

     System.Reactive.dll!System.Reactive.AnonymousObserver<int>.Completed() + 0x23 bytes   
     System.Reactive.dll!System.Reactive.AbstractObserver<int>.OnCompleted() + 0x3b bytes   
     System.Reactive.dll!System.Reactive.AnonymousObservable<int>.AutoDetachObserver.Completed() + 0x21 bytes   
     System.Reactive.dll!System.Reactive.AbstractObserver<int>.OnCompleted() + 0x3b bytes   
     System.Reactive.dll!System.Reactive.Linq.Observable.TakeWhile_<int>.AnonymousMethod__44a(int x = 1) + 0xd8 bytes   
     System.Reactive.dll!System.Reactive.AnonymousObserver<int>.Next(int value = 1) + 0x2b bytes   
     System.Reactive.dll!System.Reactive.AbstractObserver<int>.OnNext(int value = 1) + 0x31 bytes   
     System.Reactive.dll!System.Reactive.AnonymousObservable<int>.AutoDetachObserver.Next(int value = 1) + 0x29 bytes   
     System.Reactive.dll!System.Reactive.AbstractObserver<int>.OnNext(int value = 1) + 0x31 bytes   
     System.Reactive.dll!System.Reactive.AnonymousObserver<int>.Next(int value = 1) + 0x2b bytes   
     System.Reactive.dll!System.Reactive.AbstractObserver<int>.OnNext(int value = 1) + 0x31 bytes   
     System.Reactive.dll!System.Reactive.AnonymousObservable<int>.AutoDetachObserver.Next(int value = 1) + 0x29 bytes   
     System.Reactive.dll!System.Reactive.AbstractObserver<int>.OnNext(int value = 1) + 0x31 bytes   
     System.Reactive.dll!System.Reactive.Linq.Observable.Do<int>.AnonymousMethod__3c2(int x = 1) + 0x7a bytes   
     System.Reactive.dll!System.Reactive.AnonymousObserver<int>.Next(int value = 1) + 0x2b bytes   
     System.Reactive.dll!System.Reactive.AbstractObserver<int>.OnNext(int value = 1) + 0x31 bytes   
     System.Reactive.dll!System.Reactive.AnonymousObservable<int>.AutoDetachObserver.Next(int value = 1) + 0x29 bytes   
     System.Reactive.dll!System.Reactive.AbstractObserver<int>.OnNext(int value = 1) + 0x31 bytes   
     System.Reactive.dll!System.Reactive.Linq.Observable.Return<int>.AnonymousMethod__285() + 0x2a bytes   

so it’s not easy to reason about exactly what’s going on. It is easy to see, though, that at the bottom of the section of call-stack I’ve included, our friend Observable.Return is calling OnNext, and about halfway up that section TakeWhile is calling OnCompleted.

As part of experimenting, I wrote my own version of Observable.Return. I’m not sure that it’s the same as the real version but for my purposes here, it seemed to exhibit the same behaviour;

  static IObservable<T> MyReturn<T>(T value)
  {
    return (Observable.Create<T>(
      observer =>
      {
        Scheduler.Immediate.Schedule(() =>
          {
            observer.OnNext(value);
            observer.OnCompleted();
          });
        return (Disposable.Empty);
      }));
  }

I also wrote a version of TakeWhile;

  static IObservable<T> MyTakeWhile<T>(IObservable<T> source,
    Predicate<T> predicate)
  {
    return (
      Observable.Create<T>(
        observer =>
        {
          IDisposable sub = source.Subscribe(
            value =>
            {
              if (!predicate(value))
              {
                observer.OnCompleted();
              }
              else
              {
                observer.OnNext(value);
              }
            },
            ex => observer.OnError(ex),
            () => observer.OnCompleted());

          return (sub);
        }));
  }

and, again, I’m not sure if it matches the real implementation but I found it really useful to be able to try and reason about this stuff – it made things a lot more obvious to me.

If MyReturn uses the Immediate scheduler, as it does above, then line 8 of MyReturn ( observer.OnNext(value) ) will execute before line 23 of MyTakeWhile ( return (sub) ). In fact, line 23 of MyTakeWhile will never execute. The “logical” chain of calls looks something like;

  1. Main :: Subscribe to TakeWhile
  2. TakeWhile :: Subscribe to Repeat
  3. Repeat :: Subscribe to Do
  4. Do :: Subscribe to Return
  5. Return :: OnNext ( 1 )
  6. Do :: execute lambda for value of 1
  7. Repeat :: OnNext ( 1 )
  8. TakeWhile :: OnNext ( 1 ) which will evaluate its predicate, find it false and so call
    1. Main :: OnCompleted
    2. TakeWhile :: “Dispose of subscription to Repeat”

and I think the basic problem is that, at the very last step here, TakeWhile cannot dispose of its subscription to Repeat because that subscription hasn’t been returned to it yet. We’re trying to dispose of something that we don’t have yet, so nothing is going to cause the Repeat operator to stop when the TakeWhile operator stops.

Step 5 is vital in all of this. Return schedules the OnNext call on the Immediate scheduler, so that call executes as part of the subscription process, before any disposable has been handed back.
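The problem can be reproduced without Rx, using just IObservable&lt;T&gt; and IObserver&lt;T&gt; from the System namespace. In this sketch, EagerRepeat is a hypothetical stand-in for Return(...).Repeat() that emits synchronously inside Subscribe. StopAfterFirst is a hypothetical stand-in for TakeWhile(x => false) that tries to dispose its subscription on the first value. A cap of 1,000 values stands in for “for ever” so that the demo terminates;

```csharp
using System;

// Hypothetical, simplified stand-ins (not Rx's operators).
public sealed class BoolDisposable : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

// Emits synchronously inside Subscribe until disposed (or the cap is hit).
public sealed class EagerRepeat : IObservable<int>
{
    private readonly int _value;
    private readonly int _maxValues; // safety cap so the demo terminates
    public int Emitted { get; private set; }

    public EagerRepeat(int value, int maxValues) { _value = value; _maxValues = maxValues; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        var subscription = new BoolDisposable();
        while (!subscription.IsDisposed && Emitted < _maxValues)
        {
            Emitted++;
            observer.OnNext(_value);   // runs before Subscribe has returned
        }
        return subscription;           // the caller only gets this afterwards
    }
}

// Wants to stop after the first value, like TakeWhile(x => false).
public sealed class StopAfterFirst : IObserver<int>
{
    public IDisposable Subscription;   // assigned only once Subscribe returns
    public bool Completed;

    public void OnNext(int value)
    {
        if (Completed) return;
        Completed = true;              // downstream sees "completed" here
        Subscription?.Dispose();       // still null, so nothing stops the source
    }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class EagerDemo
{
    public static int Run()
    {
        var source = new EagerRepeat(1, maxValues: 1000);
        var observer = new StopAfterFirst();
        observer.Subscription = source.Subscribe(observer);
        return source.Emitted;         // 1000: the cap, not 1
    }
}
```

The observer “completes” after one value, but the source keeps producing until the cap, which mirrors the endless “Still running” output.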

If MyReturn uses the CurrentThread scheduler instead, then line 23 of MyTakeWhile ( return (sub) ) will execute before line 8 of MyReturn ( observer.OnNext(value) ). The “logical” chain of calls looks something like;

  • Main :: Subscribe to TakeWhile
  • TakeWhile :: Subscribe to Repeat
  • Repeat :: Subscribe to Do
  • Do :: Subscribe to Return
  • Return :: Schedules a call to OnNext ( 1 ) and OnCompleted()
  • Return :: returns an empty disposable
  • Do :: returns some disposable
  • Repeat :: returns some disposable
  • TakeWhile :: returns some disposable
  • Return :: OnNext ( 1 ) runs

and so it’s a very different set of behaviour that happens just by virtue of switching from the Immediate to the CurrentThread scheduler.
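Here’s a sketch of why deferring helps, again without Rx and with hypothetical types. DeferredRepeat only queues its emission loop on a tiny per-thread trampoline, so the subscription is handed back before any OnNext call happens. One assumption matters: the outer Subscribe call itself runs inside the trampoline. If it didn’t, the first Schedule call would drain the queue straight away and the problem would come back. As far as I know, Rx handles this by running subscriptions on the CurrentThread scheduler when needed, but treat that as an assumption;

```csharp
using System;
using System.Collections.Generic;

// Minimal per-thread trampoline (hypothetical toy, same idea as CurrentThread).
public static class Trampoline
{
    [ThreadStatic] private static Queue<Action> _queue;

    public static void Schedule(Action action)
    {
        if (_queue != null) { _queue.Enqueue(action); return; } // run later
        _queue = new Queue<Action>();
        _queue.Enqueue(action);
        try
        {
            while (_queue.Count > 0)
            {
                Action next = _queue.Dequeue();
                next();
            }
        }
        finally { _queue = null; }
    }
}

public sealed class FlagDisposable : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

// Repeat-like source that defers its emission loop to the trampoline.
public sealed class DeferredRepeat : IObservable<int>
{
    private readonly int _value;
    private readonly int _maxValues;   // safety cap so the demo always ends
    public int Emitted { get; private set; }

    public DeferredRepeat(int value, int maxValues) { _value = value; _maxValues = maxValues; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        var subscription = new FlagDisposable();
        Trampoline.Schedule(() =>
        {
            while (!subscription.IsDisposed && Emitted < _maxValues)
            {
                Emitted++;
                observer.OnNext(_value);
            }
        });
        return subscription;           // returned before the loop runs
    }
}

public sealed class StopAfterFirst : IObserver<int>
{
    public IDisposable Subscription;
    public bool Completed;

    public void OnNext(int value)
    {
        if (Completed) return;
        Completed = true;
        Subscription?.Dispose();       // now non-null, so the source stops
    }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class DeferredDemo
{
    public static int Run()
    {
        var source = new DeferredRepeat(1, maxValues: 1000);
        var observer = new StopAfterFirst();
        // Assumption: the outer subscription itself runs on the trampoline.
        Trampoline.Schedule(() => observer.Subscription = source.Subscribe(observer));
        return source.Emitted;         // 1
    }
}
```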

My final thought on this one is that something which appears to be very simple is actually pretty complicated. I find it a lot easier to diagnose what’s going on when I have the source code rather than just a binary library, and that’s what led me to write my own versions of the operators so that I could step through them.