DPorch

C# .NET 9 Python 3.7+ License: MIT

DPorch is a Python runtime environment for executing and connecting distributed, decentralized data engineering pipelines used in directed graph workflows. It is built with Python (via Python.NET) and C#, targeting .NET 9.0.

Highlights

Its primary features are as follows:

  • Directed Graph Workflows: Distributed, JSON-configurable pipelines that form DAGs or DCGs.
  • Python Scripting: Defines pipeline data-processing logic with modular, chained Python scripts.
  • TCP Messaging: Transmits pipeline output to other workers on a shared network as their pipeline input.
  • Serialization: Serializes pipeline output and deserializes input using pickle (alternative formats planned).
  • Service Discovery: Resolves TCP IPs and ports through UDP service discovery.
  • Task Scheduling: Queues input data for processing using a FIFO scheduling policy.
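
As the serialization bullet notes, pipeline data crosses the wire as pickle payloads. A minimal sketch of just the pickle round trip (not DPorch's actual wire format, which also carries GUID frames):

```python
import pickle

# A step() return value, as produced by the last script in a pipeline.
output = {"counter": 3, "label": "reading"}

# Serialize before relaying over TCP...
payload = pickle.dumps(output)

# ...and deserialize on the receiving pipeline.
received = pickle.loads(payload)
assert received == output
```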

Prerequisites

  • .NET 9.0 SDK
  • Python 3.7+ - Required for pipeline scripting via pythonnet
    • Your Python DLL path (e.g. C:\Users\Name\AppData\Local\Programs\Python\Python311\python311.dll)

Quick Start Guide

  1. Clone and Build: Clone the repository and navigate to its root directory:

    git clone https://github.com/SebastianBathrick/DPorch
    cd DPorch

    Restore dependencies and build DPorch.sln using dotnet:

    dotnet restore
    dotnet build
  2. Start Setup Wizard: Navigate to src\DPorch.CLI\ (console application project):

    cd .\src\DPorch.CLI\

    Run DPorch.CLI.csproj using dotnet:

    dotnet run

    You'll see the following output:

    Initialized: Created new preferences file at C:\Users\Sebastian\AppData\Roaming\DPorch\settings.json
    
    Required preferences have not been assigned values
    You can assign them now using prompts or later using the pref command with the appropriate options
    
    Would you like to set required preferences now? [y/n] (y):
    

    Press y and ENTER to continue.

  3. Configure User Preferences: Python 3.7+ DLL: Enter the file path (typically in the same directory as python.exe):

    Please enter Python v3.7+ DLL path: C:\<Python Directory>\python311.dll
    

    Input Network Interface: Select the interface where pipelines will send UDP messages and receive TCP messages. Use ARROW KEYS to navigate and SPACEBAR to select. Press ENTER when done:

    Select input network interface:
    > Wi-Fi
    Loopback Pseudo-Interface 1
    Local Area Connection* 9
    ...
    

    Output Network Interfaces: Select one or more interfaces where pipelines will listen for UDP messages and send TCP messages.

    Select output network interfaces:
    > [X] Wi-Fi
    [ ] Loopback Pseudo-Interface 1
    [ ] Local Area Connection* 9
    ...
    

    Service Discovery Port: Enter the UDP port on which all pipelines will send and listen for messages during service/pipeline discovery (default: 5557):

    Please enter service discovery port (1-65535) (5557): 5557
    Saved: Discovery port: 5557
    Preferences file setup complete!
    

    The application will now exit, and all commands will be available to use.

Note

Use prefs --help to change preferences, locate the preferences file, or run the setup wizard again.

  4. Create Your First DAG: These steps assume two machines, but they can also be followed on a single machine using one or two terminal windows.

    Machine 1: Create Pipeline A

    Create a working directory:

    mkdir tutorial
    cd tutorial

    Create Pipeline A that will send data (you can also manually create the JSON file without the init command). You can set values in the JSON file or use init flags (run dporch init --help for more flag info).

    dporch init -n pipeline_a -i 0 -o pipeline_b -s generate_number.py

    Create and edit generate_number.py:

    import time
    counter = 0
    
    def step():
        global counter
        counter += 1
        time.sleep(1)
        print(f"Sending: {counter}")
        return counter

    Machine 2: Create Pipeline B

    Create a working directory:

    mkdir tutorial
    cd tutorial

    Create Pipeline B that will receive data. You can set values in the JSON file or use init flags (run dporch init --help for more flag info).

    dporch init -n pipeline_b -i 1 -s print_number.py

    Create and edit print_number.py:

    def step(input_data):
        number = input_data["pipeline_a"]
        print(f"Received: {number}")
  5. Execute Pipelines

    On Machine 1, run Pipeline A:

    dporch run pipeline_a.json

    On Machine 2, run Pipeline B:

    dporch run pipeline_b.json

    If using a single machine, you can run them in the same terminal by passing both .json files to the run command:

    dporch run pipeline_a.json pipeline_b.json

    You should now see Pipeline A sending numbers and Pipeline B receiving them. Press CTRL+C to stop either pipeline.

Tip

For more complex pipeline topologies and advanced features, start with One Source to Multiple Targets and continue reading the sections below.

Documentation

Table of Contents


NuGet Packages

DPorch.CLI
DPorch.Runtime
DPorch.Core.Tests
DPorch.Runtime.Tests

Architecture

For detailed technical documentation, including threading model, design patterns, and implementation details, see ARCHITECTURE.md.

Runtime Behavior

When starting the runtime environment, the following steps occur:

  1. Reads a JSON config file that defines a pipeline: its name, number of input sources (pipelines), script execution order, whether to wait for all input, and the names of output targets (pipelines).
  2. Pipelines broadcast their presence via UDP on a designated port (selected during one-time setup).
  3. Pipelines that share data exchange handshakes and connect their TCP push/pull sockets.
  4. Each script's top-level code runs, and a dedicated module/scope is created for each script (global variables).
  5. Received messages are queued on a dedicated thread for processing (FIFO). A pipeline can wait until it has received data from every source pipeline, or substitute None for any missing data (selected in the JSON config).
  6. Each message is deserialized and passed to the first script's step() event function for processing.
  7. A script's event return value is passed as an argument to the next script's step() event.
  8. The last script in the execution order returns a value that is serialized and queued for relaying.
  9. Messages containing output data and GUID (unique sender ID) frames are relayed on a separate thread to all targets.
  10. A pipeline continues to receive input, process it, and send its output until the environment closes.
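
The script-chaining portion of this loop (steps 6 through 8) can be sketched in plain Python; the names below are illustrative, not DPorch's actual internals:

```python
import pickle

def run_iteration(scripts, message):
    """Deserialize one queued message, chain it through each
    script's step() function in order, and serialize the result."""
    value = pickle.loads(message)
    for step in scripts:           # scripts in configured execution order
        value = step(value)        # each return value feeds the next step()
    return pickle.dumps(value)     # queued for relay to all targets

# Illustrative two-script chain: double the input, then describe it.
steps = [lambda x: x * 2, lambda x: f"value is {x}"]
out = pickle.loads(run_iteration(steps, pickle.dumps(5)))
print(out)  # value is 10
```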

Command Line Interface

To see a list of available commands, run the following:

PS C:\> dporch --help

The following output will be displayed:

USAGE:
    dporch [OPTIONS] <COMMAND>

EXAMPLES:
    dporch init
    dporch init --name my_pipeline
    dporch init -n my_pipeline -s script.py -i 2 -o target1
    dporch run pipeline.json
    dporch run pipeline_1.json pipeline_2.json

OPTIONS:
    -h, --help    Prints help information

COMMANDS:
    init     Create a new pipeline configuration file
    run      Execute a pipeline configuration
    prefs    Manage user preferences. Omit options to view all preferences

To see more information about a specific command, run the following:

dporch <COMMAND> --help

Pipeline Configurations

Pipeline configurations are JSON files that define a pipeline’s properties. To create one, use the init command.

PS C:\> dporch init
Created: C:\Computer\config.json

The command creates a .json file in the current working directory with the following default configuration:

{
  "name": "",
  "scripts": [],
  "source_pipeline_count": 0,
  "target_pipeline_names": []
}

Pipeline Properties

  • name (string) - The name of the pipeline. This name is used by other pipelines to reference this pipeline in their target_pipeline_names list and appears as a dictionary key when this pipeline sends data to others. This name must be at least three characters long.
  • scripts - Python script file paths relative to the configuration file directory to execute sequentially in each iteration. Each script must define a step() function. There must be at least one script file path.
  • source_pipeline_count - The number of source pipelines this pipeline expects to receive data from. The pipeline will wait for data from all sources before beginning each iteration. This number can be zero or greater.
  • target_pipeline_names - Pipeline names to send this pipeline's output data to. The final script's return value will be sent to all targets listed here. There can be zero or more target pipelines.
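
Putting the four properties together, a hypothetical pipeline that runs two scripts, waits on one source, and feeds one target might be configured as follows (all names here are invented for illustration):

```json
{
  "name": "clean_stage",
  "scripts": ["validate.py", "transform.py"],
  "source_pipeline_count": 1,
  "target_pipeline_names": ["load_stage"]
}
```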

Python Scripting

Python scripts define pipeline behavior at each phase of its lifecycle: startup, iteration, and shutdown.

Note

/examples contains files for each example shown below.

Step Function

Each Python script requires a top-level function named step with an optional parameter. The following is a minimal valid script:

def step():
    pass

Each script's top-level statements run once when the pipeline starts, after receiving handshakes from its source pipelines. Scripts execute in the order specified in the JSON configuration. After sending handshakes to its target pipelines, the pipeline begins its iteration loop, calling each script's step() function once per iteration in that same order. Each script has its own isolated scope, and top-level global variables maintain state between iterations.
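
This per-script isolation can be approximated in plain Python by executing each script's source in its own globals dictionary (a sketch of the idea, not DPorch's actual loader):

```python
SCRIPT_A = """
counter = 0                      # top-level state, private to this script

def step():
    global counter
    counter += 1
    return counter
"""

SCRIPT_B = """
counter = 100                    # same name, different scope: no clash

def step():
    global counter
    counter -= 1
    return counter
"""

scopes = []
for source in (SCRIPT_A, SCRIPT_B):
    scope = {}                   # one isolated namespace per script
    exec(source, scope)          # top-level statements run once, here
    scopes.append(scope)

# Two iterations: each script mutates only its own counter.
for _ in range(2):
    results = [scope["step"]() for scope in scopes]
print(results)  # [2, 98]
```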

The following script is the only script in a pipeline:

counter = 0
print(f"Initial counter value {counter}")

def step():
    global counter
    counter += 1
    print(f"Counter value this iteration: {counter}")

If it were allowed to run three iterations, the output would look like the following. Note how the top-level print statement executes only once, while the step() function's print statement executes on each iteration.

Initial counter value 0
Counter value this iteration: 1
Counter value this iteration: 2
Counter value this iteration: 3

A second script is added to the end of the pipeline with the following code:

def step(input_data):
    if input_data % 2 == 0:
        print(f"{input_data} is an even number")
    else:
        print(f"{input_data} is an odd number")

To pass data to this second script, the first script is modified to return the value of counter.

counter = 0
print(f"Initial counter value {counter}")

def step():
    global counter
    counter += 1
    print(f"Counter value this iteration: {counter}")
    return counter

With both scripts in the pipeline, executing it produces the following output. The counter value from the first script is passed as input_data to the second script, which checks whether it is even or odd.

Initial counter value 0
Counter value this iteration: 1
1 is an odd number
Counter value this iteration: 2
2 is an even number
Counter value this iteration: 3
3 is an odd number

End Function

The end() function is an optional lifecycle hook called when the pipeline shuts down, such as when the user sends a keyboard interrupt (CTRL+C). It takes no parameters and is intended for cleanup operations like releasing resources or closing connections. Like step() functions, end() functions execute sequentially in the order scripts are defined in the configuration.

The following is a valid script with an end() function:

def step():
    print("Running iteration")

def end():
    print("Cleaning up resources")

When a pipeline with this script receives a shutdown signal (CTRL+C), it will call the end() function before terminating. If a script does not define an end() function, DPorch will skip it and move to the next script.

The following example demonstrates resource management using the end() function. The script opens a TCP socket connection at startup, uses it during iterations, and closes it during shutdown:

end_close_sock.json

{
  "name": "end_close_sock",
  "scripts": [
    "..\\iteration_rate_limiter.py",
    "end_close_sock.py"
  ],
  "source_pipeline_count": 0,
  "target_pipeline_names": []
}

end_close_sock.py

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("tcpbin.com", 4242))  # Public echo server

def step():
    sock.send(b"Hello from DPorch!\n")
    response = sock.recv(1024)
    print(f"Received: {response.decode('utf-8')}")
    return response

def end():
    sock.close()
    print("Socket closed")

The top-level socket connection executes once at startup. The step() function sends and receives data each iteration, and the end() function closes the socket when the pipeline shuts down.
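
The config above also lists ..\iteration_rate_limiter.py, whose contents are not shown in this README. A plausible sketch of such a script, throttling the loop with time.sleep (the constant name and rate are assumptions for illustration), might be:

```python
import time

RATE_SECONDS = 1.0  # hypothetical: at most one iteration per second

def step():
    # Sleeping inside step() delays every later script in the
    # pipeline's execution order, capping the iteration rate.
    time.sleep(RATE_SECONDS)
```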

Delta Time

DPorch provides a managed variable called delta_time that tracks the elapsed time (in seconds) since the previous step() call for that script. To use it, declare a top-level variable named delta_time. DPorch will detect this variable and update it before each step() call.

The following script demonstrates basic usage of delta_time:

delta_time = 0.0

def step():
    print(f"Time since last step() call: {delta_time} seconds")

On the first step() call, delta_time is 0.0. On subsequent calls, it contains the elapsed time since the previous step() execution for that script.

The following example uses delta_time to count seconds by accumulating elapsed time and printing a message each time a full second passes:

delta_time_sec.json

{
  "name": "delta_time_sec",
  "scripts": [
    "delta_time_sec.py"
  ],
  "source_pipeline_count": 0,
  "target_pipeline_names": []
}

delta_time_sec.py

delta_time = 0.0
elapsed_time = 0.0
sec_passed = 0

def step():
    global delta_time, elapsed_time, sec_passed
    elapsed_time += delta_time

    if elapsed_time >= 1.0:
        sec_passed += 1
        elapsed_time = 0.0
        print(f"{sec_passed} second(s) have passed")

The script accumulates delta_time each iteration. When the total reaches one second, it increments sec_passed, resets the accumulator, and prints the count.
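
The accumulation logic can be checked outside DPorch by emulating what the runtime does: write each frame's elapsed time into delta_time before calling step(). This is an illustrative harness, not the real runtime:

```python
delta_time = 0.0
elapsed_time = 0.0
sec_passed = 0

def step():
    global elapsed_time, sec_passed
    elapsed_time += delta_time
    if elapsed_time >= 1.0:
        sec_passed += 1
        elapsed_time = 0.0

# Emulate the runtime: four 0.3-second frames, then check the counter.
for frame in [0.3, 0.3, 0.3, 0.3]:
    delta_time = frame   # what DPorch would set before each step() call
    step()
print(sec_passed)  # 1  (0.3 * 4 = 1.2 s accumulated, crossing 1.0 once)
```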

Pipeline Communication

Pipelines can send output to zero or more targets and receive input from zero or more sources. The following examples demonstrate common topologies.

One Source to Multiple Targets

In this example, pipeline_a sends its output to both pipeline_b and pipeline_c.

Diagram showing one node labeled pipeline_a with one arrow pointing at pipeline_b and another pointing at pipeline_c

pipeline_a.json

{
  "name": "pipeline_a",
  "scripts": ["make_counter_msg.py"],
  "source_pipeline_count": 0,
  "target_pipeline_names": [
     "pipeline_b",
     "pipeline_c"
   ]
}

pipeline_a lists two target pipelines in target_pipeline_names, so the final script's return value is sent to both pipeline_b and pipeline_c. With source_pipeline_count set to 0, it does not wait for incoming data before iterating.

make_counter_msg.py

counter = 0

def step():
    global counter
    counter += 1
    print(f"Sending {counter} to pipeline_b and pipeline_c")
    return counter

The step() function increments and returns a counter value each iteration. DPorch sends the return value to all pipelines listed in target_pipeline_names.

pipeline_b.json

{
  "name": "pipeline_b",
  "scripts": ["print_counter_msg.py"],
  "source_pipeline_count": 1,
  "target_pipeline_names": []
}

pipeline_c.json

{
  "name": "pipeline_c",
  "scripts": ["print_counter_msg.py"],
  "source_pipeline_count": 1,
  "target_pipeline_names": []
}

Both pipeline_b and pipeline_c have source_pipeline_count set to 1, so each waits for data from one source before iterating. Neither has target pipelines. Both use the same script, print_counter_msg.py.

print_counter_msg.py

def step(input_data):
    msg = input_data["pipeline_a"]
    print(f"I got a message from pipeline_a: {msg}")

DPorch delivers input data as a dictionary keyed by source pipeline name. Both pipeline_b and pipeline_c access the counter value using input_data["pipeline_a"].

When these three pipelines run together, they produce the following output:

pipeline_a output

Sending 1 to pipeline_b and pipeline_c
Sending 2 to pipeline_b and pipeline_c
Sending 3 to pipeline_b and pipeline_c
(continues...)

pipeline_b output

I got a message from pipeline_a: 1
I got a message from pipeline_a: 2
I got a message from pipeline_a: 3
(continues...)

pipeline_c output

I got a message from pipeline_a: 1
I got a message from pipeline_a: 2
I got a message from pipeline_a: 3
(continues...)

Both pipeline_b and pipeline_c receive the same data from pipeline_a and process it independently.
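
Since input_data is a plain dictionary keyed by source pipeline name, a receiving script can also iterate its sources generically instead of naming each one. The following is an illustrative pattern, not a script from the repository, and it assumes numeric payloads:

```python
def step(input_data):
    # Works for any number of sources; values arrive keyed by sender name.
    for source, value in sorted(input_data.items()):
        print(f"{source} sent {value}")
    return sum(input_data.values())  # assumes every payload is a number

print(step({"pipeline_a": 1, "pipeline_x": 41}))  # prints each sender, then 42
```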

Multiple Sources to One Target

In this example, both pipeline_x and pipeline_y send their outputs to pipeline_z.

Diagram showing a node labeled pipeline_x and another labeled pipeline_y both pointing at a third node labeled pipeline_z

pipeline_x.json

{
  "name": "pipeline_x",
  "scripts": ["generate_random.py"],
  "source_pipeline_count": 0,
  "target_pipeline_names": ["pipeline_z"]
}

generate_random.py

import random

def step():
    num = random.randint(1, 100)
    print(f"pipeline_x generated: {num}")
    return num

pipeline_y.json

{
  "name": "pipeline_y",
  "scripts": ["generate_timestamp.py"],
  "source_pipeline_count": 0,
  "target_pipeline_names": ["pipeline_z"]
}

generate_timestamp.py

import time

def step():
    timestamp = int(time.time())
    print(f"pipeline_y generated timestamp: {timestamp}")
    return timestamp

pipeline_z.json

{
  "name": "pipeline_z",
  "scripts": ["combine_inputs.py"],
  "source_pipeline_count": 2,
  "target_pipeline_names": []
}

combine_inputs.py

def step(input_data):
    random_num = input_data["pipeline_x"]
    timestamp = input_data["pipeline_y"]
    print(f"Received random number {random_num} and timestamp {timestamp}")

pipeline_z has source_pipeline_count set to 2, so it waits for data from both pipeline_x and pipeline_y before each iteration. The data is accessed by source pipeline name: input_data["pipeline_x"] and input_data["pipeline_y"].

When these three pipelines run together, they produce the following output:

pipeline_x output

pipeline_x generated: 42
pipeline_x generated: 87
pipeline_x generated: 15
(continues...)

pipeline_y output

pipeline_y generated timestamp: 1702393845
pipeline_y generated timestamp: 1702393846
pipeline_y generated timestamp: 1702393847
(continues...)

pipeline_z output

Received random number 42 and timestamp 1702393845
Received random number 87 and timestamp 1702393846
Received random number 15 and timestamp 1702393847
(continues...)

pipeline_z only processes data after receiving input from both sources. DPorch waits for all expected source pipelines before starting each iteration.

Diamond Pipeline Topology Example

In this example, pipeline_a generates numbers and sends them to both pipeline_b and pipeline_c. Each of these pipelines performs a different mathematical operation, then both send their results to pipeline_d, which combines them.

Diagram showing pipeline_a pointing to both pipeline_b and pipeline_c, which both point to pipeline_d

pipeline_a - Number Generator

pipeline_a.json

{
  "name": "pipeline_a",
  "scripts": ["generate_number.py"],
  "source_pipeline_count": 0,
  "target_pipeline_names": [
     "pipeline_b",
     "pipeline_c"
   ]
}

generate_number.py

counter = 0

def step():
    global counter
    counter += 1
    print(f"pipeline_a: Sending number {counter}")
    return counter

pipeline_a generates incrementing numbers and sends each one to both pipeline_b and pipeline_c.

pipeline_b - Doubler

pipeline_b.json

{
  "name": "pipeline_b",
  "scripts": ["double_number.py"],
  "source_pipeline_count": 1,
  "target_pipeline_names": ["pipeline_d"]
}

double_number.py

def step(input_data):
    num = input_data["pipeline_a"]
    doubled = num * 2
    print(f"pipeline_b: Doubled {num} to {doubled}")
    return doubled

pipeline_b receives numbers from pipeline_a, doubles them, and sends the result to pipeline_d.

pipeline_c - Squarer

pipeline_c.json

{
  "name": "pipeline_c",
  "scripts": ["square_number.py"],
  "source_pipeline_count": 1,
  "target_pipeline_names": ["pipeline_d"]
}

square_number.py

def step(input_data):
    num = input_data["pipeline_a"]
    squared = num ** 2
    print(f"pipeline_c: Squared {num} to {squared}")
    return squared

pipeline_c receives numbers from pipeline_a, squares them, and sends the result to pipeline_d.

pipeline_d - Combiner

pipeline_d.json

{
  "name": "pipeline_d",
  "scripts": ["combine_results.py"],
  "source_pipeline_count": 2,
  "target_pipeline_names": []
}

combine_results.py

def step(input_data):
    doubled = input_data["pipeline_b"]
    squared = input_data["pipeline_c"]
    total = doubled + squared
    print(f"pipeline_d: Received doubled={doubled} and squared={squared}, sum={total}")

pipeline_d waits for data from both pipeline_b and pipeline_c, then combines the results by adding them together.

Output

When these four pipelines run together, they produce the following output:

pipeline_a output

pipeline_a: Sending number 1
pipeline_a: Sending number 2
pipeline_a: Sending number 3
pipeline_a: Sending number 4
(continues...)

pipeline_b output

pipeline_b: Doubled 1 to 2
pipeline_b: Doubled 2 to 4
pipeline_b: Doubled 3 to 6
pipeline_b: Doubled 4 to 8
(continues...)

pipeline_c output

pipeline_c: Squared 1 to 1
pipeline_c: Squared 2 to 4
pipeline_c: Squared 3 to 9
pipeline_c: Squared 4 to 16
(continues...)

pipeline_d output

pipeline_d: Received doubled=2 and squared=1, sum=3
pipeline_d: Received doubled=4 and squared=4, sum=8
pipeline_d: Received doubled=6 and squared=9, sum=15
pipeline_d: Received doubled=8 and squared=16, sum=24
(continues...)

pipeline_d processes data only after receiving input from both pipeline_b and pipeline_c. For example, when the input number is 3:

  • pipeline_b doubles it to 6
  • pipeline_c squares it to 9
  • pipeline_d receives both and adds them: 6 + 9 = 15
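
The diamond's arithmetic can be verified in plain Python by composing the three transformations directly:

```python
def double(n):
    return n * 2               # pipeline_b's transformation

def square(n):
    return n ** 2              # pipeline_c's transformation

def combine(doubled, squared):
    return doubled + squared   # pipeline_d's transformation

# Reproduce the first four iterations of the diamond topology.
sums = [combine(double(n), square(n)) for n in range(1, 5)]
print(sums)  # [3, 8, 15, 24]
```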

Contributors