Shared framework library for all MDA services.
This repository contains the common contracts, worker framework, messaging helpers, storage utilities, and task tracking infrastructure used throughout the MDA platform.
Every pipeline service depends on mda-common.
The goal of mda-common is to ensure every service follows the same conventions for:
- Job contracts
- RabbitMQ messaging
- Redis progress updates
- Retry handling
- Parameter validation
- Error handling
- Pipeline forwarding
- Storage access
- Monitoring integration
A service developer should only need to focus on business logic.
Everything else should be provided by the framework.
mda-common/
├── pyproject.toml
├── setup.py
├── CHANGELOG.md
│
└── mda_common/
    ├── __init__.py
    ├── config.py
    ├── exceptions.py
    ├── models.py
    ├── queue.py
    ├── storage.py
    ├── task_tracker.py
    └── worker.py
Git dependency:
mda-common @ git+https://github.com/your-org/mda-common.git@main
Dockerfile:
ARG COMMON_VERSION=main
RUN pip install \
    "mda-common @ git+https://github.com/your-org/mda-common.git@${COMMON_VERSION}"
Admin Backend
│
▼
Stage A
│
▼
Stage B
│
▼
Stage C
│
▼
Stage D
Each stage:
- Consumes a RabbitMQ queue
- Performs work
- Updates the job contract
- Publishes the updated contract to the next queue
The backend does not participate in forwarding jobs.
The backend only:
- Creates jobs
- Monitors progress
- Stores job state
- Pushes WebSocket updates
All pipeline stages exchange a common job contract.
Example:
{
  "job_id": "9c98a8e2-13d4-4d7c-a2c5-fda7e3bdb7d1",
  "entry_stage": "detect",
  "exit_stage": "predict",
  "retry_count": 0,
  "params": {
    "image_id": "img_123"
  }
}
job_id: Unique job identifier.
{
  "job_id": "uuid"
}
entry_stage: First stage that should execute.
{
  "entry_stage": "detect"
}
Used when starting a pipeline from an intermediate stage.
exit_stage: Last stage that should execute.
{
  "exit_stage": "predict"
}
When a worker's stage_name matches exit_stage, processing stops after completion.
No further forwarding occurs.
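The entry_stage and exit_stage semantics described above can be sketched with two small helpers. This is an illustration only: PIPELINE_ORDER, stages_to_run, and should_forward are hypothetical names that are not part of mda-common, and the stage names other than detect and predict are assumed.

```python
# Hypothetical stage order; only "detect" and "predict" appear in this README.
PIPELINE_ORDER = ["detect", "classify", "predict", "report"]


def stages_to_run(contract, order=PIPELINE_ORDER):
    """Return the stages a job passes through, entry_stage to exit_stage inclusive."""
    start = order.index(contract["entry_stage"])
    stop = order.index(contract["exit_stage"])
    if stop < start:
        raise ValueError("exit_stage must not come before entry_stage")
    return order[start:stop + 1]


def should_forward(stage_name, contract):
    """A worker forwards the job unless its own stage is the exit stage."""
    return stage_name != contract["exit_stage"]


contract = {
    "job_id": "uuid",
    "entry_stage": "detect",
    "exit_stage": "predict",
    "retry_count": 0,
    "params": {"image_id": "img_123"},
}
print(stages_to_run(contract))              # ['detect', 'classify', 'predict']
print(should_forward("predict", contract))  # False
```

With this contract, the assumed fourth stage never sees the job because the worker whose stage_name is predict stops the pipeline.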
retry_count: Framework-managed retry counter.
{
  "retry_count": 0
}
Reset to zero whenever a job successfully progresses to the next stage.
params: Workflow state shared between stages.
{
  "params": {
    "image_id": "img_123",
    "detection_run_id": "det_456"
  }
}
The contract should contain references, IDs, and state required by downstream services.
Large payloads should not be placed into the contract.
Instead:
- Store files in MinIO/S3
- Store structured results in Postgres
- Pass IDs and references through the contract
Example:
{
  "params": {
    "image_id": "img_123",
    "classification_run_id": "cls_789"
  }
}
All services inherit from BaseWorker.
Example:
from mda_common.worker import BaseWorker

# validate_image_exists and create_detection_run are service-specific helpers
# defined elsewhere in the service.
class DetectionWorker(BaseWorker):
    stage_name = "detect"
    input_queue = "raw_images"
    output_queue = "detections"
    required_params = {
        "image_id": validate_image_exists,
    }
    output_params = [
        "detection_run_id",
    ]

    async def process(
        self,
        image_id,
        confidence_threshold=0.5,
        **extras,
    ):
        run_id = create_detection_run(
            image_id
        )
        return {
            "detection_run_id": run_id,
        }
Services declare required inputs.
Example:
required_params = {
    "image_id": validate_image_exists,
    "model_id": validate_model_exists,
}
Validation occurs before processing starts.
Validators receive a single value.
Example:
# lookup_image is a service-specific helper.
def validate_image_exists(image_id):
    image = lookup_image(image_id)
    if not image:
        raise NotFoundError(
            f"Image {image_id} not found"
        )
    return True
The framework also provides a built-in validator, required:
from mda_common.worker import required
Example:
required_params = {
    "image_id": required,
}
Checks:
- Value exists
- String values are not empty
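The two checks listed above could look roughly like the following. This is a minimal sketch and not the framework's actual implementation: the exception raised, the error messages, and the return value are assumptions, and MissingParamError is defined locally as a stand-in so the snippet runs on its own.

```python
class MissingParamError(Exception):
    """Local stand-in for the framework exception of the same name."""


def required(value):
    """Accept any value that exists; reject None and empty strings."""
    if value is None:
        raise MissingParamError("value is missing")
    if isinstance(value, str) and value == "":
        raise MissingParamError("value is an empty string")
    return True


print(required("img_123"))  # True
print(required(0))          # True, because 0 is a value that exists
```

Checking for None explicitly, instead of relying on truthiness, keeps legitimate falsy values such as 0 or False from being rejected.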
The following exceptions are treated as non-retryable:
MissingParamError
InvalidParamError
NotFoundError
ValidationError
Behavior:
Reject Job
Publish FAILED event
Do Not Retry
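The reject-without-retry behavior can be sketched as a single classification step. The snippet below is an assumption-laden illustration: the exception classes are local stand-ins for the framework's, and handle_failure, its return values, and the event payload shape are hypothetical.

```python
class MissingParamError(Exception):
    pass


class InvalidParamError(Exception):
    pass


class NotFoundError(Exception):
    pass


class ValidationError(Exception):
    pass


# Contract failures: retrying cannot fix them.
NON_RETRYABLE = (MissingParamError, InvalidParamError, NotFoundError, ValidationError)


def handle_failure(exc, publish_event):
    """Return 'reject' for contract failures and 'retry' for everything else."""
    if isinstance(exc, NON_RETRYABLE):
        # Assumed payload shape for the FAILED event.
        publish_event({"status": "failed", "error": str(exc)})
        return "reject"
    return "retry"


events = []
print(handle_failure(NotFoundError("Image img_123 not found"), events.append))  # reject
print(handle_failure(TimeoutError("database timeout"), events.append))          # retry
print(len(events))                                                              # 1
```

Only the contract failure produces a FAILED event here; the timeout is left to the retry mechanism.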
Each service declares expected outputs.
Example:
output_params = [
    "detection_run_id",
]
process() must return a dictionary.
Example:
return {
    "detection_run_id": run_id,
}
The framework verifies:
set(returned_keys) == set(output_params)
Example:
output_params = [
    "detection_run_id",
]
Valid:
return {
    "detection_run_id": run_id,
}
Invalid:
return {
    "wrong_key": run_id,
}
After validation:
envelope["params"].update(
    contract_updates
)
The updated contract is forwarded to the next stage.
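Put together, the output check and the contract update might be combined as below. This is a sketch under assumptions: apply_outputs is a hypothetical helper name, and the exception types and messages are illustrative, since the README does not say what the framework raises on a mismatch.

```python
def apply_outputs(envelope, output_params, result):
    """Verify the keys returned by process() and merge them into the contract."""
    if not isinstance(result, dict):
        raise TypeError("process() must return a dictionary")
    if set(result) != set(output_params):
        missing = sorted(set(output_params) - set(result))
        unexpected = sorted(set(result) - set(output_params))
        raise ValueError(
            f"output mismatch: missing={missing}, unexpected={unexpected}"
        )
    # Same merge as shown above: returned keys become part of params.
    envelope["params"].update(result)
    return envelope


envelope = {"params": {"image_id": "img_123"}}
apply_outputs(envelope, ["detection_run_id"], {"detection_run_id": "det_456"})
print(envelope["params"])
# {'image_id': 'img_123', 'detection_run_id': 'det_456'}
```

Because the comparison is a set equality, both a missing key and an extra key are rejected, which matches the valid and invalid examples above.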
Processing failures are retried.
Default:
max_retries = 2
Execution count:
Initial Attempt
Retry 1
Retry 2
Fail
Total attempts: 3
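The attempt count above follows from counting the initial attempt separately from the retries. The loop below is a simplified in-process sketch of that arithmetic; run_with_retries is a hypothetical name, and the real framework presumably retries through the message queue instead of a local loop.

```python
def run_with_retries(work, contract, max_retries=2):
    """Call work() until it succeeds or the retry budget is used up."""
    while True:
        try:
            result = work()
        except Exception:
            if contract["retry_count"] >= max_retries:
                raise  # budget exhausted: the job fails
            contract["retry_count"] += 1
        else:
            contract["retry_count"] = 0  # reset once the job progresses
            return result


attempts = []


def always_fails():
    attempts.append(1)
    raise TimeoutError("database timeout")


contract = {"retry_count": 0}
try:
    run_with_retries(always_fails, contract)
except TimeoutError:
    pass
print(len(attempts))  # 3
```

With max_retries = 2, the failing task runs once initially and twice more as retries before the error is allowed to propagate.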
Examples of retryable failures:
Network outage
GPU OOM
Temporary API failure
Database timeout
Examples of non-retryable failures:
Missing image_id
Invalid task_id
Referenced object does not exist
These are contract failures.
Retrying will not help.
TaskTracker provides service progress reporting.
Durable events:
started
completed
failed
skipped
Published to:
job_events
Consumed by:
admin-backend
Non-durable events:
progress
Used for:
Live progress bars
Streaming logs
Realtime UI updates
Example:
self.tracker.started()
self.tracker.progress(25)
self.tracker.progress(50)
self.tracker.completed(
    response={
        "detection_run_id": run_id,
    }
)
queue.py provides common RabbitMQ helpers.
Example:
from mda_common.queue import publish
from mda_common.queue import consume
Publish:
await publish(
    "detections",
    payload,
)
Consume:
await consume(
    "raw_images",
    callback,
)
storage.py provides a common S3-compatible interface.
Works with:
- MinIO (local development)
- AWS S3 (production)
Example:
from mda_common.storage import upload
from mda_common.storage import download
from mda_common.storage import presigned_url
Upload:
upload(
    bucket,
    key,
    data,
)
Download:
data = download(
    bucket,
    key,
)
Generate URL:
url = presigned_url(
    bucket,
    key,
)
A service should:
- Validate required inputs
- Execute business logic
- Store large outputs externally
- Return only workflow references
- Never manually forward jobs
- Never manually publish status events
A service should not:
- Implement retry logic
- Implement queue routing
- Manage RabbitMQ connections
- Manage Redis connections
- Duplicate framework functionality
Semantic versioning is used.
Example:
0.1.0
0.2.0
0.3.0
Breaking changes to framework contracts require a version bump.
Examples:
- Job contract changes
- BaseWorker API changes
- TaskTracker API changes
The framework exists so that service authors can focus exclusively on business logic.
A well-written service should primarily define:
required_params
output_params
process()
Everything else should be handled by mda-common.