
azista-image-processing/mda_common


MDA Common (mda-common)

Shared framework library for all MDA services.

This repository contains the common contracts, worker framework, messaging helpers, storage utilities, and task tracking infrastructure used throughout the MDA platform.

Every pipeline service depends on mda-common.


Purpose

The goal of mda-common is to ensure every service follows the same conventions for:

  • Job contracts
  • RabbitMQ messaging
  • Redis progress updates
  • Retry handling
  • Parameter validation
  • Error handling
  • Pipeline forwarding
  • Storage access
  • Monitoring integration

A service developer should only need to focus on business logic.

Everything else should be provided by the framework.


Repository Structure

mda-common/
├── pyproject.toml
├── setup.py
├── CHANGELOG.md
│
└── mda_common/
    ├── __init__.py
    ├── config.py
    ├── exceptions.py
    ├── models.py
    ├── queue.py
    ├── storage.py
    ├── task_tracker.py
    └── worker.py

Installation

As a Git dependency (for example, in requirements.txt):

mda-common @ git+https://github.com/your-org/mda-common.git@main

Dockerfile:

ARG COMMON_VERSION=main

RUN pip install \
    "mda-common @ git+https://github.com/your-org/mda-common.git@${COMMON_VERSION}"

Core Concepts

Pipeline Architecture

Admin Backend
      │
      ▼
Stage A
      │
      ▼
Stage B
      │
      ▼
Stage C
      │
      ▼
Stage D

Each stage:

  • Consumes a RabbitMQ queue
  • Performs work
  • Updates the job contract
  • Publishes the updated contract to the next queue

The backend does not participate in forwarding jobs.

The backend only:

  • Creates jobs
  • Monitors progress
  • Stores job state
  • Pushes WebSocket updates

Job Contract

All pipeline stages exchange a common job contract.

Example:

{
  "job_id": "9c98a8e2-13d4-4d7c-a2c5-fda7e3bdb7d1",

  "entry_stage": "detect",
  "exit_stage": "predict",

  "retry_count": 0,

  "params": {
    "image_id": "img_123"
  }
}

Fields

job_id

Unique job identifier.

{
  "job_id": "uuid"
}

entry_stage

First stage that should execute.

{
  "entry_stage": "detect"
}

Used when starting a pipeline from an intermediate stage.


exit_stage

Last stage that should execute.

{
  "exit_stage": "predict"
}

When a worker's stage_name matches exit_stage, processing stops after completion.

No further forwarding occurs.


retry_count

Framework-managed retry counter.

{
  "retry_count": 0
}

Reset to zero whenever a job successfully progresses to the next stage.


params

Workflow state shared between stages.

{
  "params": {
    "image_id": "img_123",
    "detection_run_id": "det_456"
  }
}

The contract should contain references, IDs, and state required by downstream services.

Large payloads should not be placed into the contract.

Instead:

  • Store files in MinIO/S3
  • Store structured results in Postgres
  • Pass IDs and references through the contract

Example:

{
  "params": {
    "image_id": "img_123",
    "classification_run_id": "cls_789"
  }
}
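A minimal sketch of the contract as a Python dataclass, assuming the four fields above and a JSON wire format. The class name JobContract and the new/from_json/to_json helpers are hypothetical; models.py may define the contract differently.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Union


@dataclass
class JobContract:
    """Hypothetical model of the job contract exchanged between stages."""

    job_id: str
    entry_stage: str
    exit_stage: str
    retry_count: int = 0
    # Only IDs and references belong here; large payloads live in MinIO/S3 or Postgres.
    params: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def new(cls, entry_stage: str, exit_stage: str, **params: Any) -> "JobContract":
        # A fresh job gets a random UUID and starts with no retries.
        return cls(str(uuid.uuid4()), entry_stage, exit_stage, 0, dict(params))

    @classmethod
    def from_json(cls, raw: Union[str, bytes]) -> "JobContract":
        data = json.loads(raw)
        return cls(
            job_id=data["job_id"],
            entry_stage=data["entry_stage"],
            exit_stage=data["exit_stage"],
            retry_count=data.get("retry_count", 0),
            params=data.get("params", {}),
        )

    def to_json(self) -> str:
        return json.dumps(asdict(self))


contract = JobContract.new("detect", "predict", image_id="img_123")
restored = JobContract.from_json(contract.to_json())
```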

BaseWorker

All services inherit from BaseWorker.

Example:

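from mda_common.worker import BaseWorker

# validate_image_exists and create_detection_run are service-defined
# helpers (see Validators below); they are not part of mda-common.


class DetectionWorker(BaseWorker):

    stage_name = "detect"  # compared against the contract's exit_stage

    input_queue = "raw_images"  # queue this stage consumes

    output_queue = "detections"  # queue the updated contract is forwarded to

    required_params = {
        "image_id": validate_image_exists,
    }

    output_params = [
        "detection_run_id",
    ]

    async def process(
        self,
        image_id,
        confidence_threshold=0.5,
        **extras,
    ):

        # Results are stored externally; only the run ID goes back into the contract.
        run_id = create_detection_run(
            image_id,
            confidence_threshold,
        )

        return {
            "detection_run_id": run_id,
        }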
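The process() signature suggests (this is an inference, not documented behavior) that the framework passes the contract's params as keyword arguments: an optional parameter missing from params falls back to its default, and keys meant for other stages land in **extras. A self-contained illustration with a stand-in class (no mda-common import; create_detection_run is a hypothetical stub):

```python
import asyncio


def create_detection_run(image_id, confidence_threshold):
    # Hypothetical stub standing in for the real database write.
    return f"det_for_{image_id}_{confidence_threshold}"


class DetectionWorkerStandIn:
    stage_name = "detect"
    output_params = ["detection_run_id"]

    async def process(self, image_id, confidence_threshold=0.5, **extras):
        # Keys not named in the signature (e.g. model_id for a later stage) end up in extras.
        run_id = create_detection_run(image_id, confidence_threshold)
        return {"detection_run_id": run_id}


# Contract params without confidence_threshold, plus a key for a downstream stage.
params = {"image_id": "img_123", "model_id": "m_1"}
result = asyncio.run(DetectionWorkerStandIn().process(**params))
```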

Parameter Validation

Services declare required inputs.

Example:

required_params = {
    "image_id": validate_image_exists,
    "model_id": validate_model_exists,
}

Validation occurs before processing starts.


Validators

Validators receive a single value.

Example:

from mda_common.exceptions import NotFoundError


def validate_image_exists(image_id):

    image = lookup_image(image_id)  # service-defined lookup, e.g. a Postgres query

    if not image:
        raise NotFoundError(
            f"Image {image_id} not found"
        )

    return True

Default Validator

from mda_common.worker import required

Example:

required_params = {
    "image_id": required,
}

Checks:

  • Value exists
  • String values are not empty
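The two checks above can be sketched as follows. This is a hypothetical equivalent of required, not the mda-common source; MissingParamError is a local stand-in for the framework's exception, and validate_params shows one plausible way a framework could run the declared validators before process().

```python
class MissingParamError(Exception):
    """Local stand-in; mda_common.exceptions is assumed to define its own."""


def required(value):
    """Hypothetical default validator: the value exists and strings are not empty."""
    if value is None:
        raise MissingParamError("required parameter is missing")
    if isinstance(value, str) and value == "":
        raise MissingParamError("required parameter is an empty string")
    return True


def validate_params(params, required_params):
    # Each validator receives a single value; an absent key fails before any validator runs.
    for name, validator in required_params.items():
        if name not in params:
            raise MissingParamError(f"missing required parameter: {name}")
        validator(params[name])
```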

Validation Failures

The following exceptions are treated as non-retryable:

MissingParamError
InvalidParamError
NotFoundError
ValidationError

Behavior:

  • Reject the job
  • Publish a FAILED event
  • Do not retry
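A sketch of the retry decision this implies. The four exception classes are local stand-ins for those in mda_common.exceptions, and classify_failure is a hypothetical name; anything outside the non-retryable set is treated as retryable, subject to the retry limit.

```python
class MissingParamError(Exception):
    pass


class InvalidParamError(Exception):
    pass


class NotFoundError(Exception):
    pass


class ValidationError(Exception):
    pass


# Contract failures: retrying the same contract cannot succeed.
NON_RETRYABLE = (MissingParamError, InvalidParamError, NotFoundError, ValidationError)


def classify_failure(exc):
    """Return "fail" (reject, publish FAILED, no retry) or "retry"."""
    if isinstance(exc, NON_RETRYABLE):
        return "fail"
    return "retry"
```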

Output Contracts

Each service declares expected outputs.

Example:

output_params = [
    "detection_run_id",
]

process() Return Value

process() must return a dictionary.

Example:

return {
    "detection_run_id": run_id,
}

Output Validation

The framework verifies that the returned keys exactly match output_params (no missing and no extra keys):

set(returned_keys) == set(output_params)

Example:

output_params = [
    "detection_run_id",
]

Valid:

return {
    "detection_run_id": run_id,
}

Invalid:

return {
    "wrong_key": run_id,
}
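A minimal sketch of this check, assuming the framework rejects both non-dict return values and any key mismatch. validate_outputs and OutputContractError are hypothetical names; how mda-common reports the mismatch (and whether it is retried) is not specified here.

```python
class OutputContractError(Exception):
    """Hypothetical error raised when process() breaks its output contract."""


def validate_outputs(result, output_params):
    if not isinstance(result, dict):
        raise OutputContractError(
            f"process() must return a dict, got {type(result).__name__}"
        )
    returned, expected = set(result), set(output_params)
    if returned != expected:
        # Report both directions so the developer sees exactly what is wrong.
        missing = sorted(expected - returned)
        unexpected = sorted(returned - expected)
        raise OutputContractError(
            f"missing keys {missing}, unexpected keys {unexpected}"
        )
    return result
```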

Contract Update

After validation, the returned values are merged into the contract's params:

envelope["params"].update(
    contract_updates
)

The updated contract is forwarded to the next stage.
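A sketch of the merge-and-forward step, combining the exit_stage and retry_count rules from the Job Contract section. forward_job is a hypothetical name, and publish is a plain callable here for brevity (the queue.py helper is async); the sketch assumes the worker's output_queue is the next stage's input queue.

```python
import copy


def forward_job(envelope, updates, stage_name, output_queue, publish):
    """Merge validated outputs into the contract and hand it to the next stage.

    Returns the queue published to, or None when this stage is the exit stage.
    """
    updated = copy.deepcopy(envelope)  # leave the received message untouched
    updated["params"].update(updates)
    if stage_name == updated["exit_stage"]:
        return None  # last requested stage: stop, no forwarding
    updated["retry_count"] = 0  # progressing to a new stage resets the counter
    publish(output_queue, updated)
    return output_queue
```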


Retry Policy

Processing failures are retried.

Default:

max_retries = 2

Execution count:

Initial Attempt
Retry 1
Retry 2
Fail

Total attempts: 3
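The attempt arithmetic (1 initial attempt + max_retries retries) can be sketched as an in-process loop. run_with_retries is hypothetical; the framework may instead requeue the contract with an incremented retry_count, but the count of attempts is the same.

```python
def run_with_retries(attempt, max_retries=2, non_retryable=()):
    """Call attempt(retry_count) until it succeeds or the retries run out."""
    retry_count = 0
    while True:
        try:
            return attempt(retry_count)
        except non_retryable:
            raise  # contract failure: fail immediately, no retry
        except Exception:
            if retry_count >= max_retries:
                raise  # initial attempt + max_retries retries all failed
            retry_count += 1  # e.g. network outage, GPU OOM, database timeout
```

With the default max_retries = 2, a function that always fails is called with retry_count 0, 1 and 2, then the error propagates.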


Retryable Failures

Examples:

Network outage
GPU OOM
Temporary API failure
Database timeout

Non-Retryable Failures

Examples:

Missing image_id
Invalid task_id
Referenced object does not exist

These are contract failures.

Retrying will not help.


TaskTracker

TaskTracker reports job status from a service: durable lifecycle events through RabbitMQ and live progress through Redis.


RabbitMQ Events

Durable events:

started
completed
failed
skipped

Published to:

job_events

Consumed by:

admin-backend

Redis Events

Non-durable events:

progress

Used for:

Live progress bars
Streaming logs
Realtime UI updates

Example:

self.tracker.started()

self.tracker.progress(25)

self.tracker.progress(50)

self.tracker.completed(
    response={
        "detection_run_id": run_id,
    }
)
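The durable/non-durable split can be sketched as a tracker that routes lifecycle events to one sink and progress to another. In mda-common the sinks would be RabbitMQ (job_events) and Redis; here they are plain callables, and the class name and event field names are hypothetical.

```python
import time


class TaskTrackerSketch:
    """Hypothetical tracker: lifecycle events are durable, progress is fire-and-forget."""

    def __init__(self, job_id, stage_name, durable_sink, live_sink):
        self.job_id = job_id
        self.stage_name = stage_name
        self._durable = durable_sink  # e.g. publish to the job_events queue
        self._live = live_sink  # e.g. Redis pub/sub; losing one update is harmless

    def _event(self, kind, **extra):
        return {"job_id": self.job_id, "stage": self.stage_name,
                "event": kind, "ts": time.time(), **extra}

    def started(self):
        self._durable(self._event("started"))

    def progress(self, percent):
        self._live(self._event("progress", percent=percent))

    def completed(self, response=None):
        self._durable(self._event("completed", response=response or {}))

    def failed(self, error):
        self._durable(self._event("failed", error=str(error)))

    def skipped(self, reason=""):
        self._durable(self._event("skipped", reason=reason))
```

Keeping progress off the durable path means a burst of progress updates never piles up in the queue that admin-backend must consume reliably.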

Queue Helpers

queue.py provides common RabbitMQ helpers.

Example:

from mda_common.queue import consume, publish

Publish:

await publish(
    "detections",
    payload,
)

Consume:

await consume(
    "raw_images",
    callback,
)
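For unit tests that should run without a broker, a hypothetical in-memory stand-in with the same publish/consume call shape can help. It uses asyncio.Queue and a JSON round trip to mimic serialization; it is not part of mda-common and, unlike RabbitMQ, has no acknowledgements, durability, or redelivery.

```python
import asyncio
import json
from collections import defaultdict


class InMemoryBroker:
    """Hypothetical test double mirroring publish(queue, payload) / consume(queue, callback)."""

    def __init__(self):
        # Queues are created lazily, inside the running event loop.
        self._queues = defaultdict(asyncio.Queue)

    async def publish(self, queue_name, payload):
        # Serialize like a real broker would, so non-JSON payloads fail early.
        await self._queues[queue_name].put(json.dumps(payload))

    async def consume(self, queue_name, callback, max_messages=None):
        # Deliver messages to callback until max_messages are handled (None = forever).
        handled = 0
        while max_messages is None or handled < max_messages:
            body = await self._queues[queue_name].get()
            await callback(json.loads(body))
            handled += 1
```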

Storage Helpers

storage.py provides a common S3-compatible interface.

Works with:

  • MinIO (local development)
  • AWS S3 (production)

Example:

from mda_common.storage import upload
from mda_common.storage import download
from mda_common.storage import presigned_url

Upload:

upload(
    bucket,
    key,
    data,
)

Download:

data = download(
    bucket,
    key,
)

Generate URL:

url = presigned_url(
    bucket,
    key,
)
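A sketch of how one S3-compatible interface can cover both MinIO and AWS S3: only the endpoint changes. It uses boto3's S3 client methods put_object, get_object and generate_presigned_url; the S3_ENDPOINT_URL variable name, the client keyword argument, and the one-hour default expiry are assumptions, not necessarily what storage.py does.

```python
import os


def make_client():
    """Build a boto3 S3 client: set S3_ENDPOINT_URL for MinIO, leave it unset for AWS."""
    import boto3  # imported lazily so the helpers can be exercised with a fake client
    from botocore.config import Config

    endpoint = os.environ.get("S3_ENDPOINT_URL")  # hypothetical variable name
    kwargs = {}
    if endpoint:
        # MinIO deployments are commonly addressed path-style (host/bucket/key).
        kwargs["config"] = Config(s3={"addressing_style": "path"})
    return boto3.client("s3", endpoint_url=endpoint, **kwargs)


def upload(bucket, key, data, client=None):
    if client is None:
        client = make_client()
    client.put_object(Bucket=bucket, Key=key, Body=data)


def download(bucket, key, client=None):
    if client is None:
        client = make_client()
    response = client.get_object(Bucket=bucket, Key=key)
    return response["Body"].read()


def presigned_url(bucket, key, expires_in=3600, client=None):
    if client is None:
        client = make_client()
    return client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_in,
    )
```

Credentials are resolved by boto3's usual chain (environment variables, config files, instance roles), so the same code runs against a local MinIO container and production S3.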

Service Development Guidelines

A service should:

  • Declare its required inputs and their validators (required_params)
  • Execute business logic
  • Store large outputs externally
  • Return only workflow references

A service should not:

  • Forward jobs manually
  • Publish status events manually
  • Implement retry logic
  • Implement queue routing
  • Manage RabbitMQ connections
  • Manage Redis connections
  • Duplicate framework functionality

Versioning

Semantic versioning is used.

Example:

0.1.0
0.2.0
0.3.0

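Breaking changes to framework contracts require a version bump (a major bump under semantic versioning; while the project is still at 0.x, as in the examples above, a minor bump).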

Examples:

  • Job contract changes
  • BaseWorker API changes
  • TaskTracker API changes

Design Philosophy

The framework exists so that service authors can focus exclusively on business logic.

A well-written service should primarily define:

required_params
output_params
process()

Everything else should be handled by mda-common.

About

Common framework for services in MDA.
