From 717fddb7d5dc909860dcbbdf48adc7b60f8f01e8 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 13 May 2026 14:57:58 +0200 Subject: [PATCH 1/5] First version --- conf/topic_schemas/status_change.json | 141 ++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 conf/topic_schemas/status_change.json diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json new file mode 100644 index 0000000..cfbec06 --- /dev/null +++ b/conf/topic_schemas/status_change.json @@ -0,0 +1,141 @@ +{ + "type": "object", + "properties": { + "event_id": { + "type": "string", + "description": "Unique identifier for the event (GUID)" + }, + "tenant_id": { + "type": "string", + "description": "Application ID or ServiceNow identifier" + }, + "source_app": { + "type": "string", + "description": " Standardized source application name (aqueduct, unify, lum, etc)" + }, + "source_app_version": { + "type": "string", + "description": "Source application version (SemVer preferred)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp_event": { + "type": "number", + "description": "Timestamp of the event in epoch milliseconds" + }, + "country": { + "type": "string", + "description": "The country the data is related to." + }, + "execution_id": { + "type": "string", + "format": "uuid", + "description": "Primary execution identifier (UUID)." + }, + "execution_parent_id": { + "type": [ + "string", + "null" + ], + "description": "Optional parent execution identifier (UUID)." + }, + "execution_group_id": { + "type": "string", + "format": "uuid", + "description": "Execution group identifier (UUID), may reference an execution id." + }, + "attempt_number": { + "type": "integer", + "minimum": 1, + "description": "Attempt number for this execution." + }, + "platform": { + "type": "string", + "description": "Platform, e.g. aws.emr, aws.glue, aws.lambda." 
+ }, + "platform_metadata": { + "type": "object", + "description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})." + }, + "input_arguments": { + "type": "object", + "description": "Arguments passed to the job." + }, + "definition_id": { + "type": "string", + "description": "Definition (Pipeline, Domain, Process) identifier." + }, + "definition_version": { + "type": [ + "string", + "null" + ], + "description": "Optional definition version." + }, + "definition_name": { + "type": "string", + "description": "Human-readable definition name." + }, + "status_type": { + "type": "string", + "enum": [ + "WAITING", + "RUNNING", + "SUCCEEDED", + "FAILED" + ], + "description": "High-level status type." + }, + "status_subtype": { + "type": [ + "string", + "null" + ], + "description": "Optional status subtype, e.g. NO_DATA or error code." + }, + "status_detail": { + "type": [ + "string", + "null" + ], + "description": "Optional human-readable status detail." + }, + "timestamp_created_at": { + "type": "number", + "description": "Timestamp when an execution was created in epoch milliseconds" + }, + "timestamp_started_at": { + "type": "number", + "description": "Timestamp when an execution was started in epoch milliseconds" + }, + "timestamp_finished_at": { + "type": "number", + "description": "Timestamp when an execution was finished in epoch milliseconds" + }, + "timestamp_last_updated_at": { + "type": "number", + "description": "Timestamp when an execution status was last updated in epoch milliseconds." + }, + "additional_context": { + "type": "object", + "description": "Additional context payload." 
+ } + }, + "required": [ + "execution_id", + "execution_group_id", + "attempt_number", + "source_app", + "source_app_version", + "environment", + "platform", + "definition_id", + "definition_name", + "status_type", + "created_at", + "started_at", + "last_updated_at" + ] +} \ No newline at end of file From f607b5882d87c5a0c3a701e32c8b8d19187311c8 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 13 May 2026 16:05:39 +0200 Subject: [PATCH 2/5] Add status_change schema --- conf/access.json | 3 +- conf/topic_schemas/status_change.json | 112 +++++++++++++--------- src/handlers/handler_topic.py | 4 +- src/utils/config_loader.py | 3 +- src/utils/constants.py | 1 + tests/unit/handlers/test_handler_topic.py | 5 +- tests/unit/utils/test_config_loader.py | 8 +- 7 files changed, 84 insertions(+), 52 deletions(-) diff --git a/conf/access.json b/conf/access.json index bb21ccc..e44a8f9 100644 --- a/conf/access.json +++ b/conf/access.json @@ -9,5 +9,6 @@ } }, "public.cps.za.dlchange": ["FooUser", "BarUser"], - "public.cps.za.test": ["TestUser"] + "public.cps.za.test": ["TestUser"], + "public.cps.za.status-change": ["TestUser"] } diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index cfbec06..8b5150f 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -1,9 +1,20 @@ { "type": "object", "properties": { + "event_type": { + "type": "string", + "enum": [ + "ExecutionCreatedEvent", + "ExecutionStartedEvent", + "ExecutionUpdatedEvent", + "ExecutionFinishedEvent" + ], + "description": "Lifecycle event type for execution status changes." + }, "event_id": { "type": "string", - "description": "Unique identifier for the event (GUID)" + "format": "uuid", + "description": "Unique identifier for the event (UUID)" }, "tenant_id": { "type": "string", @@ -27,7 +38,7 @@ }, "country": { "type": "string", - "description": "The country the data is related to." 
+ "description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc." }, "execution_id": { "type": "string", @@ -39,6 +50,7 @@ "string", "null" ], + "format": "uuid", "description": "Optional parent execution identifier (UUID)." }, "execution_group_id": { @@ -68,10 +80,7 @@ "description": "Definition (Pipeline, Domain, Process) identifier." }, "definition_version": { - "type": [ - "string", - "null" - ], + "type": "string", "description": "Optional definition version." }, "definition_name": { @@ -84,39 +93,18 @@ "WAITING", "RUNNING", "SUCCEEDED", - "FAILED" + "FAILED", + "KILLED" ], - "description": "High-level status type." + "description": "High-level status type for the current lifecycle event." }, "status_subtype": { - "type": [ - "string", - "null" - ], + "type": "string", "description": "Optional status subtype, e.g. NO_DATA or error code." }, "status_detail": { - "type": [ - "string", - "null" - ], - "description": "Optional human-readable status detail." - }, - "timestamp_created_at": { - "type": "number", - "description": "Timestamp when an execution was created in epoch milliseconds" - }, - "timestamp_started_at": { - "type": "number", - "description": "Timestamp when an execution was started in epoch milliseconds" - }, - "timestamp_finished_at": { - "type": "number", - "description": "Timestamp when an execution was finished in epoch milliseconds" - }, - "timestamp_last_updated_at": { - "type": "number", - "description": "Timestamp when an execution status was last updated in epoch milliseconds." + "type": "string", + "description": "Optional human-readable status detail, e.g. short error message." 
}, "additional_context": { "type": "object", @@ -124,18 +112,52 @@ } }, "required": [ + "event_type", + "event_id", "execution_id", - "execution_group_id", - "attempt_number", - "source_app", - "source_app_version", - "environment", - "platform", - "definition_id", - "definition_name", - "status_type", - "created_at", - "started_at", - "last_updated_at" + "timestamp_event" + ], + "allOf": [ + { + "if": { + "properties": { + "event_type": { + "const": "ExecutionCreatedEvent" + } + } + }, + "then": { + "required": [ + "source_app", + "source_app_version", + "environment", + "platform", + "input_arguments" + ] + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "ExecutionFinishedEvent" + } + } + }, + "then": { + "required": [ + "status_type" + ], + "properties": { + "status_type": { + "enum": [ + "SUCCEEDED", + "FAILED", + "KILLED" + ] + } + } + } + } ] } \ No newline at end of file diff --git a/src/handlers/handler_topic.py b/src/handlers/handler_topic.py index 5426805..6b8cd5d 100644 --- a/src/handlers/handler_topic.py +++ b/src/handlers/handler_topic.py @@ -29,7 +29,7 @@ from src.handlers.handler_token import HandlerToken from src.utils.conf_path import CONF_DIR from src.utils.config_loader import TopicAccessMap, load_access_config -from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST +from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST from src.utils.utils import build_error_response from src.writers.writer import WriteError, Writer @@ -75,6 +75,8 @@ def with_load_topic_schemas(self) -> "HandlerTopic": self.topics[TOPIC_DLCHANGE] = json.load(file) with open(os.path.join(topic_schemas_dir, "test.json"), "r", encoding="utf-8") as file: self.topics[TOPIC_TEST] = json.load(file) + with open(os.path.join(topic_schemas_dir, "status_change.json"), "r", encoding="utf-8") as file: + self.topics[TOPIC_STATUS_CHANGE] = json.load(file) logger.debug("Loaded topic schemas successfully.") return self 
diff --git a/src/utils/config_loader.py b/src/utils/config_loader.py index 673bcbf..f7c01b0 100644 --- a/src/utils/config_loader.py +++ b/src/utils/config_loader.py @@ -24,7 +24,7 @@ from boto3.resources.base import ServiceResource -from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST +from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST logger = logging.getLogger(__name__) @@ -150,6 +150,7 @@ def load_topic_names(conf_dir: str) -> list[str]: "runs.json": TOPIC_RUNS, "dlchange.json": TOPIC_DLCHANGE, "test.json": TOPIC_TEST, + "status_change.json": TOPIC_STATUS_CHANGE, } schemas_dir = os.path.join(conf_dir, "topic_schemas") topics: list[str] = [] diff --git a/src/utils/constants.py b/src/utils/constants.py index 425b376..0110dc8 100644 --- a/src/utils/constants.py +++ b/src/utils/constants.py @@ -37,6 +37,7 @@ TOPIC_RUNS = "public.cps.za.runs" TOPIC_DLCHANGE = "public.cps.za.dlchange" TOPIC_TEST = "public.cps.za.test" +TOPIC_STATUS_CHANGE = "public.cps.za.status-change" SUPPORTED_WRITE_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS, TOPIC_DLCHANGE, TOPIC_TEST}) SUPPORTED_STATS_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS}) diff --git a/tests/unit/handlers/test_handler_topic.py b/tests/unit/handlers/test_handler_topic.py index fc4a5b7..8fa292c 100644 --- a/tests/unit/handlers/test_handler_topic.py +++ b/tests/unit/handlers/test_handler_topic.py @@ -87,6 +87,7 @@ def test_load_topic_schemas_success(): "runs.json": {"type": "object", "properties": {"run_id": {"type": "string"}}}, "dlchange.json": {"type": "object", "properties": {"change_id": {"type": "string"}}}, "test.json": {"type": "object", "properties": {"event_id": {"type": "string"}}}, + "status_change.json": {"type": "object", "properties": {"execution_id": {"type": "string"}}}, } def mock_open_side_effect(file_path, *_args, **_kwargs): @@ -99,10 +100,11 @@ def mock_open_side_effect(file_path, *_args, **_kwargs): result = 
handler.with_load_topic_schemas() assert result is handler - assert 3 == len(handler.topics) + assert 4 == len(handler.topics) assert "public.cps.za.runs" in handler.topics assert "public.cps.za.dlchange" in handler.topics assert "public.cps.za.test" in handler.topics + assert "public.cps.za.status-change" in handler.topics ## get_topics_list() @@ -112,6 +114,7 @@ def test_get_topics(event_gate_module, make_event): assert 200 == resp["statusCode"] body = json.loads(resp["body"]) assert "public.cps.za.test" in body + assert "public.cps.za.status-change" in body ## get_topic_schema() diff --git a/tests/unit/utils/test_config_loader.py b/tests/unit/utils/test_config_loader.py index 82301f7..3bc2979 100644 --- a/tests/unit/utils/test_config_loader.py +++ b/tests/unit/utils/test_config_loader.py @@ -46,6 +46,7 @@ def conf_dir(tmp_path: Path) -> str: (schemas_dir / "runs.json").write_text('{"type": "object"}', encoding="utf-8") (schemas_dir / "dlchange.json").write_text('{"type": "object"}', encoding="utf-8") (schemas_dir / "test.json").write_text('{"type": "object"}', encoding="utf-8") + (schemas_dir / "status_change.json").write_text('{"type": "object"}', encoding="utf-8") return str(tmp_path) @@ -101,13 +102,14 @@ class TestLoadTopicNames: """Tests for load_topic_names().""" def test_discovers_all_topics(self, conf_dir: str) -> None: - """Test that all three topics are discovered from schema files.""" + """Test that all configured topics are discovered from schema files.""" result = load_topic_names(conf_dir) - assert 3 == len(result) + assert 4 == len(result) assert "public.cps.za.runs" in result assert "public.cps.za.dlchange" in result assert "public.cps.za.test" in result + assert "public.cps.za.status-change" in result def test_missing_schema_file_excluded(self, conf_dir: str) -> None: """Test that a missing schema file excludes that topic.""" @@ -115,7 +117,7 @@ def test_missing_schema_file_excluded(self, conf_dir: str) -> None: result = 
load_topic_names(conf_dir) - assert 2 == len(result) + assert 3 == len(result) assert "public.cps.za.test" not in result def test_empty_schemas_dir(self, tmp_path: Path) -> None: From 2093de907213d6688196d0a62b83592359e14268 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 20 May 2026 15:29:35 +0200 Subject: [PATCH 3/5] wip --- conf/topic_schemas/status_change.json | 141 +++++++++++++++++++------- 1 file changed, 105 insertions(+), 36 deletions(-) diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index 8b5150f..ef4f8d3 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -4,28 +4,45 @@ "event_type": { "type": "string", "enum": [ - "ExecutionCreatedEvent", - "ExecutionStartedEvent", - "ExecutionUpdatedEvent", - "ExecutionFinishedEvent" + "JobCreatedEvent", + "JobCreatedAndStartedEvent", + "JobStartedEvent", + "JobUpdatedEvent", + "JobFinishedEvent" ], - "description": "Lifecycle event type for execution status changes." + "description": "Lifecycle event type for job status changes." }, "event_id": { "type": "string", "format": "uuid", "description": "Unique identifier for the event (UUID)" }, + "job_ref": { + "type": [ + "string", + "null" + ], + "description": "Identifier of the job in its respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." 
+ }, "tenant_id": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Application ID or ServiceNow identifier" }, "source_app": { - "type": "string", + "type": [ + "string", + "null" + ], "description": " Standardized source application name (aqueduct, unify, lum, etc)" }, "source_app_version": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Source application version (SemVer preferred)" }, "environment": { @@ -37,92 +54,144 @@ "description": "Timestamp of the event in epoch milliseconds" }, "country": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc." }, - "execution_id": { + "job_id": { "type": "string", "format": "uuid", - "description": "Primary execution identifier (UUID)." + "description": "Primary job identifier (UUID)." }, - "execution_parent_id": { + "parent_job_id": { "type": [ "string", "null" ], "format": "uuid", - "description": "Optional parent execution identifier (UUID)." + "description": "Optional parent job identifier (UUID), to represent nested job hierarchies." }, - "execution_group_id": { - "type": "string", + "initial_job_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Optional initial job identifier (UUID), to represent retried or replayed jobs." + }, + "job_group_id": { + "type": [ + "string", + "null" + ], "format": "uuid", - "description": "Execution group identifier (UUID), may reference an execution id." + "description": "Job group identifier (UUID), may or may not reference a job id." + }, + "job_name": { + "type": [ + "string", + "null" + ], + "description": "Human-readable job name." }, "attempt_number": { - "type": "integer", + "type": [ + "integer", + "null" + ], "minimum": 1, - "description": "Attempt number for this execution." + "description": "Attempt number for this job." 
}, "platform": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Platform, e.g. aws.emr, aws.glue, aws.lambda." }, "platform_metadata": { - "type": "object", + "type": [ + "object", + "null" + ], "description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})." }, "input_arguments": { - "type": "object", + "type": [ + "object", + "null" + ], "description": "Arguments passed to the job." }, "definition_id": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Definition (Pipeline, Domain, Process) identifier." }, "definition_version": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Optional definition version." }, - "definition_name": { - "type": "string", - "description": "Human-readable definition name." - }, "status_type": { - "type": "string", + "type": [ + "string", + "null" + ], "enum": [ "WAITING", "RUNNING", "SUCCEEDED", "FAILED", - "KILLED" + "KILLED", + null ], "description": "High-level status type for the current lifecycle event." }, "status_subtype": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Optional status subtype, e.g. NO_DATA or error code." }, "status_detail": { - "type": "string", + "type": [ + "string", + "null" + ], "description": "Optional human-readable status detail, e.g. short error message." }, "additional_context": { - "type": "object", + "type": [ + "object", + "null" + ], "description": "Additional context payload." 
} }, "required": [ "event_type", "event_id", - "execution_id", - "timestamp_event" + "job_id", + "timestamp_event", + "environment" ], "allOf": [ { "if": { "properties": { "event_type": { - "const": "ExecutionCreatedEvent" + "enum": [ + "JobCreatedEvent", + "JobCreatedAndStartedEvent" + ] } } }, @@ -140,7 +209,7 @@ "if": { "properties": { "event_type": { - "const": "ExecutionFinishedEvent" + "const": "JobFinishedEvent" } } }, From a6dc8b4e4b397f98be999aecf97083faca4e3178 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 20 May 2026 15:40:25 +0200 Subject: [PATCH 4/5] newline --- conf/topic_schemas/status_change.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index ef4f8d3..3f9b283 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -229,4 +229,4 @@ } } ] -} \ No newline at end of file +} From 0c8deb23ba046ce665342c7ab422a4b5beab4a8f Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 20 May 2026 16:49:55 +0200 Subject: [PATCH 5/5] Update conf/topic_schemas/status_change.json Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- conf/topic_schemas/status_change.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json index 3f9b283..ee9fedc 100644 --- a/conf/topic_schemas/status_change.json +++ b/conf/topic_schemas/status_change.json @@ -50,7 +50,8 @@ "description": "Environment (dev, uat, pre-prod, prod, test or others)" }, "timestamp_event": { - "type": "number", + "type": "integer", + "minimum": 0, "description": "Timestamp of the event in epoch milliseconds" }, "country": {