Edit:
This article describes an older version of the Parallel Framework. Please go to the Parallel Framework - Download and Support page to get the latest update.
This article remains available for historical purposes. Download links have been removed from this page.
Introduction
In a previous article, "Parallel computing with sockets", I wrote about the implications of using parallelism in combination with sockets. But when talking to coders around me, it seems that they also have difficulty grasping the concepts of parallel coding in general. Reason enough to write an article about it.
If you want to know more about "Why parallelism?" and some of the basics, please read the article "Parallel computing with sockets".
Ok, to explain how to "simply" make parallel software, I'll first recapitulate a little of the basics from the previous article.
Tasks
To code in parallel is to think in tasks. Each task can run on its own, on a separate thread from the other tasks and from the application.
Divide your code into tasks rather than into threads. For example, a thread can first find all logfiles in a directory, open each file, read some lines, parse the lines and write the result into an output file. This can all be done in the same thread, and if you want more concurrency, you can run multiple threads all doing the same thing.
If you look at this in terms of tasks, you could make the following distinction:
1. Find all logfiles in a directory
2. Open a logfile and read some lines
3. Parse a line and write output
Task 1 can create new tasks of type 2, and task 2 can create new tasks of type 3. In this model your tasks are autonomous and are able to run separately from each other. Also, they don't have to run in any specific order after they are created.
This makes it easy for the underlying OS to spread the tasks over the available threads and CPU cores.
Although you shouldn't think of tasks as threads when designing the flow of your program, you do have to remember that all these tasks run on separate threads in some sort of thread pool, and thus face the same shared-memory and locking issues as the underlying threads.
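As a rough illustration of the three task types above, here is a minimal sketch that uses .NET's built-in Task Parallel Library (System.Threading.Tasks) rather than the Parallel Framework described in this article. The class name TaskDecompositionDemo, the in-memory dictionary standing in for a directory of logfiles, and the upper-casing standing in for real parsing are all assumptions made for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TaskDecompositionDemo
{
    // "logs" is a hypothetical in-memory stand-in for a directory of logfiles.
    public static ConcurrentBag<string> ParseAll(IDictionary<string, string[]> logs)
    {
        // Thread-safe collection, because type 3 tasks add to it concurrently.
        var results = new ConcurrentBag<string>();

        // Task type 1: walk over all "files" and create one type 2 task per file.
        Task root = Task.Factory.StartNew(() =>
        {
            foreach (KeyValuePair<string, string[]> file in logs)
            {
                string name = file.Key;
                string[] lines = file.Value;

                // Task type 2: read the lines and create one type 3 task per line.
                Task.Factory.StartNew(() =>
                {
                    foreach (string line in lines)
                    {
                        if (line.StartsWith("#", StringComparison.Ordinal)) continue;
                        string current = line;

                        // Task type 3: "parse" one line and store the output.
                        Task.Factory.StartNew(
                            () => results.Add(name + ": " + current.ToUpperInvariant()),
                            TaskCreationOptions.AttachedToParent);
                    }
                }, TaskCreationOptions.AttachedToParent);
            }
        });

        // Attached child tasks must finish before the root task counts as completed.
        root.Wait();
        return results;
    }

    public static void Main()
    {
        var logs = new Dictionary<string, string[]>
        {
            { "a.log", new[] { "#Fields: date time", "get /index", "get /about" } },
            { "b.log", new[] { "get /contact" } }
        };
        // The tasks run in no particular order, so the output order can vary.
        foreach (string result in ParseAll(logs)) Console.WriteLine(result);
    }
}
```

Note that none of the tasks knows on which thread it runs; the only shared data is the result collection, which is why a thread-safe collection is used here.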
If you don't know much about programming with threads, please first read the article "Threading in C#" by Joseph Albahari. He explains all the ins and outs about threading.
Parallel Framework
To make it a little easier for people to write parallel code, I've developed a lightweight Parallel Framework. It's a simple framework to work with tasks. There are several more frameworks available on the internet, but most are a lot more complex and have additional features that I (and probably you) don't need most of the time.
The framework is simple. It's basically built up of three different classes:
- ParallelState
- ParallelTask
- ParallelTaskPool
And (because I needed it for the example) I've written one additional class:
- PropertybagState
The following diagram shows it all.
The ParallelTask is the task that runs your code and does the appropriate (optional) callbacks when your code has finished. It's basically nothing more than a wrapper that delivers the ParallelState object to your code.
The ParallelTaskPool runs the tasks in an orderly way. Although you can run tasks on their own, you will most likely want to use the pool. It prevents problems such as overloading the process with too many concurrent tasks, and it can block your main thread until all tasks have finished.
To pass state information and parameters to a task, ParallelState objects containing all the necessary data are passed along to the tasks. These are merely data containers and perform no real function themselves. A state object can also pass exceptions that occurred during a task's execution back to the main thread.
To create a state object tailored to your needs, you can inherit from the ParallelState class and extend it. If you want a simple (but less efficient) state object, you can use the PropertybagState, which contains a hash table in which you can store all parameters.
You will find the C# source files of the framework below.
Example Implementation: IIS LogParser
As an example of how to create parallel software, I've coded a little application that parses the (W3C) logfiles of an IIS webserver.
Basically, this is what I want it to do:
- Find all logfiles in a given directory
- Read all the loglines
- Convert the usable information to XML
- Write all XML to one result file
For this example the following diagram applies.
So, basically you have a main thread, a number of "parse file" tasks and a vast number of "parse line" tasks. For a directory containing lots of huge logfiles, this setup results in a huge number of tasks.
# logfiles * # loglines per file = # tasks
To make sure that I don't kill the processor and memory with too many concurrent tasks, I'll use the ParallelTaskPool.
Example Code: parseFolder
    //requires: using System; using System.IO;
    public static void parseFolder(string _srcPath, string _dstPath, DateTime _from)
    {
        StreamWriter dst = null;
        try
        {
            //create result file
            dst = new StreamWriter(File.Open(_dstPath, FileMode.Create, FileAccess.Write, FileShare.Read));
            dst.WriteLine("<?xml version=\"1.0\" encoding=\"utf-8\"?>");
            dst.WriteLine("<?xml-stylesheet type=\"text/xsl\" href=\"translate.xslt\"?>");
            dst.WriteLine("<REPORT>");

            //get all logfiles
            string[] files = Directory.GetFiles(_srcPath, "*.log", SearchOption.AllDirectories);

            //parse all files
            for (int idx = 0; idx < files.Length && !exceptionOccurred(); idx++)
            {
                //only process new files
                if (File.GetCreationTime(files[idx]) >= _from)
                {
                    //create state object
                    PropertybagState state = new PropertybagState();
                    state.setProperty("file", files[idx]);
                    state.setProperty("dst", dst);

                    //create task
                    ParallelTask task = new ParallelTask(
                        new ParallelTask.TaskRunnerDelegate(parseFile),
                        new ParallelTask.TaskCallbackDelegate(taskDone),
                        state);

                    //run the task
                    ParallelTaskPool.runTask(task);
                }
            }

            //wait until all tasks are done
            ParallelTaskPool.blockWhileTasksRunning();

            //close report tag in result file
            dst.WriteLine("</REPORT>");

            //rethrow the task exception, if present
            if (taskException != null) throw taskException;
        }
        finally
        {
            //try to close result file (dst is still null if File.Open failed)
            try { if (dst != null) dst.Close(); } catch (Exception) { }
        }
    }
This is the main function, and it runs in the main thread.
This function finds all the log files in _srcPath. For each log file found, a task is created to handle that specific file.
To pass parameters to the task, a PropertybagState object is used, which contains the logfile path and a reference to the output file.
When the task is created, the constructor receives the state object and two delegates. One delegate refers to the code that the task will execute. The other, which is optional, is a callback delegate that is executed when the task is done.
The ParallelTaskPool is used to run the task. Running tasks this way also lets you block your main thread (or any other thread, for that matter) until all tasks are done by calling ParallelTaskPool.blockWhileTasksRunning(). That way you can start all your tasks, wait until they have all finished, and then clean up after them.
In the example, I used this technique to close the output file (which is passed to all tasks as a state parameter). If I didn't wait until all tasks had finished, I would either close the output file too early, resulting in IO errors in the tasks, or have to open and close the output file in each task, creating a huge IO overhead.
Example Code: taskDone
    private static void taskDone(ParallelState _state)
    {
        //check for errors
        if (_state.taskExceptionOccurred())
            taskException = new Exception("Exception during task execution", _state.taskException);

        //end task
        ParallelTaskPool.endTask();
    }
This is the callback delegate function. As you can see, the state object is passed to this method and with it, you are able to check if there was an exception during the execution of the task.
This is also the place where you signal the ParallelTaskPool that the task is finished. When using the pool, this is vital. If you fail to call ParallelTaskPool.endTask(), the pool clogs up with tasks that are never marked as finished, and only a few tasks will actually run. The others will remain queued and execution of the main thread will halt.
Please also make sure that the call to ParallelTaskPool.endTask() is the last thing your task does. Calling it earlier does not cause an error, but tasks run more smoothly when it is called last.
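To make the role of endTask() more concrete, here is a minimal sketch of how a throttling pool could work. It is not the framework's actual source: MiniTaskPool, Run, WaitAll and FirstError are hypothetical names, and unlike the real ParallelTaskPool this sketch releases a slot automatically when the work returns instead of relying on an explicit endTask() call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a throttling task pool (not the Parallel Framework source).
public sealed class MiniTaskPool
{
    private readonly object gate = new object();
    private readonly Queue<Action> pending = new Queue<Action>();
    private readonly int maxConcurrent;
    private int running;
    private Exception firstError;

    public MiniTaskPool(int maxConcurrent)
    {
        if (maxConcurrent < 1) throw new ArgumentOutOfRangeException("maxConcurrent");
        this.maxConcurrent = maxConcurrent;
    }

    public Exception FirstError { get { lock (gate) { return firstError; } } }

    // Starts the work if a slot is free, otherwise queues it. Never blocks,
    // so a task can safely create new tasks from inside the pool.
    public void Run(Action work)
    {
        lock (gate)
        {
            if (running >= maxConcurrent) { pending.Enqueue(work); return; }
            running++;
        }
        ThreadPool.QueueUserWorkItem(_ => Execute(work));
    }

    private void Execute(Action work)
    {
        while (work != null)
        {
            try { work(); }
            catch (Exception ex) { lock (gate) { if (firstError == null) firstError = ex; } }

            lock (gate)
            {
                // The equivalent of "ending the task": hand the slot to a queued
                // task, or free it and wake up anyone blocked in WaitAll().
                if (pending.Count > 0) { work = pending.Dequeue(); }
                else { work = null; running--; Monitor.PulseAll(gate); }
            }
        }
    }

    // Blocks the caller until nothing is running or queued.
    public void WaitAll()
    {
        lock (gate)
        {
            while (running > 0 || pending.Count > 0) Monitor.Wait(gate);
        }
    }
}

public static class MiniTaskPoolDemo
{
    // Returns { finished leaf tasks, highest number of leaf tasks seen running at once }.
    public static int[] RunDemo(int limit, int outer, int inner)
    {
        var pool = new MiniTaskPool(limit);
        object stats = new object();
        int active = 0, peak = 0, done = 0;

        Action leaf = () =>
        {
            lock (stats) { active++; if (active > peak) peak = active; }
            Thread.Sleep(1);
            lock (stats) { active--; done++; }
        };

        for (int i = 0; i < outer; i++)
            pool.Run(() => { for (int j = 0; j < inner; j++) pool.Run(leaf); });

        pool.WaitAll();
        lock (stats) { return new[] { done, peak }; }
    }

    public static void Main()
    {
        int[] result = RunDemo(3, 5, 10);
        Console.WriteLine("finished: " + result[0] + ", peak concurrency: " + result[1]);
    }
}
```

If the slot were never released in Execute, the running counter would stay at its maximum, every further task would remain in the queue and WaitAll() would never return. That is the situation described above for a missing endTask() call.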
Example Code: exceptionOccurred
    private static Exception taskException = null;

    private static bool exceptionOccurred()
    {
        return (taskException != null);
    }
The taskException field holds an exception that occurred during the execution of a task. It is filled by the taskDone (callback) function.
The exceptionOccurred function is used by parseFolder (looping through all the files) and by parseFile (looping through all the lines) to stop looping when an exception has occurred.
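Because the callback can run on several threads at once, more than one task may try to store its exception at the same moment. The following is a minimal sketch of one possible variation that keeps only the first exception, using Interlocked.CompareExchange and Volatile.Read (the latter assumes .NET 4.5 or later). The class TaskErrorHolder and its members are hypothetical names and are not part of the framework or of the example application.

```csharp
using System;
using System.Threading;

// Hypothetical thread-safe holder for the first exception raised by any task.
public static class TaskErrorHolder
{
    private static Exception first;

    // True as soon as any task has recorded an exception.
    public static bool Occurred
    {
        get { return Volatile.Read(ref first) != null; }
    }

    public static Exception Value
    {
        get { return Volatile.Read(ref first); }
    }

    // Stores the exception only if no exception has been stored yet.
    public static void Record(Exception ex)
    {
        Interlocked.CompareExchange(ref first, ex, null);
    }

    // Clears the holder, for example before starting a new run.
    public static void Reset()
    {
        Interlocked.Exchange(ref first, null);
    }

    public static void Main()
    {
        Reset();
        Record(new Exception("first failure"));
        Record(new Exception("second failure")); // ignored, one is already stored
        Console.WriteLine(Occurred + ": " + Value.Message);
    }
}
```

Keeping the first exception rather than the last is a design choice: later failures are often just consequences of the first one.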
Example Code: parseFile
    private static void parseFile(ParallelState _state)
    {
        //get state properties
        PropertybagState stateIn = (PropertybagState)_state;
        StreamWriter dst = (StreamWriter)stateIn.getProperty("dst");
        string filePath = stateIn.getPropertyString("file");
        StreamReader log = null;
        try
        {
            //open logfile
            log = new StreamReader(File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite));

            //go through all lines
            string line = null;
            while ((line = log.ReadLine()) != null && !exceptionOccurred())
            {
                //skip comment lines
                if (!line.StartsWith("#"))
                {
                    //create state object
                    PropertybagState state = new PropertybagState();
                    state.setProperty("logline", line);
                    state.setProperty("dst", dst);

                    //create task
                    ParallelTask task = new ParallelTask(
                        new ParallelTask.TaskRunnerDelegate(parseLine),
                        new ParallelTask.TaskCallbackDelegate(taskDone),
                        state);

                    //run the task
                    ParallelTaskPool.runTask(task);
                }
            }
        }
        finally
        {
            //try to close logfile (log is still null if File.Open failed)
            try { if (log != null) log.Close(); } catch (Exception) { }
        }
    }
This code runs as a task. The state is passed to the task as a parameter.
Here the log file is opened, and for each line that is not a comment, a new task is created which handles the parsing of the line.
As you can see, a new state object is created for each line. The current one can't be reused, because the logline property would be overwritten for each line (and thus for each task). But if you extended the ParallelState class with your own specific state class, you could make some properties static and add a .Clone() method. That would reduce the overhead.
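As a minimal sketch of such a specific state class: the ParallelState class below is only a stand-in that models the two members used in taskDone, since the real base class is not shown in this article. LogLineState, Dst, LogLine and CloneFor are hypothetical names. Instead of static properties, this variation copies the shared writer reference into each clone, which has a similar effect without making the writer global.

```csharp
using System;
using System.IO;

// Stand-in for the framework's base class (assumption: only the members
// used in the article's taskDone callback are modelled here).
public class ParallelState
{
    public Exception taskException;

    public bool taskExceptionOccurred()
    {
        return taskException != null;
    }
}

// Hypothetical typed state: no hash table lookups and no casts in the task.
public class LogLineState : ParallelState
{
    public TextWriter Dst;   // shared by all tasks, copied by reference
    public string LogLine;   // different for every task

    // Creates the state for one line, reusing the shared output writer.
    public LogLineState CloneFor(string logLine)
    {
        return new LogLineState { Dst = this.Dst, LogLine = logLine };
    }
}

public static class LogLineStateDemo
{
    public static void Main()
    {
        var template = new LogLineState { Dst = new StringWriter() };

        LogLineState first = template.CloneFor("line one");
        LogLineState second = template.CloneFor("line two");

        // Both clones share the writer, but each has its own line.
        Console.WriteLine(ReferenceEquals(first.Dst, second.Dst));
        Console.WriteLine(first.LogLine + " / " + second.LogLine);
    }
}
```

Inside the task, the state can then be cast once to LogLineState and its fields used directly.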
Example Code: parseLine
    //requires: using System; using System.IO; using System.Xml;
    private static void parseLine(ParallelState _state)
    {
        //get the state object
        PropertybagState state = (PropertybagState)_state;

        //get some state properties
        StreamWriter dst = (StreamWriter)state.getProperty("dst");
        string line = state.getPropertyString("logline");

        //split line into fields
        string[] fields = line.Split(new char[]{' '});
        if (fields.Length < 12)
            throw new Exception("Bad format of logline: \""+line+"\"");

        //create activity element
        XmlElement xml = (new XmlDocument()).CreateElement("ACTIVITY");

        //add attributes
        xml.SetAttribute("datetime", fields[0]+"T"+fields[1]);
        xml.SetAttribute("site", fields[2]);
        xml.SetAttribute("url", fields[5]);
        xml.SetAttribute("query", fields[6]);
        xml.SetAttribute("clientIP", fields[9]);
        xml.SetAttribute("userAgent", fields[10]);
        xml.SetAttribute("reponseCode", fields[11]);

        //write xml to file; the lock covers only the shared writer
        lock (dst)
            dst.WriteLine(xml.OuterXml);
    }
This code also runs as a task. The state is passed to the task as a parameter.
Here the log line is parsed and converted to XML. The XML is written to the output file.
Because tasks run on different threads (which can execute simultaneously) and because all the parseLine tasks write to the same output file, you must use locking to avoid thread-safety issues on the file handle. Lock on an object that is available to all tasks that use the output file.
In my example I've used the reference to the output file itself for the lock.
Always lock as little as possible! Lock only what is absolutely necessary and nothing more; otherwise performance can drop drastically and deadlocks become more likely.
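The following minimal sketch shows the same idea in isolation. It uses the standard .NET ThreadPool and CountdownEvent instead of the Parallel Framework, and a StringWriter instead of a file, so the class SharedWriterDemo and its method names are assumptions made for the example. Each work item builds its line outside the lock and holds the lock only for the actual write.

```csharp
using System;
using System.IO;
using System.Threading;

public static class SharedWriterDemo
{
    // Lets lineCount work items write one line each to a shared writer.
    public static string WriteConcurrently(int lineCount)
    {
        var output = new StringWriter();

        using (var allDone = new CountdownEvent(lineCount))
        {
            for (int i = 0; i < lineCount; i++)
            {
                int id = i; // copy, so each work item captures its own value
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        // The expensive part (building the text) needs no lock.
                        string line = "<ACTIVITY id=\"" + id + "\" />";

                        // Only the write to the shared writer is locked.
                        lock (output) output.WriteLine(line);
                    }
                    finally
                    {
                        allDone.Signal();
                    }
                });
            }

            // Wait for all writers before the output is read or closed.
            allDone.Wait();
        }

        return output.ToString();
    }

    public static void Main()
    {
        string text = WriteConcurrently(1000);
        string[] lines = text.Split(
            new[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);
        Console.WriteLine("lines written: " + lines.Length);
    }
}
```

Without the lock, two work items could write to the StringWriter at the same time, which is not safe and could interleave or corrupt lines. The order of the lines still varies from run to run, because the work items are not executed in any fixed order.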
Now, please enjoy the Parallel Framework and parallel programming. If you have any questions about this subject, please leave a comment below.
References:
Threading in C# (http://www.albahari.com/threading/)
Parallel Computing with Sockets (http://blog.rednael.com/2008/11/07/ParallelComputingWithSockets.aspx)
Downloads:
Please download the latest version at the Parallel Framework - Download and Support page.
Source files and documentation included.
Disclaimer: The opinions expressed herein are my own personal opinions and do not represent my employer's view in any way.