Parallel Extensions for .NET

I’ve been meaning to spend some time looking at the Parallel Extensions for .NET (PFX) for a little while but I’ve never quite managed to find that time.

Last week, I managed to find a couple of hours on a train to have a poke around so I’m starting to write up the results of that here. Apologies if they’re a bit of a “ramble”.

One of the reasons why I’ve been struggling to find time to look at PFX is that I’m a little unsure of what my view on it is.

Some simple things;

  1. As far as I know, PFX is going to be part of the next version of the .NET Framework.
  2. It’s currently available as a June Preview which you can download from here.
  3. You can watch Daniel’s great “getting started” videos here, although I suspect they might be a little out of date with respect to the June Preview.
  4. PFX is all part of the bigger Parallel Computing Platform initiative; there’s an MSDN centre all about that up here, and the native side of development is surfacing here.

Parallel FX is currently delivered in a single assembly, System.Threading.dll, and the functionality looks to break down into a few key areas;

  1. Some thread-safe collections such as a ConcurrentStack<T> and a ConcurrentQueue<T>.
  2. Some classes to help with thread synchronisation such as a SpinLock and a couple of lighter-weight wrappers for existing mechanisms such as the SemaphoreSlim and the ManualResetEventSlim.
  3. Some convenience classes such as the LazyInit<T>.
  4. The parallel task library with the class Task and associated classes such as TaskManager and its TaskManagerPolicy. There are also some higher level convenience methods around this like Parallel.For and Parallel.ForEach and so on.
  5. Parallel LINQ or PLINQ which, again, uses the underlying Task infrastructure to sync up with LINQ via a set of extension methods.

When it comes to (1), (2) and (3), I think they’re very easy to accept. It would have been nice to have these sorts of classes in the .NET Framework from day one but it’s good to see them arrive now.
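As an aside on (3), here’s a minimal sketch of the sort of thing LazyInit<T> gives you. I’m going from the preview’s surface as I understand it ( a constructor taking a Func<T> and a Value property that runs the initialiser on first access ) so treat this as illustrative rather than definitive;

  class Program
  {
    // Assumed CTP surface: the factory passed to the constructor runs when
    // Value is first read, even if several threads race to read it.
    static LazyInit<Stack<int>> lazyStack =
      new LazyInit<Stack<int>>(() => new Stack<int>());

    static void Main(string[] args)
    {
      // First access triggers (thread-safe) creation of the stack.
      lazyStack.Value.Push(10);

      Console.WriteLine(lazyStack.Value.Count);
    }
  }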

So, I guess instead of having something like this;

  class Program
  {
    static Stack<int> sharedStack;
    static object lockStack;

    static Program()
    {
      lockStack = new object();
      sharedStack = new Stack<int>();
    }
    static void Main(string[] args)
    {
      lock (lockStack)
      {
        sharedStack.Push(10);
      }
    }
  }

I can perhaps have something like this;

  class Program
  {
    static ConcurrentStack<int> sharedStack;

    static Program()
    {
      sharedStack = new ConcurrentStack<int>();
    }
    static void Main(string[] args)
    {
      sharedStack.Push(10);
    }
  }

and that removes code, which is a good thing. If I want to get something off the top of that stack in an atomic manner then I can do something like;

    static void Main(string[] args)
    {
      int value = 0;

      if (sharedStack.TryPop(out value))
      {
        // Atomically got the top item - safe to use value here.
      }
    }

and that’s a lot nicer than what I would have had to do previously, which might be something like;

  class Program
  {
    static Stack<int> sharedStack;
    static object lockObject;

    static Program()
    {
      lockObject = new object();
      sharedStack = new Stack<int>();
    }
    static void Main(string[] args)
    {
      int value = -1;

      lock (lockObject)
      {
        if (sharedStack.Count > 0)
        {
          value = sharedStack.Pop();
        }
      }
      if (value != -1)
      {
        // ...
      }
    }
  }
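The ConcurrentQueue<T> looks to follow the same pattern as the ConcurrentStack<T>, with the “check and remove” combined into a single atomic call. A minimal sketch, assuming TryDequeue mirrors TryPop;

  class Program
  {
    static ConcurrentQueue<int> sharedQueue;

    static Program()
    {
      sharedQueue = new ConcurrentQueue<int>();
    }
    static void Main(string[] args)
    {
      sharedQueue.Enqueue(10);

      int value;

      // One atomic call replaces the lock/Count/Dequeue dance above.
      if (sharedQueue.TryDequeue(out value))
      {
        Console.WriteLine(value);
      }
    }
  }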

Similarly, classes like BlockingCollection<T> are great because a lot of people have written this kind of collection-with-synchronisation in the past and it’s good to have a single, tested implementation so that you can write classic producer-consumer style code;

  class Program
  {
    static BlockingCollection<int> work;

    static Program()
    {
      work = new BlockingCollection<int>();
    }
    static void Main(string[] args)
    {
      Thread producer = new Thread(() =>
      {
        Random r = new Random((int)DateTime.Now.Ticks);

        while (true)
        {
          work.Add(r.Next(100));
          Thread.Sleep(r.Next(500));
        }
      });
      producer.Start();

      Thread consumer = new Thread(() =>
        {
          while (true)
          {
            int item = work.Remove();
            Console.WriteLine(item);
          }
        });

      consumer.Start();

      Console.ReadLine();
    }
  }
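The preview also looks to let you give the collection a bounded capacity, so that a fast producer blocks on Add rather than queueing up work without limit. A minimal sketch, assuming the CTP’s int constructor sets that bound;

  class Program
  {
    // Assumed: BlockingCollection<T>(int) bounds the collection at 10 items,
    // making Add block until the consumer has taken something.
    static BlockingCollection<int> work = new BlockingCollection<int>(10);

    static void Main(string[] args)
    {
      Thread producer = new Thread(() =>
      {
        for (int i = 0; ; i++)
        {
          work.Add(i); // blocks once 10 items are queued and unconsumed
        }
      });
      producer.Start();

      while (true)
      {
        Console.WriteLine(work.Remove()); // consumer paces the producer
        Thread.Sleep(500);
      }
    }
  }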

So, this is all good stuff. Similarly, if you have a situation where a SpinLock is going to give you an advantage over a regular .NET lock ( I can’t remember whether regular locks spin before blocking or not ) then it’s nice to have a class that does the spinning for you and tries to avoid the cost of a kernel transition ( note that, below, I made the consumer sleep in its loop just to keep it from spinning now that I’ve taken the BlockingCollection away );

  class Program
  {
    static List<int> work;
    static SpinLock spinLock;

    static Program()
    {
      work = new List<int>();
      spinLock = new SpinLock();
    }
    static void Main(string[] args)
    {
      Thread producer = new Thread(() =>
      {
        Random r = new Random((int)DateTime.Now.Ticks);

        while (true)
        {
          spinLock.Enter();
          work.Add(r.Next(100));
          spinLock.Exit();

          Thread.Sleep(r.Next(500));
        }
      });
      producer.Start();

      Thread consumer = new Thread(() =>
        {
          while (true)
          {
            int item = -1;

            spinLock.Enter();

            if (work.Count > 0)
            {
              item = work[0];
              work.RemoveAt(0);
            }
            spinLock.Exit();

            if (item != -1)
            {
              Console.WriteLine(item);
            }
            Thread.Sleep(100);
          }
        });

      consumer.Start();

      Console.ReadLine();
    }
  }
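The “slim” synchronisation types from (2) are in the same spirit. A minimal sketch with ManualResetEventSlim, assuming its Set/Wait surface mirrors the kernel-backed ManualResetEvent ( the idea being that it spins briefly before falling back to a real wait );

  class Program
  {
    static ManualResetEventSlim ready = new ManualResetEventSlim();

    static void Main(string[] args)
    {
      Thread worker = new Thread(() =>
      {
        ready.Wait(); // assumed: spins briefly, then blocks until Set
        Console.WriteLine("Signalled");
      });
      worker.Start();

      Thread.Sleep(1000);
      ready.Set();
    }
  }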

So, even though my code is a bit sketchy, the presence of these new classes is a good thing to see.

Where I’ve not yet come to a conclusion is around what happens with items 4 and 5 in my original list, which are now items 1 and 2 below;

  1. The parallel task library with the class Task and associated classes such as TaskManager and its TaskManagerPolicy. There are also some higher level convenience methods around this like Parallel.For and Parallel.ForEach and so on.
  2. Parallel LINQ or PLINQ which, again, uses the underlying Task infrastructure to sync up with LINQ via a set of extension methods ( there’s a quick sketch of this below ).
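On the PLINQ side of things, the hook is the AsParallel extension method. Here’s a minimal, compute-centric sketch of what I mean; I’m assuming the preview’s ParallelEnumerable mirrors the standard LINQ operators;

  class Program
  {
    static bool IsPrime(int n)
    {
      if (n < 2) return false;

      for (int i = 2; i * i <= n; i++)
      {
        if (n % i == 0) return false;
      }
      return true;
    }
    static void Main(string[] args)
    {
      // AsParallel opts the query into PLINQ - it reads like ordinary LINQ
      // but the filtering work gets partitioned across the processors.
      int count = Enumerable.Range(1, 1000000)
        .AsParallel()
        .Where(n => IsPrime(n))
        .Count();

      Console.WriteLine(count);
    }
  }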

Back on the Task side of things, these classes make it very easy for me to write code such as;

    static void Main(string[] args)
    {
      Parallel.For(0, 10, x =>
        {
          // Usually you'd do something useful here.
          for (int i = 0; i < 100000000; i++)
          {
          }
        });
    }

and the “problem” that I’ve got is that I have no idea what that code is actually going to do. Things that occur to me;

  1. How many threads will it create to run these 10 parallel tasks?
  2. Does it do something fancy to try and affinitise them to processors?
  3. Does it have some heuristic as to when threads are injected/retired and how’s that linked to things like CPU utilisation, number of threads, memory usage and so on? Is that a dynamic decision or is it decided at the time of the Parallel.For?
  4. How do I do IO from the middle of this code? That, presumably, still happens on threads delivered out of the “IO bit” of the .NET ThreadPool?

My other question ( and it kind of ties in with (3) above ) is where I’d use this? I guess that I’m not going to use it server-side under something like;

  1. ASP.NET Web Pages
  2. WCF Web Services
  3. WF Workflows

because in those environments my code is already being invoked from a multi-threaded environment and I probably don’t want to launch another bunch of threading infrastructure to compete with the threads of that hosting environment.

That leads me to think that I’d probably use it either on the client-side or in a server environment where I’ve got more control over the machine. Maybe I bought a big server to do some weather calculations or some such or maybe I’m running something client-side.

On the IO front, as an experiment, if I want to mix in async IO then I think the simplicity of the Parallel.For soon gets lost in the complexity of everything else. Say I want to read a bunch of text files in parallel with some code like this ( note – I used one too many lambda statements here so this code is probably riddled with bugs );

  class Program
  {
    static void Main(string[] args)
    {
      string[] files =
        Directory.GetFiles(".", "*.txt");

      CountdownEvent counter = new CountdownEvent(files.Length);

      Parallel.For(0, files.Length, p =>
        {
          byte[] buffer = new byte[1024];

          // Open with useAsync:true so BeginRead really is overlapped IO
          // rather than a synchronous read on a worker thread.
          FileStream fs = new FileStream(
            files[p], FileMode.Open, FileAccess.Read,
            FileShare.Read, 1024, true);

          AsyncCallback forwardReference = null;

          AsyncCallback fn = iar =>
            {
              try
              {
                int read = fs.EndRead(iar);

                if (read < buffer.Length)
                {
                  fs.Close();
                  Console.WriteLine("File read");
                  counter.Decrement();
                }
                else
                {
                  fs.BeginRead(buffer, 0, buffer.Length, forwardReference, null);
                }
              }
              catch
              {
                fs.Close();
                counter.Decrement(); // don't leave the Wait() below hanging
              }
            };

          forwardReference = fn;

          fs.BeginRead(buffer, 0, buffer.Length, forwardReference, null);
        });

      counter.Wait();
      Console.WriteLine("For loop is done");
    }
  }

Regardless of whether the code is quite right, the fact that I’m using Parallel.For doesn’t add too much because the complexity is in the async IO code, and I’m still feeling a bit itchy about how many additional threads I’m creating just to kick off a bunch of async IO work.

I also have to take some care because the regular semantic of Parallel.For looks to be that it doesn’t return until the parallel tasks have completed. However, my parallel tasks in this case appear to complete before the work is actually done, because all they really do is issue the first async read and return. Hence the use of the CountdownEvent, which is another nice class to have 🙂
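In isolation the pattern is simple enough; a minimal sketch of the CountdownEvent usage on its own ( Decrement and Wait being the names in the current preview );

  class Program
  {
    static void Main(string[] args)
    {
      int workItems = 5;
      CountdownEvent counter = new CountdownEvent(workItems);

      for (int i = 0; i < workItems; i++)
      {
        ThreadPool.QueueUserWorkItem(o =>
        {
          Thread.Sleep(500);   // stand-in for real (perhaps async) work
          counter.Decrement(); // one unit of work done
        });
      }
      counter.Wait();          // blocks until the count reaches zero
      Console.WriteLine("All work items completed");
    }
  }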

The other side of this is whether it’s a good idea to be using something like Parallel.For for this kind of work at all. I’ve a suspicion that this is not what it’s intended for, so I wrote a little bit of test code that I found helpful in looking at how threads are being created/managed by the framework;

  class Program
  {
    static void TestParallel(int maxCount)
    {
      DateTime start = DateTime.Now;
      SpinLock sLock = new SpinLock();
      List<int> threadIds = new List<int>();

      Console.WriteLine("Testing count of {0}", maxCount);

      Parallel.For(0, maxCount, i =>
      {
        while ((DateTime.Now - start).TotalSeconds < 10)
        {
          sLock.Enter();

          if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          {
            threadIds.Add(Thread.CurrentThread.ManagedThreadId);
          }
          sLock.Exit();
        }
      });

      Console.WriteLine("\tEncountered {0} threads",
        threadIds.Count);
    }
    static void Main(string[] args)
    {
      for (int i = 2; i < 100; i *= 2)
      {
        TestParallel(i);
      }
      Console.ReadLine();
    }
  }

and I never saw this wander above 4 threads for any value of maxCount up to 64, so it looks like I’m getting at most 2 threads per processor (this is on my dual-core laptop) and that’s regardless of whether I insert a big Sleep() into the middle of my “work”.

So that means that if I’m trying to do async IO work then the Parallel.For is not really helping me as it’s making no attempt to parallelise all the tasks that I’ve given it ( not that I’m saying this is a bad thing, you understand ).

The framework is, instead, parallelising a few of them ( probably a sensible number ) at a time and letting them run to completion before considering anything else. I can see that by making my code more explicit, so I changed my example to something like;

  class Program
  {
    static void Main(string[] args)
    {
      ConsoleColor[] values = (ConsoleColor[])Enum.GetValues(typeof(ConsoleColor));
      object lockObject = new object();

      Parallel.For(0, values.Length, c =>
        {
          while (true)
          {
            // Not a great idea but works for me here.
            lock (lockObject)
            {
              Console.ForegroundColor = values[c];
              Console.WriteLine("Work");
            }
            Thread.Sleep(250);
          }
        });
    }
  }

which lets me see that I get 4 colours out of this. So, subsequent tasks are never scheduled because the initial tasks never complete, although I’m a little puzzled as to why it’s 4 and not 2 given that I’m on a dual-core machine.

That is – if we’re going to take the approach that we only schedule parallel tasks based on the number of CPUs then why schedule 2 per (logical) CPU rather than 1 per CPU? I can see where it’s coming from with a piece of code like this one;

Console.WriteLine(TaskManager.Default.Policy.IdealThreadsPerProcessor);

which prints out 2 on my system so, given that it’s dual-core, I’m going to end up with 4 threads and, hence, 4 parallel tasks running at the same time.
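Putting numbers on that, a minimal sketch of where the 4 comes from ( Environment.ProcessorCount is standard .NET and the Policy property is the same one read above );

  class Program
  {
    static void Main(string[] args)
    {
      int processors = Environment.ProcessorCount; // 2 on my dual-core laptop
      int perProcessor =
        TaskManager.Default.Policy.IdealThreadsPerProcessor; // 2 in this CTP

      // 2 processors x 2 ideal threads per processor = 4 concurrent tasks.
      Console.WriteLine(processors * perProcessor);
    }
  }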

I had to question why that was the case and found out from this post (item 1) that it’s a limitation of the current CTP bits in that they are trying to do a limited form of deadlock avoidance and it’ll change in the future.

The results of that previous code though are very different from what the ThreadPool would give you with something like;

  class Program
  {
    static void Main(string[] args)
    {
      ConsoleColor[] values = (ConsoleColor[])Enum.GetValues(typeof(ConsoleColor));
      object lockObject = new object();

      for (int i = 0; i < values.Length; i++)
      {
        int j = i;

        ThreadPool.QueueUserWorkItem(o =>
          {
            while (true)
            {
              // Not a great idea but works for me here.
              lock (lockObject)
              {
                Console.ForegroundColor = values[j];
                Console.WriteLine("Work");
              }
              Thread.Sleep(250);
            }
          });
      }
      Console.ReadLine();
    }
  }

and, actually, those two examples started to help me see a little more about what the PFX bits are trying to do for me.

The traditional managed ThreadPool does a lot of things ( timers, waiters, IO, workers ) so a direct comparison with PFX is a bit unfair, but I’d say that for work items (i.e. QueueUserWorkItem) the ThreadPool takes a thread-centric approach to getting the work done.

That is, the ThreadPool has a logical queue of work items to be done and ( unless its heuristics tell it the machine is pegged or its limits tell it to stop ) it’ll periodically inject threads to pull more and more of those items from the queue, getting as many of them running in parallel as it can and allowing the OS to multi-task between them.

The ThreadPool’s assumption has to be that the work items aren’t individually going to max out a CPU and that we want as many as possible running at a time so that, between them, they drive up towards 100% utilisation of the CPUs, albeit with the downside of context-switching and poor use of the CPU caches if lots of tasks do end up trying to run concurrently.

That’s likely to be a pretty reasonable assumption in situations like ASP.NET where you get a request, pull it off the network stack asynchronously, talk to the database for a while and then render the results – each request isn’t individually likely to be trying to do something that requires huge numbers of CPU cycles.

The PFX isn’t taking that approach.

It’s taking more of a task-centric approach. There are some tasks to be done. There are some CPUs to do them on. Generally, it looks to run the task on the CPU until it’s done with the assumption being that the task is largely compute-centric and so it’s not worth running any other tasks on that CPU until the current one completes.

This probably wouldn’t fly for ASP.NET as it means that the requests would start to queue up pretty quickly 🙂 but it does mean that for CPU intensive tasks there’s generally going to be less context switching between threads and that’s going to preserve the data in my caches and so on.

I can see that by comparing this (admittedly unfair and a bit pathological) piece of code;

      for (int i = 0; i < 100; i++)
      {
        ThreadPool.QueueUserWorkItem(o =>
          {
            while (true) ;
          });
      }
      Console.ReadLine();

which I ran for a good few minutes and watched tick up to about 30 threads, all trying to hammer my CPUs, which meant an ever-increasing number of context switches between them, highlighted by the black line below which trends upwards with the number of threads (the blue line, which I believe the ThreadPool increases every 30 seconds or so);

[Image: perfmon trace showing context switches (black line) trending upwards as the process thread count (blue line) climbs]

Running the same (contrived) example with the PFX bits as in;

      for (int i = 0; i < 100; i++)
      {
        Task.Create(o =>
        {
          while (true)
          {
          }
        });
      }
      Console.ReadLine();

gives me a flat line for the thread count of my process and a fairly flat line for context switching on the machine ( bear in mind that this is just my laptop on a train so there’s a bunch of other stuff going on in the background );

[Image: perfmon trace showing a flat process thread count and a fairly flat context-switch rate with the PFX version]

I can sort of see this a little with perfmon by asking it to show the states of the threads in the process. For the PFX case, it looked like this;

[Image: perfmon thread-state counters for the PFX run; most threads hold a constant state while 4 oscillate]

That is – most of the threads look to be in a fixed state ( presumably, waiting ) whereas 4 keep switching between values of 1 and 2, which represent “READY” and “RUNNING” respectively.

If I get a similar graph from perfmon whilst running the ThreadPool example then I see;

[Image: perfmon thread-state counters for the ThreadPool run; lots of threads oscillating between READY and RUNNING]

That is – lots of threads are transitioning between READY/RUNNING as they fight ( in this case a bit unproductively ) for CPU resource.


So, I can see situations where I want to run compute-centric tasks on N CPUs as efficiently as possible and I think that’s the point at which the Parallel Extensions kick in and let me do that in a simple way ( not that I’ve looked particularly long and hard just yet at the Task class itself, nor at the PLINQ bits ).

What I still feel a little bit vague about is how, as an application programmer, I’m meant to figure out what N should be. That is, on an N-proc machine I can just let the framework default to N, or I can explicitly set some value for N, but it’s hard to figure out what that value should be given that there may be other workloads running on the machine. Generally, I guess most developers using the framework are going to let N default to the number of CPUs on the machine because any other decision is a really difficult one to make.
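If you did want to pin N explicitly then the knob looks to be the TaskManagerPolicy. What follows is a hypothetical sketch – I’m assuming a constructor shape of ( minProcessors, idealProcessors, idealThreadsPerProcessor ) and a Task.Create overload that accepts a TaskManager, both of which may well differ in the June bits, so check the preview docs before relying on it;

  class Program
  {
    static void Main(string[] args)
    {
      // Assumed constructor: one thread per processor rather than two.
      TaskManagerPolicy policy =
        new TaskManagerPolicy(1, Environment.ProcessorCount, 1);

      TaskManager manager = new TaskManager(policy);

      for (int i = 0; i < 100; i++)
      {
        // Assumed overload: Task.Create(Action<object>, TaskManager).
        Task.Create(o =>
        {
          // compute-centric work here
        }, manager);
      }
      Console.ReadLine();
    }
  }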