Intro
Long before Microsoft .NET 4.5 was a reality, in the days when asynch programming was not a given, I developed a bit of a concurrency wrapper implementation which has been used on a variety of projects.
At the threat of being redundant (in light of the across-the-board support for Asynch in .NET 4.5) I’ve decided to write up my old threading approach in the hopes that it might be useful to those not yet using .NET 4.5.
This implementation isn’t terribly advanced, I just thought it might be useful for someone “out there”. It’s not been exhaustively tested or reviewed, so please be kind to it. As usual, the normal conditions apply
A case in work packages
What I wanted, about two years ago, was to quit writing the same thread synchronization logic over and over again. It seemed that whenever I wanted to repeat a series of tasks using threading, I was caught using a lock synch and some kind of support for marshalling events to a UI.
Finally sick of implementing roughly the same implementation or design over and over again, I sat down and figured out what I’d want from an API perspective – something I could throw different objects, types, handlers and events at and produce some semblance of consistency using multiple threads.
Design work
What I wanted to do was write a wrapper on top of the .NET Framework’s threading implementation (System.Threading) and using the Framework’s own queuing implementation, i.e.
ThreadPool.QueueUserWorkItem(ThreadProc, package);
The design took into account a number of different usage scenarios, for example “light weight” usage (simple setup and execute) versus more complex requirements like an ability to signal abort a single thread or using a master abort signal to terminate all running threads.
As this was originally designed to work with Windows services, the logic needed the ability to respond to a shutdown event, so threads had to respond to out of band requests within a reasonable time. As work load could also chop and change, the ability to enqueue new work was added later.
Class Warfare
So the basic design derives from a ThreadPoolManager (which co-ordinates thread execution and packaging) and a ThreadWorkPackage which is what the work to be performed is “wrapped” in. The Manager class is given either a set of packages, or a single package which constitutes the work to be performed.
The real magic is in the establishment of the work packages (providing proc locations for the work to be performed) and the arguments used in beginning execution. There are a number of considerations to be made before executing threads.
Full disclosure: there are a number of classes in a common assembly which really come from a larger and more complex “enterprise” solution which I designed and built in 2010, so these are somewhat just stubs or placeholders at this time. To keep the solution to a minimum, I removed A LOT of the original implementation.
Advantages
So what the ThreadManager class really allows you to do it to leverage some “boiler plate” implementation which handles setting up your event callbacks, thread counters, synchronization objects and setting (and triggering) any event abort signals you’d care to use.
Probably one of the best ways to demonstrate this is in some code samples which show how you’d take advantage of the ThreadManager.
An easy example below shows how to do a “minimal fuss” configuration, noting that the bulk of the configuration is in setting up the configuration of each individual ThreadWorkPackage.
[TestMethod] public void TestSynchExecution() { List<ThreadWorkPackage> packages = TestCommon.GetWorkList("Basic Test", 40, 0, new ThreadWorkPackage.ExecutionEvent(callback)); ThreadPoolManager mgr = new ThreadPoolManager(); mgr.BeginExecution(packages, true); }
public static List<ThreadWorkPackage> GetWorkList(string jobTitle, int itemsToAdd, int multiplier, ThreadWorkPackage.ExecutionEvent callbackEvent) { List<ThreadWorkPackage> items = new List<ThreadWorkPackage>(); for (int x = 0; x < itemsToAdd; ++x) { ThreadWorkPackage package = new ThreadWorkPackage(); package.PackageId = (x + 1) + multiplier; package.PackageSeries = jobTitle; package.PackageProcessingEvent += callbackEvent; items.Add(package); } return items; }
As you can see, limited code is needed to get something simple up and running. Let’s have a look at the operation which is being invoked in the ThreadManager class:
/// <summary> /// Used for Wait Signal-free Calls /// </summary> /// <param name="workItems">[Required] A collection of items to execute</param> /// <param name="synchronous">[Required] Whether or not to wait until they have completed</param> public void BeginExecution(List<ThreadWorkPackage> workItems, bool synchronous) { BeginExecution(workItems, synchronous, false, null, String.Empty, -1); }
This simply calls another overloaded operation which eventually drives to the following, which actually invokes work package (and thread) execution:
Now that’s a whole lot of arguments! Here are the rest of th
/// <summary> /// Start Processing as a batch /// </summary> /// <param name="workItems">[Required] A collection of items to execute</param> /// <param name="synchronous">[Required] Whether or not to wait until they have completed</param> /// <param name="ignoreExecution">[Required] Whether to execute or not, if another set of items are currently being executed /// Note that ignoring executing threads will delay triggering of the Finished Processing event, as it increments the total count of threads /// </param> /// <param name="masterSeries">[Optional] A master abort handle, to abort all packages at once</param> /// <param name="masterWait">[Optional] A common descriptor - required when using a master abort signal</param> /// <param name="timeOut">[Optional] When to timeout from thread execution - used with master abort</param> /// <param name="maxWorkerThreads">[Optional] Max Worker Threads</param> /// <param name="maxCompletionPortThreads">[Optional] Max Completion Port Threads</param> /// <exception cref="NotSupportedException"> /// Only one execution batch is supported at any one time. /// Wait for the current execution to finish first. /// </exception> /// <exception cref="InvalidOperationException"> /// The arguments passed to BeginExecution must not be null and must contain at least one valid item /// </exception> /// <exception cref="NotSupportedException"> /// We use the masterSeries string to determine which threads to trigger an abort for, which is why it is a required field /// when a master wait handle is used for aborting package threads /// </exception> public void BeginExecution(List<ThreadWorkPackage> workItems, bool synchronous, bool ignoreExecution, AutoResetEvent masterWait, string masterSeries, int timeOut, int maxWorkerThreads, int maxCompletionPortThreads);
e possible arguments of the overloaded BeginExecution functions:
Event Driven
What you might have noticed in the definition of the ThreadManager is that it’s entirely configurable whether you want to be notified when one or all threads have finished executing. Events are available and can be used to help you determine when work has completed – or was interrupted:
#region Events /// <summary> /// Called when all Threads have finished executing /// </summary> public delegate void FinishedProcessing(); public event FinishedProcessing FinishProcessingEvent; /// <summary> /// Called when an individual thread has finished executing /// </summary> protected delegate void ThreadFinished(); protected event ThreadFinished ThreadExecutionFinished; /// <summary> /// Called when a thread has been aborted /// </summary> protected delegate void ThreadAborted(); protected event ThreadAborted ThreadExecutionAborted; #endregion
You might also have noticed that the individual ThreadWorkPackage class instances can have events too – for when the thread executes successfully, or when the thread is aborted:
public delegate void ExecutionEvent(ThreadWorkPackage package); public event ExecutionEvent PackageProcessingEvent; public delegate void ExecutionAbortEvent(ThreadWorkPackage package, bool timeOut); public event ExecutionAbortEvent PackageAbortEvent;
You might note the timeOut boolean parameter – the package can be set a timeout value if execution doesn’t complete within a specified time. This value will indicate whether a thread was aborted due to the abort signal being raised, or due to an execution timeout.
Synchronous Execution vs. Asynchronous
Another advantage is that you can kick off execution of work packages without having to wait for all the threads to finish. Using the events, you can just be notified when work completes, or if there are any unexpected errors.
Conversely, you can just as easily kick off execution and have the ThreadManager not return until work is completed.
Unit Tests
There are a few unit tests, but there were a plethora more in my original solution. I’ve republished some within the sample solution, however I’d recommend writing some of your own if you plan to have a play with the solution. It’ll help you understand the implementation better.
More Options
The manager class has quite a number of additional options , including the ability to limit the max number of worker threads, and to “throw some more work” onto the list of threads to be executed. It’s probably worth playing around with a few different options to see if this is suitable for your needs.
Known Issues
There’s one thing I’m aware of which I haven’t had time to correct, and probably quite a few more.
One issue is that the abort signals registered by ThreadPool.RegisterWaitForSingleObject are not properly unregistered when processing finishes.
When execution is finished, any abort events which were registered should be unregistered using RegisteredWaitHandle.Unregister(..). Sorry about that, I got a tad bit slack when I was putting this together, it’s an oversight I’d like to correct.
Feedback
Please be kind. With the birth of our first child earlier this year, I haven’t had nearly as much time as I usually would to either write this article or review the sample code/solution.
I’ve had this code running in a “production-like” environment underpinning Windows services which are running 24/7 without too many problems, but that doesn’t mean it’s bug free by any stretch of the imagination.
If you see any problems or encounter any issues, please drop me an email at rob.sanders@gmail.com. I’d love to factor in feedback, modifications etc.
Source Code
Source Code ThreadManager.zip