DPorch is a runtime environment for executing and connecting distributed, decentralized data engineering pipelines in directed graph workflows. It is built with Python (via Python.NET) and C#, targeting .NET 9.0.
Its primary features are as follows:
- Directed Graph Workflows: Distributed, JSON-configurable pipelines that form DAGs or DCGs.
- Python Scripting: Defines pipeline data-processing logic with modular, chained Python scripts.
- TCP Messaging: Transmits pipeline output to other workers on a shared network as their pipeline input.
- Serialization: Serializes pipeline output and deserializes input using pickle (alternative formats planned).
- Service Discovery: Resolves TCP IP addresses and ports through UDP service discovery.
- Task Scheduling: Queues input data for processing using a FIFO scheduling policy.
Requirements:

- .NET 9.0 SDK
- Python 3.7+ - Required for pipeline scripting via pythonnet
- Your Python DLL path (e.g., `C:\Users\Name\AppData\Local\Programs\Python\Python311\python311.dll`)
1. Clone and Build: Clone the repository and navigate to its root directory:

   ```
   git clone https://github.com/SebastianBathrick/DPorch
   cd DPorch
   ```

   Restore dependencies and build `DPorch.sln` using `dotnet`:

   ```
   dotnet restore
   dotnet build
   ```
2. Start Setup Wizard: Navigate to `src\DPorch.CLI\` (console application project):

   ```
   cd .\src\DPorch.CLI\
   ```

   Run `DPorch.CLI.csproj` using `dotnet`:

   ```
   dotnet run
   ```

   You'll see the following output:

   ```
   Initialized: Created new preferences file at C:\Users\Sebastian\AppData\Roaming\DPorch\settings.json
   Required preferences have not been assigned values
   You can assign them now using prompts or later using the pref command with the appropriate options
   Would you like to set required preferences now? [y/n] (y):
   ```

   Press `y` and `ENTER` to continue.
3. Configure User Preferences:

   Python 3.7+ DLL: Enter the file path (typically in the same directory as `python.exe`):

   ```
   Please enter Python v3.7+ DLL path: C:\<Python Directory>\python311.dll
   ```

   Input Network Interface: Select the interface where pipelines will send UDP messages and receive TCP messages. Use `ARROW KEYS` to navigate and `SPACEBAR` to select. Press `ENTER` when done:

   ```
   Select input network interface:
   > Wi-Fi
     Loopback Pseudo-Interface 1
     Local Area Connection* 9
     ...
   ```

   Output Network Interfaces: Select one or more interfaces where pipelines will listen for UDP messages and send TCP messages:

   ```
   Select output network interfaces:
   > [X] Wi-Fi
     [ ] Loopback Pseudo-Interface 1
     [ ] Local Area Connection* 9
     ...
   ```

   Service Discovery Port: Enter the UDP port on which all pipelines will send and listen for UDP messages during service/pipeline discovery (default: `5557`):

   ```
   Please enter service discovery port (1-65535) (5557): 5557
   Saved: Discovery port: 5557
   Preferences file setup complete!
   ```

   The application will then exit, and all commands will be available to use.
Note
Use `prefs --help` to change preferences, locate the preferences file, or run the setup wizard again.
4. Create Your First DAG: The same steps can be followed using one or two terminal windows on the same machine.

   On Machine 1, create a working directory:

   ```
   mkdir tutorial
   cd tutorial
   ```

   Create Pipeline A, which will send data (you can also manually create the JSON file without the `init` command). You can set values in the JSON file or use `init` flags (run `dporch init --help` for more flag info):

   ```
   dporch init -n pipeline_a -i 0 -o pipeline_b -s generate_number.py
   ```
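   Based on the flags above, the generated `pipeline_a.json` should look roughly like this (field meanings are covered in the Pipeline Configurations section below):

   ```json
   {
     "name": "pipeline_a",
     "scripts": ["generate_number.py"],
     "source_pipeline_count": 0,
     "target_pipeline_names": ["pipeline_b"]
   }
   ```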
   Create and edit `generate_number.py`:

   ```python
   import time

   counter = 0

   def step():
       global counter
       counter += 1
       time.sleep(1)
       print(f"Sending: {counter}")
       return counter
   ```
   On Machine 2, create a working directory:

   ```
   mkdir tutorial
   cd tutorial
   ```
   Create Pipeline B, which will receive data. You can set values in the JSON file or use `init` flags (run `dporch init --help` for more flag info):

   ```
   dporch init -n pipeline_b -i 1 -s print_number.py
   ```
   Create and edit `print_number.py`:

   ```python
   def step(input_data):
       number = input_data["pipeline_a"]
       print(f"Received: {number}")
   ```
5. Execute Pipelines: On Machine 1, run Pipeline A:

   ```
   dporch run pipeline_a.json
   ```

   On Machine 2, run Pipeline B:

   ```
   dporch run pipeline_b.json
   ```

   If using a single machine, you can run them in the same terminal by passing both `.json` files to the `run` command:

   ```
   dporch run pipeline_a.json pipeline_b.json
   ```

   You should now see Pipeline A sending numbers and Pipeline B receiving them. Press `CTRL+C` to stop either pipeline.
Tip
For more complex pipeline topologies and advanced features, start with One Source to Multiple Targets and continue reading the sections below.
Dependencies:
DPorch.CLI
- Microsoft.Extensions.DependencyInjection v10.0.0
- Spectre.Console v0.50.0
- Spectre.Console.Cli v0.50.0
DPorch.Runtime
- Microsoft.Extensions.DependencyInjection.Abstractions v10.0.0
- Microsoft.Extensions.DependencyInjection v10.0.0
- NetMQ v4.0.2.2
- Python.Runtime v2.7.9
- pythonnet v3.0.5
DPorch.Core.Tests
- coverlet.collector v6.0.4
- Microsoft.NET.Test.Sdk v17.14.1
- xunit v2.9.3
- xunit.runner.visualstudio v3.1.4
DPorch.Runtime.Tests
- coverlet.collector v6.0.4
- Microsoft.NET.Test.Sdk v17.14.1
- xunit v2.9.3
- xunit.runner.visualstudio v3.1.4
For detailed technical documentation, including threading model, design patterns, and implementation details, see ARCHITECTURE.md.
When starting the runtime environment, the following steps occur:
- Reads a JSON config file that defines a pipeline: the name, number of input sources (pipelines), script execution order, whether to wait for all input, and the names of output targets (pipelines).
- Pipelines broadcast their presence via UDP on a designated port (selected during one-time setup).
- Pipelines that share data exchange handshakes and connect their TCP push/pull sockets.
- Each script's top-level code runs, and a dedicated module/scope is created for each script (holding its global variables).
- Received messages are queued on a dedicated thread for processing (FIFO). Pipelines can wait until they receive data from every source pipeline, or substitute None for any missing data (selected in the JSON config).
- Each message is deserialized and passed to the first script's step event function for processing.
- A script's event return value is passed as an argument to the next script's step event.
- The last script in the execution order returns a value that is serialized and queued for relaying.
- Messages containing output data and GUID (unique sender ID) frames are relayed on a separate thread to all targets.
- A pipeline continues to receive input, process it, and send its output until the environment closes.
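The serialization and framing steps above can be illustrated with plain pickle. The following is a conceptual sketch only; DPorch's actual wire format is internal, and the two-frame layout and names here are assumptions for illustration:

```python
import pickle
import uuid

# Conceptual sketch: one frame for the sender's GUID, one for the
# pickled payload, mirroring the "output data and GUID frames" above.
sender_id = uuid.uuid4().bytes            # stand-in unique sender ID
payload = pickle.dumps({"counter": 42})   # output value, serialized with pickle

frames = [sender_id, payload]             # what a relay message might carry

# A receiving pipeline would deserialize the payload frame back into
# a Python object before passing it to the first script's step().
restored = pickle.loads(frames[1])
print(restored)  # {'counter': 42}
```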
To see a list of available commands, run the following:
```
PS C:\> dporch --help
```

The following output will be displayed:

```
USAGE:
    dporch [OPTIONS] <COMMAND>

EXAMPLES:
    dporch init
    dporch init --name my_pipeline
    dporch init -n my_pipeline -s script.py -i 2 -o target1
    dporch run pipeline.json
    dporch run pipeline_1.json pipeline_2.json

OPTIONS:
    -h, --help    Prints help information

COMMANDS:
    init     Create a new pipeline configuration file
    run      Execute a pipeline configuration
    prefs    Manage user preferences. Omit options to view all preferences
```

To see more information about a specific command, run the following:

```
dporch <COMMAND> --help
```

Pipeline configurations are JSON files that define a pipeline's properties. To create one, use the `init` command.
```
PS C:\> dporch init
Created: C:\Computer\config.json
```

The command creates a `.json` file in the current working directory with the following default configuration:

```json
{
  "name": "",
  "scripts": [],
  "source_pipeline_count": 0,
  "target_pipeline_names": []
}
```

- `name` (string) - The name of the pipeline. This name is used by other pipelines to reference this pipeline in their `target_pipeline_names` list and appears as a dictionary key when this pipeline sends data to others. The name must be at least three characters long.
- `scripts` - Python script file paths, relative to the configuration file's directory, to execute sequentially in each iteration. Each script must define a `step()` function. There must be at least one script file path.
- `source_pipeline_count` - The number of source pipelines this pipeline expects to receive data from. The pipeline will wait for data from all sources before beginning each iteration. This number can be zero or greater.
- `target_pipeline_names` - Pipeline names to send this pipeline's output data to. The final script's return value will be sent to all targets listed here. There can be zero or more target pipelines.
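As a concrete illustration, a filled-in configuration for a hypothetical pipeline that runs two scripts in order, waits on one source, and sends to one target might look like this (all names below are invented for the example):

```json
{
  "name": "transform_stage",
  "scripts": [
    "validate.py",
    "transform.py"
  ],
  "source_pipeline_count": 1,
  "target_pipeline_names": ["sink_stage"]
}
```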
Python scripts define pipeline behavior at each phase of its lifecycle: startup, iteration, and shutdown.
Note
/examples contains files for each example shown below.
Each Python script requires a top-level function named step with an optional parameter. The following is a minimal valid script:
```python
def step():
    pass
```

Each script's top-level statements run once when the pipeline starts, after receiving handshakes from its source pipelines. Scripts execute in the order specified in the JSON configuration. After sending handshakes to its target pipelines, the pipeline begins its iteration loop, calling each script's step() function once per iteration in that same order. Each script has its own isolated scope, and top-level global variables maintain state between iterations.
The following script is the only script in a pipeline:
```python
counter = 0
print(f"Initial counter value {counter}")

def step():
    global counter
    counter += 1
    print(f"Counter value this iteration: {counter}")
```

If it were allowed to run three iterations, the output would look like the following. Note how the top-level print statement executes only once, while the step() function's print statement executes on each iteration.

```
Initial counter value 0
Counter value this iteration: 1
Counter value this iteration: 2
Counter value this iteration: 3
```
A second script is added to the end of the pipeline with the following code:

```python
def step(input_data):
    if input_data % 2 == 0:
        print(f"{input_data} is an even number")
    else:
        print(f"{input_data} is an odd number")
```

To pass data to this second script, the first script is modified to return the value of counter.
```python
counter = 0
print(f"Initial counter value {counter}")

def step():
    global counter
    counter += 1
    print(f"Counter value this iteration: {counter}")
    return counter
```

With both scripts in the pipeline, executing it produces the following output. The counter value from the first script is passed as input_data to the second script, which checks whether it is even or odd.

```
Initial counter value 0
Counter value this iteration: 1
1 is an odd number
Counter value this iteration: 2
2 is an even number
Counter value this iteration: 3
3 is an odd number
```
The end() function is an optional lifecycle hook called when the pipeline shuts down, such as when the user sends a keyboard interrupt (CTRL+C). It takes no parameters and is intended for cleanup operations like releasing resources or closing connections. Like step() functions, end() functions execute sequentially in the order scripts are defined in the configuration.
The following is a valid script with an end() function:
```python
def step():
    print("Running iteration")

def end():
    print("Cleaning up resources")
```

When a pipeline with this script receives a shutdown signal (CTRL+C), it will call the end() function before terminating. If a script does not define an end() function, DPorch will skip it and move to the next script.
The following example demonstrates resource management using the end() function. The script opens a TCP socket connection at startup, uses it during iterations, and closes it during shutdown:
end_close_sock.json

```json
{
  "name": "end_close_sock",
  "scripts": [
    "..\\iteration_rate_limiter.py",
    "end_close_sock.py"
  ],
  "source_pipeline_count": 0,
  "target_pipeline_names": []
}
```

end_close_sock.py

```python
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("tcpbin.com", 4242))  # Public echo server

def step():
    sock.send(b"Hello from DPorch!\n")
    response = sock.recv(1024)
    print(f"Received: {response.decode('utf-8')}")
    return response

def end():
    sock.close()
    print("Socket closed")
```

The top-level socket connection executes once at startup. The step() function sends and receives data each iteration, and the end() function closes the socket when the pipeline shuts down.
DPorch provides a managed variable called delta_time that tracks the elapsed time (in seconds) since the previous step() call for that script. To use it, declare a top-level variable named delta_time. DPorch will detect this variable and update it before each step() call.
The following script demonstrates basic usage of delta_time:
```python
delta_time = 0.0

def step():
    print(f"Time since last step() call: {delta_time} seconds")
```

On the first step() call, delta_time is 0.0. On subsequent calls, it contains the elapsed time since the previous step() execution for that script.
The following example uses delta_time to count seconds by accumulating elapsed time and printing a message each time a full second passes:
delta_time_sec.json

```json
{
  "name": "delta_time_sec",
  "scripts": [
    "delta_time_sec.py"
  ],
  "source_pipeline_count": 0,
  "target_pipeline_names": []
}
```

delta_time_sec.py

```python
delta_time = 0.0
elapsed_time = 0.0
sec_passed = 0

def step():
    global delta_time, elapsed_time, sec_passed
    elapsed_time += delta_time
    if elapsed_time >= 1.0:
        sec_passed += 1
        elapsed_time = 0
        print(f"{sec_passed} second(s) have passed")
```

The script accumulates delta_time each iteration. When the total reaches one second, it increments sec_passed, resets the accumulator, and prints the count.
Pipelines can send output to zero or more targets and receive input from zero or more sources. The following examples demonstrate common topologies.
In this example, pipeline_a sends its output to both pipeline_b and pipeline_c.
pipeline_a.json

```json
{
  "name": "pipeline_a",
  "scripts": ["make_counter_msg.py"],
  "source_pipeline_count": 0,
  "target_pipeline_names": [
    "pipeline_b",
    "pipeline_c"
  ]
}
```

pipeline_a lists two target pipelines in target_pipeline_names, so the final script's return value is sent to both pipeline_b and pipeline_c. With source_pipeline_count set to 0, it does not wait for incoming data before iterating.
make_counter_msg.py

```python
counter = 0

def step():
    global counter
    counter += 1
    print(f"Sending {counter} to pipeline_b and pipeline_c")
    return counter
```

The step() function increments and returns a counter value each iteration. DPorch sends the return value to all pipelines listed in target_pipeline_names.
pipeline_b.json

```json
{
  "name": "pipeline_b",
  "scripts": ["print_counter_msg.py"],
  "source_pipeline_count": 1,
  "target_pipeline_names": []
}
```

pipeline_c.json

```json
{
  "name": "pipeline_c",
  "scripts": ["print_counter_msg.py"],
  "source_pipeline_count": 1,
  "target_pipeline_names": []
}
```

Both pipeline_b and pipeline_c have source_pipeline_count set to 1, so each waits for data from one source before iterating. Neither has target pipelines. Both use the same script, print_counter_msg.py.
print_counter_msg.py

```python
def step(input_data):
    msg = input_data["pipeline_a"]
    print(f"I got a message from pipeline_a: {msg}")
```

DPorch delivers input data as a dictionary keyed by source pipeline name. Both pipeline_b and pipeline_c access the counter value using input_data["pipeline_a"].
When these three pipelines run together, they produce the following output:
pipeline_a output

```
Sending 1 to pipeline_b and pipeline_c
Sending 2 to pipeline_b and pipeline_c
Sending 3 to pipeline_b and pipeline_c
(continues...)
```

pipeline_b output

```
I got a message from pipeline_a: 1
I got a message from pipeline_a: 2
I got a message from pipeline_a: 3
(continues...)
```

pipeline_c output

```
I got a message from pipeline_a: 1
I got a message from pipeline_a: 2
I got a message from pipeline_a: 3
(continues...)
```

Both pipeline_b and pipeline_c receive the same data from pipeline_a and process it independently.
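Per the runtime steps described earlier, a pipeline can either wait for data from every source or substitute None for missing data (selected in the JSON config). If the latter is chosen, a defensive variant of print_counter_msg.py might guard against missing input. The following is an illustrative sketch, assuming input_data supports standard dictionary access:

```python
def step(input_data):
    # A source that hasn't produced data this iteration may map to None
    # when the pipeline is configured not to wait for all inputs.
    msg = input_data.get("pipeline_a")
    if msg is None:
        print("No message from pipeline_a this iteration")
        return
    print(f"I got a message from pipeline_a: {msg}")
```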
In this example, both pipeline_x and pipeline_y send their outputs to pipeline_z.
pipeline_x.json

```json
{
  "name": "pipeline_x",
  "scripts": ["generate_random.py"],
  "source_pipeline_count": 0,
  "target_pipeline_names": ["pipeline_z"]
}
```

generate_random.py

```python
import random

def step():
    num = random.randint(1, 100)
    print(f"pipeline_x generated: {num}")
    return num
```

pipeline_y.json

```json
{
  "name": "pipeline_y",
  "scripts": ["generate_timestamp.py"],
  "source_pipeline_count": 0,
  "target_pipeline_names": ["pipeline_z"]
}
```

generate_timestamp.py

```python
import time

def step():
    timestamp = int(time.time())
    print(f"pipeline_y generated timestamp: {timestamp}")
    return timestamp
```

pipeline_z.json

```json
{
  "name": "pipeline_z",
  "scripts": ["combine_inputs.py"],
  "source_pipeline_count": 2,
  "target_pipeline_names": []
}
```

combine_inputs.py

```python
def step(input_data):
    random_num = input_data["pipeline_x"]
    timestamp = input_data["pipeline_y"]
    print(f"Received random number {random_num} and timestamp {timestamp}")
```

pipeline_z has source_pipeline_count set to 2, so it waits for data from both pipeline_x and pipeline_y before each iteration. The data is accessed by source pipeline name: input_data["pipeline_x"] and input_data["pipeline_y"].
When these three pipelines run together, they produce the following output:
pipeline_x output

```
pipeline_x generated: 42
pipeline_x generated: 87
pipeline_x generated: 15
(continues...)
```

pipeline_y output

```
pipeline_y generated timestamp: 1702393845
pipeline_y generated timestamp: 1702393846
pipeline_y generated timestamp: 1702393847
(continues...)
```

pipeline_z output

```
Received random number 42 and timestamp 1702393845
Received random number 87 and timestamp 1702393846
Received random number 15 and timestamp 1702393847
(continues...)
```

pipeline_z only processes data after receiving input from both sources. DPorch waits for all expected source pipelines before starting each iteration.
In this example, pipeline_a generates numbers and sends them to both pipeline_b and pipeline_c. Each of these pipelines performs a different mathematical operation, then both send their results to pipeline_d, which combines them.
pipeline_a.json

```json
{
  "name": "pipeline_a",
  "scripts": ["generate_number.py"],
  "source_pipeline_count": 0,
  "target_pipeline_names": [
    "pipeline_b",
    "pipeline_c"
  ]
}
```

generate_number.py

```python
counter = 0

def step():
    global counter
    counter += 1
    print(f"pipeline_a: Sending number {counter}")
    return counter
```

pipeline_a generates incrementing numbers and sends each one to both pipeline_b and pipeline_c.
pipeline_b.json

```json
{
  "name": "pipeline_b",
  "scripts": ["double_number.py"],
  "source_pipeline_count": 1,
  "target_pipeline_names": ["pipeline_d"]
}
```

double_number.py

```python
def step(input_data):
    num = input_data["pipeline_a"]
    doubled = num * 2
    print(f"pipeline_b: Doubled {num} to {doubled}")
    return doubled
```

pipeline_b receives numbers from pipeline_a, doubles them, and sends the result to pipeline_d.
pipeline_c.json

```json
{
  "name": "pipeline_c",
  "scripts": ["square_number.py"],
  "source_pipeline_count": 1,
  "target_pipeline_names": ["pipeline_d"]
}
```

square_number.py

```python
def step(input_data):
    num = input_data["pipeline_a"]
    squared = num ** 2
    print(f"pipeline_c: Squared {num} to {squared}")
    return squared
```

pipeline_c receives numbers from pipeline_a, squares them, and sends the result to pipeline_d.
pipeline_d.json

```json
{
  "name": "pipeline_d",
  "scripts": ["combine_results.py"],
  "source_pipeline_count": 2,
  "target_pipeline_names": []
}
```

combine_results.py

```python
def step(input_data):
    doubled = input_data["pipeline_b"]
    squared = input_data["pipeline_c"]
    total = doubled + squared
    print(f"pipeline_d: Received doubled={doubled} and squared={squared}, sum={total}")
```

pipeline_d waits for data from both pipeline_b and pipeline_c, then combines the results by adding them together.
When these four pipelines run together, they produce the following output:
pipeline_a output

```
pipeline_a: Sending number 1
pipeline_a: Sending number 2
pipeline_a: Sending number 3
pipeline_a: Sending number 4
(continues...)
```

pipeline_b output

```
pipeline_b: Doubled 1 to 2
pipeline_b: Doubled 2 to 4
pipeline_b: Doubled 3 to 6
pipeline_b: Doubled 4 to 8
(continues...)
```

pipeline_c output

```
pipeline_c: Squared 1 to 1
pipeline_c: Squared 2 to 4
pipeline_c: Squared 3 to 9
pipeline_c: Squared 4 to 16
(continues...)
```

pipeline_d output

```
pipeline_d: Received doubled=2 and squared=1, sum=3
pipeline_d: Received doubled=4 and squared=4, sum=8
pipeline_d: Received doubled=6 and squared=9, sum=15
pipeline_d: Received doubled=8 and squared=16, sum=24
(continues...)
```
pipeline_d processes data only after receiving input from both pipeline_b and pipeline_c. For example, when the input number is 3:

- pipeline_b doubles it to 6
- pipeline_c squares it to 9
- pipeline_d receives both and adds them: 6 + 9 = 15