Parallel ForEach with Degree of Parallelism using TPL Dataflow
This example uses TPL Dataflow to process a collection of items in parallel with a controlled degree of parallelism. This is useful when you want to limit the number of concurrent operations to avoid overloading the system or consuming too many resources. The `MaxDegreeOfParallelism` setting lets you tune throughput against resource usage.
Concepts: Controlled Parallelism
Parallelism is beneficial but can be detrimental if uncontrolled. Excessive parallelism can lead to resource contention, context switching overhead, and even system instability. Controlling the degree of parallelism allows you to optimize performance and resource usage for a specific workload and environment.
Code Example: Parallel ForEach with DOP
This code creates an `ActionBlock` with a specified `MaxDegreeOfParallelism`. The `Post` method is used to send each item to the block. The block processes the items concurrently, but the number of concurrent operations is limited by the `MaxDegreeOfParallelism` setting. This prevents the system from being overwhelmed.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ParallelForEachExample
{
    public static async Task Main(string[] args)
    {
        // Build the input: the integers 1..100.
        int[] data = new int[100];
        for (int i = 0; i < data.Length; i++)
        {
            data[i] = i + 1;
        }

        int degreeOfParallelism = 4; // Adjust this value as needed

        var actionBlock = new ActionBlock<int>(async item =>
        {
            // Environment.CurrentManagedThreadId reports the actual thread;
            // Task.CurrentId is a task ID and can be null in async code.
            Console.WriteLine($"Processing item {item} on thread {Environment.CurrentManagedThreadId}");
            await Task.Delay(100); // Simulate some work
            Console.WriteLine($"Item {item} processed");
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });

        foreach (var item in data)
        {
            actionBlock.Post(item);
        }

        // Signal that no more items will arrive, then wait for the queue to drain.
        actionBlock.Complete();
        await actionBlock.Completion;
        Console.WriteLine("Parallel processing complete.");
    }
}
Explanation: MaxDegreeOfParallelism
The `MaxDegreeOfParallelism` property of `ExecutionDataflowBlockOptions` controls the maximum number of messages the block may process concurrently. The default is `1`, which processes items one at a time. A value of `-1` (`DataflowBlockOptions.Unbounded`) removes the limit and leaves the degree of concurrency to the underlying scheduler, which is rarely what you want for resource-constrained work.
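One way to see the limit in action is to count how many delegates are running at once. The following is a minimal sketch, not part of the original example: `PeakConcurrencyDemo` and `MeasurePeakAsync` are hypothetical names, and the 50 ms delay merely stands in for real work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PeakConcurrencyDemo
{
    // Hypothetical helper: runs itemCount simulated jobs and returns the highest
    // number of delegates observed running at the same time.
    public static async Task<int> MeasurePeakAsync(int maxDegreeOfParallelism, int itemCount = 20)
    {
        int running = 0;
        int peak = 0;

        var block = new ActionBlock<int>(async _ =>
        {
            int now = Interlocked.Increment(ref running);

            // Raise the recorded peak with a compare-and-swap retry loop.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)))
            {
                Interlocked.CompareExchange(ref peak, now, seen);
            }

            await Task.Delay(50); // Simulated work
            Interlocked.Decrement(ref running);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        for (int i = 0; i < itemCount; i++)
        {
            block.Post(i);
        }

        block.Complete();
        await block.Completion;
        return peak;
    }

    public static async Task Main()
    {
        Console.WriteLine($"Limit 1: peak {await MeasurePeakAsync(1)}");
        Console.WriteLine($"Limit 4: peak {await MeasurePeakAsync(4)}");
        Console.WriteLine($"Unbounded: peak {await MeasurePeakAsync(DataflowBlockOptions.Unbounded)}");
    }
}
```

With a limit of `1` the observed peak should be exactly 1, and with a limit of `4` it should never exceed 4. The unbounded figure depends on the machine and the scheduler, so treat it as an observation rather than a fixed number.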
Real-Life Use Case Section
Consider a web crawler. You want to fetch multiple pages concurrently, but you also want to limit the number of simultaneous requests to avoid overloading the target server or your own network. `MaxDegreeOfParallelism` caps the number of in-flight requests, which indirectly bounds the crawl rate.
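The crawler idea could be sketched roughly as follows. `CrawlerSketch`, `CrawlAsync`, and the injected `fetchAsync` delegate are hypothetical names introduced for illustration; the fetch is simulated here so the sketch runs offline, and a real crawler would also need politeness delays, retries, and error handling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CrawlerSketch
{
    // Fetches every URL through fetchAsync with at most maxConcurrentRequests
    // in flight, and returns the length of each response body keyed by URL.
    public static async Task<IReadOnlyDictionary<string, int>> CrawlAsync(
        IEnumerable<string> urls,
        Func<string, Task<string>> fetchAsync,
        int maxConcurrentRequests)
    {
        var lengths = new ConcurrentDictionary<string, int>();

        var block = new ActionBlock<string>(async url =>
        {
            string body = await fetchAsync(url);
            lengths[url] = body.Length;
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxConcurrentRequests });

        foreach (var url in urls)
        {
            block.Post(url);
        }

        block.Complete();
        await block.Completion;
        return lengths;
    }

    public static async Task Main()
    {
        // Placeholder URLs and a simulated fetch; no network access is performed.
        var urls = new[] { "https://example.com/a", "https://example.com/b", "https://example.com/c" };
        var result = await CrawlAsync(urls, async url =>
        {
            await Task.Delay(20);
            return "<html>" + url + "</html>";
        }, maxConcurrentRequests: 2);

        foreach (var pair in result)
        {
            Console.WriteLine($"{pair.Key}: {pair.Value} characters");
        }
    }
}
```

To crawl real pages, one option is to pass `url => client.GetStringAsync(url)` for a shared `HttpClient` instance named `client` in place of the simulated delegate.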
Best Practices
Interview Tip
Be prepared to discuss the trade-offs between parallelism and overhead. Explain how controlling the degree of parallelism can improve performance and resource utilization.
When to use them
Use controlled parallelism when you need to process a collection of items concurrently, but you want to limit the resource consumption or avoid overwhelming the system. It's particularly useful for I/O-bound operations, network requests, and CPU-intensive tasks that can benefit from parallelism but need to be managed carefully.
Memory footprint
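The memory footprint depends on the size of the collection being processed and the buffering behavior of the `ActionBlock`. By default the block's input queue is unbounded, so posting an entire collection up front queues every item in memory. Set `BoundedCapacity` on the block options to cap the queue if memory usage is a concern.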
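A bounded buffer might look like the sketch below. `BoundedBufferDemo` and `ProcessAsync` are hypothetical names, and the capacity and delay values are arbitrary. The producer uses `SendAsync` rather than `Post`, because `Post` returns `false` immediately when a bounded block is full, whereas `SendAsync` waits asynchronously for room.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBufferDemo
{
    // Processes itemCount items while keeping at most boundedCapacity items
    // queued or in progress inside the block. Returns how many were processed.
    public static async Task<int> ProcessAsync(int itemCount, int boundedCapacity)
    {
        int processed = 0;

        var block = new ActionBlock<int>(async item =>
        {
            await Task.Delay(5); // Simulated work
            Interlocked.Increment(ref processed);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
            BoundedCapacity = boundedCapacity
        });

        for (int i = 0; i < itemCount; i++)
        {
            // Applies backpressure: the loop pauses here while the block is full.
            await block.SendAsync(i);
        }

        block.Complete();
        await block.Completion;
        return processed;
    }

    public static async Task Main()
    {
        int count = await ProcessAsync(itemCount: 50, boundedCapacity: 4);
        Console.WriteLine($"Processed {count} items");
    }
}
```

The trade-off is that the producer now runs only as fast as the consumer, which is usually the desired behavior when the input is large or streamed.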
Alternatives
Alternatives include using `Parallel.ForEach` with `ParallelOptions` to control the degree of parallelism, custom thread management, and Reactive Extensions (Rx). TPL Dataflow provides a more flexible and composable approach in many cases.
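For comparison, here is a minimal sketch of the `Parallel.ForEach` alternative with `ParallelOptions`. The class and method names are hypothetical, and the sum-of-squares workload is only a stand-in for synchronous, CPU-bound work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachAlternative
{
    // Sums the squares of the input using at most maxDegreeOfParallelism workers.
    public static long SumOfSquares(int[] data, int maxDegreeOfParallelism)
    {
        long total = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        Parallel.ForEach(data, options, item =>
        {
            // Interlocked keeps the shared accumulator thread-safe.
            Interlocked.Add(ref total, (long)item * item);
        });

        return total;
    }

    public static void Main()
    {
        int[] data = Enumerable.Range(1, 100).ToArray();
        Console.WriteLine(SumOfSquares(data, 4)); // prints 338350
    }
}
```

`Parallel.ForEach` blocks the calling thread and expects a synchronous body, so it suits CPU-bound loops. For asynchronous bodies, `Parallel.ForEachAsync` (available from .NET 6) or a Dataflow block is usually a better fit.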
Pros
Cons
FAQ
- How do I determine the optimal value for MaxDegreeOfParallelism?
  There's no one-size-fits-all answer. It depends on the nature of the workload, the hardware resources available, and the constraints of the system. Experiment with different values and monitor resource usage to find the best setting. For CPU-bound work, a common starting point is the number of CPU cores (`Environment.ProcessorCount`); I/O-bound work can often tolerate a higher value.
- What happens if an exception is thrown within the ActionBlock delegate?
  The exception faults the block and is propagated to the `Completion` task of the `ActionBlock`. You can observe it by awaiting `Completion` inside a `try`/`catch`. Once faulted, the block stops processing, discards any buffered items, and declines further input, so catch expected failures inside the delegate if the remaining items must still be processed.
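The exception behavior described in the last answer can be observed with a sketch like this one. `FaultHandlingDemo` and `RunAsync` are hypothetical names, and the failure on item 3 is contrived for the demonstration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultHandlingDemo
{
    // Posts five items, one of which throws, and returns either "completed"
    // or the message of the exception that faulted the block.
    public static async Task<string> RunAsync()
    {
        var block = new ActionBlock<int>(item =>
        {
            if (item == 3)
            {
                throw new InvalidOperationException($"Item {item} failed");
            }
            Console.WriteLine($"Item {item} processed");
        });

        for (int i = 1; i <= 5; i++)
        {
            block.Post(i);
        }
        block.Complete();

        try
        {
            await block.Completion;
            return "completed";
        }
        catch (Exception)
        {
            // The Completion task stores failures in an AggregateException;
            // Flatten exposes the underlying exceptions regardless of nesting.
            AggregateException all = block.Completion.Exception.Flatten();
            return all.InnerExceptions[0].Message;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

Items queued behind the failing one may never be processed, because the faulted block drops its buffer. Wrapping the delegate body in its own `try`/`catch` and logging the failure is one way to keep the block alive when individual items are allowed to fail.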