🚀 Real-World Example: Asynchronous Pipelines with TPL Dataflow in C#

TPL Dataflow, the dataflow library built on top of the Task Parallel Library (TPL), is well suited to building asynchronous, high-throughput pipelines in .NET, for example bulk downloading, processing, or transforming data. Using asynchronous code throughout keeps threads from blocking on I/O, which helps scalability, responsiveness, and resource utilization.

In this article, I’ll show you a fully asynchronous TPL Dataflow pipeline for downloading and processing files in C# and explain each part in detail.


🏁 Scenario: Asynchronous Download and Save

Suppose you want to download several files from the internet and save them to disk as quickly and efficiently as possible, using asynchronous I/O at every stage.


💻 Code Example: Fully Asynchronous File Pipeline

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main(string[] args)
    {
        var fileUrls = new List<string>
        {
            "https://example.com/file1.txt",
            "https://example.com/file2.txt",
            "https://example.com/file3.txt"
        };

        using var httpClient = new HttpClient();

        // 1. Download block: asynchronously downloads file contents
        var downloadBlock = new TransformBlock<string, (string url, byte[] data)>(
            async url =>
            {
                Console.WriteLine($"Downloading: {url}");
                var data = await httpClient.GetByteArrayAsync(url);
                return (url, data);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
        );

        // 2. Save block: asynchronously saves bytes to disk
        var saveBlock = new ActionBlock<(string url, byte[] data)>(
            async file =>
            {
                var fileName = Path.GetFileName(file.url);
                await File.WriteAllBytesAsync(fileName, file.data);
                Console.WriteLine($"Saved: {fileName}");
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
        );

        // Link the two blocks so output of download feeds into save
        downloadBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // 3. Post URLs to the pipeline
        foreach (var url in fileUrls)
            await downloadBlock.SendAsync(url);

        downloadBlock.Complete();             // Signal no more URLs will be posted
        await saveBlock.Completion;           // Wait for all saves to finish

        Console.WriteLine("All files downloaded and saved.");
    }
}

📝 Detailed Explanation

1️⃣ HttpClient

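  • Purpose: Manages the HTTP connections used to download the files. A single instance is shared by all downloads so connections can be reused.
  • Disposed automatically by the using declaration when Main exits.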

2️⃣ TransformBlock (downloadBlock)

  • Type: TransformBlock<string, (string url, byte[] data)>
  • What it does: Accepts a URL, downloads the file asynchronously, and outputs a tuple containing the URL and downloaded byte array.
  • Asynchrony: The delegate is async and uses await httpClient.GetByteArrayAsync(url).
  • Parallelism: MaxDegreeOfParallelism = 4 allows up to 4 downloads at once.
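
To see the TransformBlock behavior in isolation, here is a minimal sketch that replaces the real download with a short Task.Delay. The class name TransformBlockSketch, the RunAsync helper, and the "length of the URL" result are all hypothetical stand-ins, not part of the pipeline above. It also relies on the default EnsureOrdered = true setting, which makes the block emit results in input order even when several items run in parallel.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformBlockSketch
{
    // Hypothetical stand-in for a download: waits briefly, then returns the URL's length.
    public static async Task<List<(string url, int length)>> RunAsync(IEnumerable<string> urls)
    {
        var block = new TransformBlock<string, (string url, int length)>(
            async url =>
            {
                await Task.Delay(50);          // simulated I/O latency
                return (url, url.Length);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        foreach (var url in urls)
            await block.SendAsync(url);
        block.Complete();                      // no more input

        // Drain the block's output buffer until it reports that nothing more will arrive.
        var results = new List<(string url, int length)>();
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out var item))
                results.Add(item);
        }

        await block.Completion;                // surfaces any exception thrown by the delegate
        return results;
    }
}
```

Calling await TransformBlockSketch.RunAsync(new[] { "a", "bb", "ccc" }) should return three tuples in the same order as the input, even though up to four delegates may run concurrently.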

3️⃣ ActionBlock (saveBlock)

  • Type: ActionBlock<(string url, byte[] data)>
  • What it does: Receives the tuple, extracts a filename from the URL, and writes the byte data asynchronously to disk.
  • Asynchrony: Uses await File.WriteAllBytesAsync(...).
  • Parallelism: Also allows up to 4 saves at once.
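
One detail worth a closer look is the filename extraction. Path.GetFileName applied directly to a URL string returns everything after the last slash, so a query string such as ?version=2 would end up in the file name. A possible alternative, sketched here under my own assumptions, parses the URL with Uri first. The helper name FromUrl and the fallback name "download.bin" are hypothetical.

```csharp
using System;
using System.IO;

public static class FileNameSketch
{
    // Derives a file name from the path portion of a URL, ignoring query string and fragment.
    public static string FromUrl(string url)
    {
        var uri = new Uri(url);
        var name = Path.GetFileName(uri.LocalPath);

        // URLs that end in "/" have no file name; fall back to a placeholder (hypothetical choice).
        return string.IsNullOrEmpty(name) ? "download.bin" : name;
    }
}
```

For example, FileNameSketch.FromUrl("https://example.com/files/report.pdf?version=2") yields "report.pdf". Two different URLs can still map to the same name, so a real pipeline would need its own rule for collisions.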

4️⃣ Linking Blocks

  • downloadBlock.LinkTo(saveBlock, ...) feeds the output of downloads into saving.
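  • PropagateCompletion = true makes saveBlock complete automatically once downloadBlock has completed and handed over all of its output; a fault in downloadBlock is propagated the same way. Without it, await saveBlock.Completion would never finish unless saveBlock.Complete() were called manually.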
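
The effect of PropagateCompletion is easiest to see side by side. The following sketch links two tiny blocks and reports whether the target finishes within one second. The class name LinkSketch, the helper TargetCompletesAsync, and the one-second timeout are assumptions made for this illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkSketch
{
    // Returns true if the target block completes within one second of the source completing.
    public static async Task<bool> TargetCompletesAsync(bool propagateCompletion)
    {
        var source = new TransformBlock<int, int>(n => n * 2);
        var target = new ActionBlock<int>(n => Console.WriteLine($"Got {n}"));

        source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

        source.Post(1);
        source.Post(2);
        source.Complete();

        // Race the target's completion against a timeout (arbitrary value for the demo).
        var finished = await Task.WhenAny(target.Completion, Task.Delay(TimeSpan.FromSeconds(1)));
        return finished == target.Completion;
    }
}
```

With propagateCompletion set to true the method should return true; with false the items are still processed, but the target stays open and the method returns false after the timeout.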

5️⃣ Posting and Completion

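  • Each URL is sent into the pipeline with SendAsync, which returns a task that completes once the block has accepted the item. With the default unbounded capacity, that happens immediately.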
  • downloadBlock.Complete() signals no more items will be posted.
  • await saveBlock.Completion waits until all files are processed and saved.
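
SendAsync becomes more interesting once a block has a limited input buffer. As a rough sketch, assuming you want to cap how many items are queued in memory, setting BoundedCapacity makes SendAsync wait asynchronously until the block has room. The class name BackpressureSketch, the capacity of 4, and the 10 ms delay are illustrative choices.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackpressureSketch
{
    // Pushes itemCount items through a bounded block and returns how many were processed.
    public static async Task<int> RunAsync(int itemCount)
    {
        var processed = 0;

        var worker = new ActionBlock<int>(
            async n =>
            {
                await Task.Delay(10);                  // simulated slow work
                Interlocked.Increment(ref processed);  // delegates may run in parallel
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 2,
                BoundedCapacity = 4                    // at most 4 items queued or in flight
            });

        for (var i = 0; i < itemCount; i++)
            await worker.SendAsync(i);                 // waits here while the block is full

        worker.Complete();
        await worker.Completion;
        return processed;
    }
}
```

Post would return false for an item the full block cannot take, which is why SendAsync is the safer choice when a capacity limit is in place.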


⚡ Why Asynchronous Pipelines?

  • Resource Efficiency: Non-blocking I/O means threads aren’t wasted waiting for downloads or disk writes.
  • Scalability: The same pipeline handles three files or hundreds; MaxDegreeOfParallelism caps how many run at once.
  • Responsiveness: The calling thread is free to do other work while downloads and disk writes are in flight.
  • Clean Code: TPL Dataflow clearly models real-world processing pipelines.


🏆 Takeaway

With TPL Dataflow and async/await, you can build high-throughput, scalable, and efficient pipelines for many data processing scenarios in C#. The code above is easy to adapt: add more stages for parsing, transforming, or uploading results as needed.
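
As one illustration of adding a stage, here is a sketch of a hypothetical parsing block that counts lines in each file before a final block collects the results. Everything in it (the class ExtraStageSketch, the CountLines helper, and the in-memory input instead of real downloads) is an assumption made to keep the example runnable without network or disk access.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ExtraStageSketch
{
    // Hypothetical parsing step: counts lines; a trailing newline counts as an extra empty line.
    public static int CountLines(byte[] data)
    {
        var text = Encoding.UTF8.GetString(data);
        return text.Length == 0 ? 0 : text.Split('\n').Length;
    }

    public static async Task<Dictionary<string, int>> RunAsync(
        IEnumerable<(string name, byte[] data)> files)
    {
        // Extra stage: turns raw bytes into a per-file line count.
        var parseBlock = new TransformBlock<(string name, byte[] data), (string name, int lines)>(
            file => (file.name, CountLines(file.data)),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Final stage: runs one item at a time by default, so a plain Dictionary is safe here.
        var results = new Dictionary<string, int>();
        var collectBlock = new ActionBlock<(string name, int lines)>(
            item => { results[item.name] = item.lines; });

        parseBlock.LinkTo(collectBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var file in files)
            await parseBlock.SendAsync(file);

        parseBlock.Complete();
        await collectBlock.Completion;
        return results;
    }
}
```

In the download pipeline, a block like parseBlock would sit between downloadBlock and saveBlock, linked on both sides with PropagateCompletion = true.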

Ready to modernize your .NET workflows? Try asynchronous TPL Dataflow in your next project!


#dotnet #csharp #TPL #TPLDataflow #async #asynchronous #parallelprogramming #softwaredevelopment #cloud


More articles by Mohammed Rehan Javed