DataflowBuilder simplifies the creation of dataflow pipelines, making it easier to build, manage, and test complex data processing workflows. With its fluent API, you can quickly set up pipelines that handle various data processing tasks efficiently.
One of the key features of DataflowBuilder is its support for parallel asynchronous task execution.
By leveraging dataflow blocks, you can process data concurrently, making efficient use of system resources and improving the performance of your applications. This is especially useful for CPU-intensive or I/O-bound operations where tasks can benefit from parallelism.
- Type-safe builder
- Process huge volumes of data efficiently without memory leaks
- Implement any asynchronous or synchronous operation, such as database access, API calls, or calls to other cloud resources
- Multi-source extension to implement streaming data joins
- Extensible: add your own block extensions
- FromSource => Source block
- Process and ProcessAsync => Input/output function block (T => T or T => U)
- Batch => Batch input elements (T => T[])
- ToTarget or ToTargetAsync => Final block action
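As a quick illustration, here is a minimal sketch that chains these blocks together (the string-trimming lambda, the batch size, and the console output are placeholders, not part of the library):
// Minimal sketch combining the block types listed above.
var pipeline = DataFlowPipelineBuilder.FromSource<string>() // source block
.Process(text => text.Trim()) // T => T function block
.Batch(10) // groups items into arrays of 10 (T => T[])
.ToTargetAsync(async batch => // final asynchronous action block
{
await Task.Yield(); // stand-in for real async work (e.g. a database call)
Console.WriteLine($"Received a batch of {batch.Length} items");
})
.Build();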
To use DataflowBuilder in your project, install it via the NuGet Package Manager:
Install-Package DotnetKit.DataflowBuilder
Or using the .NET CLI:
dotnet add package DotnetKit.DataflowBuilder
- Define a typed item source and build the other blocks with the fluent API builder
- Build the pipeline
- Send items
Blocks can transform, enrich, or group each item sent to the pipeline:
flowchart LR
Source --> Process1[Enrich] --> Process2[Transform] --> Target
flowchart LR
Source --> Process1[Enrich] --> Process2[Parallel processing] --> Store1
Process2[Parallel processing] --> Store2
Process2[Parallel processing] --> Store3
Store1 --> Target
Store2 --> Target
Store3 --> Target
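Below is a rough sketch of this parallel shape, assuming the maxDegreeOfParallelism option shown in the use case further down; SaveToStoreAsync and PublishAsync are hypothetical helpers, not part of the library:
// Sketch of the fan-out diagram above: the middle stage runs on up to three tasks
// in parallel, and every item then converges on a single target block.
var pipeline = DataFlowPipelineBuilder.FromSource<string>()
.Process(item => item.ToUpperInvariant()) // Enrich
.ProcessAsync(async item =>
{
await SaveToStoreAsync(item); // hypothetical parallel store call
}, maxDegreeOfParallelism: 3)
.ToTargetAsync(async item => await PublishAsync(item)) // hypothetical target action
.Build();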
Here is a simple example to get you started with DataflowBuilder.
Create a Pipeline:
using System;
using System.Threading.Tasks;
using DotnetKit.DataflowBuilder;
public async Task RunPipeline()
{
// build the pipeline
var pipeline = DataFlowPipelineBuilder.FromSource<int>()
.Process(a => a * 2)
.ToTarget(a =>
{
Console.WriteLine(a);
})
.Build();
// run the pipeline
for (var i = 1; i <= 5; i++)
{
await pipeline.SendAsync(i);
}
await pipeline.CompleteAsync();
}
Result:
2
4
6
8
10
DataflowBuilder supports advanced operations like batching and grouping.
var pipeline = DataFlowPipelineBuilder.FromSource<int>()
.Batch(5)
.ToTarget(batch =>
{
Console.WriteLine($"Batch received: {string.Join(",", batch)}");
})
.Build();
for (int i = 1; i <= 10; i++)
{
await pipeline.SendAsync(i);
}
await pipeline.CompleteAsync();
Grouping with ProcessMany:
var pipeline = DataFlowPipelineBuilder.FromSource<char[]>()
.ProcessMany(chars => chars.GroupBy(c => c).Select(g => g.Key))
.ToTarget(c =>
{
Console.Write(c);
})
.Build();
await pipeline.SendAsync("hello world".ToCharArray());
await pipeline.CompleteAsync();
DataflowBuilder can also join multiple sources into a single pipeline:
var expected = new List<string>() { "HelloWorld-10", "PingPong-8" };
var result = new List<string>();
var pipeline = DataFlowPipelineBuilder.FromSources<string, int>()
.Process(items => $"{items.Item1}-{items.Item2}")
.ToTargetAsync(item =>
{
result.Add(item);
return Task.CompletedTask;
})
.Build();
//Order of sending is important for sources
await pipeline.Send1Async("HelloWorld");
await pipeline.Send1Async("PingPong");
await pipeline.Send2Async("HelloWorld".Length);
await pipeline.Send2Async("PingPong".Length);
await pipeline.CompleteAsync();
result.Should().BeEquivalentTo(expected);
A typical use case: bulk-inserting enriched sensor data in batches of 1000, with three parallel insertion tasks and a final notification.
flowchart TD
A[Sensor item] -->|Process| B(Enriched sensor)
B -->|batch| C(Process each 1000 items with 3 parallel tasks)
C -->|processing task 1| D[Bulk insert 1000 sensors]
C -->|processing task 2| E[Bulk insert 1000 sensors]
C -->|processing task 3| F[Bulk insert 1000 sensors]
D -->|task 1 processed| G[Target: publish notification]
E -->|task 2 processed| G
F -->|task 3 processed| G
//Configuration stage (DI, startup process, or specific lifetime)
//Build the sensor bulk-insertion pipeline
var pipeline = DataFlowPipelineBuilder.FromSource<SensorEntity>()
.ProcessAsync(sensor => sensor.EnrichAsync())
.Batch(1000)
.ProcessAsync(async enrichedSensors => {
await mongoDBClient.BulkInsertAsync(enrichedSensors);
}, maxDegreeOfParallelism: 3)
.ToTargetAsync(async enrichedSensors =>
{
await servicebusDomainTopic.PublishCreatedSensorsNotificationAsync();
})
.Build();
//Execution stage (API or event trigger)
//There could be billions of sensors, fetched page by page
await foreach (var sensorPage in GetAllSensorsAsync())
{
foreach (var sensor in sensorPage)
{
await pipeline.SendAsync(sensor);
}
}
await pipeline.CompleteAsync();
Contributions are welcome! Feel free to open issues or submit pull requests.
This project is licensed under the MIT License.