.NET 4 Task Parallel Library and Asynchronous Amazon SimpleDB Access
At work we use Amazon SimpleDB for our distributed, redundant database, and Amazon supplies an SDK for accessing the service. Unfortunately for some, this SDK exposes all of its calls synchronously, with no asynchronous versions of any of the web service calls. As such, my initial implementation of data access looked something like this:
IEnumerable<MyData> data1;
IEnumerable<MyData> data2;
IEnumerable<MyData> data3;
IEnumerable<MyData> data4;
data1 = GetData1FromSDB();
data2 = GetData2FromSDB();
data3 = GetData3FromSDB();
data4 = GetData4FromSDB();
return DoSomethingCool(data1, data2, data3, data4);
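Now that looks readable and straightforward, but it does not scale. Eventually we got to the point where each of the calls to SDB was retrieving a significant amount of data, enough to cause site slowness when looking at large datasets. After reading about the handy dandy Task Parallel Library, I modified the implementation to this (one wrinkle: the data1 through data4 locals now need an initial value such as null, because the compiler does not treat an assignment made inside a lambda as definitely assigning the variable):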
Parallel.Invoke(
() => { data1 = GetData1FromSDB(); },
() => { data2 = GetData2FromSDB(); },
() => { data3 = GetData3FromSDB(); },
() => { data4 = GetData4FromSDB(); }
);
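Great! Immediate speedup, and all was good, for the moment. The data access was parallelized, but the interesting thing about Parallel.Invoke is that, in practice, it tends to run only about as many actions at once as your system has processor cores. That is not a hard cap (ParallelOptions.MaxDegreeOfParallelism defaults to -1, meaning unlimited). It follows from the fact that Parallel.Invoke was designed for compute-bound tasks, not IO-bound ones, and schedules its work on the ThreadPool, which starts with roughly one worker thread per core and adds more only gradually. Well great, I needed a way to go beyond the number of processor cores in the system. I played with passing in some ParallelOptions with different parameters to try to mix it up, but settled on this: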
var tasks = new Task[] {
Task.Factory.StartNew(() => { data1 = GetData1FromSDB(); }),
Task.Factory.StartNew(() => { data2 = GetData2FromSDB(); }),
Task.Factory.StartNew(() => { data3 = GetData3FromSDB(); }),
Task.Factory.StartNew(() => { data4 = GetData4FromSDB(); })
};
Task.WaitAll(tasks);
This implementation queues up all the tasks on the ThreadPool and goes to town. It's not tied to the number of processor cores, only to the size of the ThreadPool. Nice. But this implementation and the one above share another problem: even while a task is blocked waiting for data from SDB, it is still holding onto a thread in the ThreadPool. What I really needed was an asynchronous version of the SDB SDK, but I didn't want to rewrite their entire SDK to get one. After some more research into the Task Parallel Library and its compatibility with the Asynchronous Programming Model, I came up with this implementation:
var tasks = new Action[] {
() => { data1 = GetData1FromSDB(); },
() => { data2 = GetData2FromSDB(); },
() => { data3 = GetData3FromSDB(); },
() => { data4 = GetData4FromSDB(); }
}.Select(a => Task.Factory.FromAsync(a.BeginInvoke, a.EndInvoke, null)).ToArray();
Task.WaitAll(tasks);
See what I did there? I took each data accessor and wrapped it in an Action via a lambda expression. Actions (and Funcs), like all delegates, have built-in BeginInvoke and EndInvoke methods that run them asynchronously. Nice. Now I just needed to batch them up and wait for them all to finish before continuing. I wrapped them in Tasks using Task.Factory.FromAsync and then called Task.WaitAll to wait until they had all completed. I then abstracted the functionality into an extension method:
public static void InvokeAsyncAndWaitAll(this IEnumerable<Action> actions) {
Task.WaitAll(actions.Select(a => Task.Factory.FromAsync(a.BeginInvoke, a.EndInvoke, null)).ToArray());
}
I can create further extension methods for each type of Action and Func as well (the Func version would need to return some type of collection). The nice thing about this is that all the data access is started asynchronously (though I have no control over when the thread sleeps during the data access), and the data access is parallelized as well. One caveat: a delegate's BeginInvoke runs the delegate on a ThreadPool thread, so each call still occupies a thread while it waits on SDB; only a truly asynchronous SDK would avoid that. Additionally, using the Task Parallel Library keeps the code readable and definitely more maintainable than using some type of synchronized or locked counter to check when all the data accesses have completed.
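As a rough illustration of the Func version mentioned above, here is a minimal sketch that returns the results as an array in the same order as the input delegates. The class name AsyncInvokeExtensions is made up for this example, and the sketch assumes the .NET Framework 4 runtime this post targets: delegate BeginInvoke is not supported on .NET Core and later, where it throws PlatformNotSupportedException.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper class; the name is an assumption for this sketch.
public static class AsyncInvokeExtensions
{
    // Func<T> counterpart of InvokeAsyncAndWaitAll: starts every delegate via
    // BeginInvoke/EndInvoke, blocks until all have finished, and returns the
    // results in the same order as the input sequence.
    // Assumes .NET Framework; delegate BeginInvoke throws on .NET Core and later.
    public static T[] InvokeAsyncAndWaitAll<T>(this IEnumerable<Func<T>> funcs)
    {
        Task<T>[] tasks = funcs
            .Select(f => Task<T>.Factory.FromAsync(f.BeginInvoke, f.EndInvoke, null))
            .ToArray();

        // Throws AggregateException if any of the delegates threw.
        Task.WaitAll(tasks);

        return tasks.Select(t => t.Result).ToArray();
    }
}
```

With something like this in place, the four accessors could be passed as a Func<IEnumerable<MyData>>[] and the returned array handed to DoSomethingCool by index, which also removes the need for the captured data1 through data4 locals.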
