diff --git a/conf/access.json b/conf/access.json index bb21ccc..e44a8f9 100644 --- a/conf/access.json +++ b/conf/access.json @@ -9,5 +9,6 @@ } }, "public.cps.za.dlchange": ["FooUser", "BarUser"], - "public.cps.za.test": ["TestUser"] + "public.cps.za.test": ["TestUser"], + "public.cps.za.status-change": ["TestUser"] } diff --git a/conf/topic_schemas/status_change.json b/conf/topic_schemas/status_change.json new file mode 100644 index 0000000..ee9fedc --- /dev/null +++ b/conf/topic_schemas/status_change.json @@ -0,0 +1,233 @@ +{ + "type": "object", + "properties": { + "event_type": { + "type": "string", + "enum": [ + "JobCreatedEvent", + "JobCreatedAndStartedEvent", + "JobStartedEvent", + "JobUpdatedEvent", + "JobFinishedEvent" + ], + "description": "Lifecycle event type for job status changes." + }, + "event_id": { + "type": "string", + "format": "uuid", + "description": "Unique identifier for the event (UUID)" + }, + "job_ref": { + "type": [ + "string", + "null" + ], + "description": "Identifier of the job in it's respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)." + }, + "tenant_id": { + "type": [ + "string", + "null" + ], + "description": "Application ID or ServiceNow identifier" + }, + "source_app": { + "type": [ + "string", + "null" + ], + "description": " Standardized source application name (aqueduct, unify, lum, etc)" + }, + "source_app_version": { + "type": [ + "string", + "null" + ], + "description": "Source application version (SemVer preferred)" + }, + "environment": { + "type": "string", + "description": "Environment (dev, uat, pre-prod, prod, test or others)" + }, + "timestamp_event": { + "type": "integer", + "minimum": 0, + "description": "Timestamp of the event in epoch milliseconds" + }, + "country": { + "type": [ + "string", + "null" + ], + "description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc." + }, + "job_id": { + "type": "string", + "format": "uuid", + "description": "Primary job identifier (UUID)." + }, + "parent_job_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Optional parent job identifier (UUID), to represent nested job hierarchies." + }, + "initial_job_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Optional initial job identifier (UUID), to represent retried or replayed jobs." + }, + "job_group_id": { + "type": [ + "string", + "null" + ], + "format": "uuid", + "description": "Job group identifier (UUID), may or may not reference a job id." + }, + "job_name": { + "type": [ + "string", + "null" + ], + "description": "Human-readable job name." + }, + "attempt_number": { + "type": [ + "integer", + "null" + ], + "minimum": 1, + "description": "Attempt number for this job." + }, + "platform": { + "type": [ + "string", + "null" + ], + "description": "Platform, e.g. aws.emr, aws.glue, aws.lambda." + }, + "platform_metadata": { + "type": [ + "object", + "null" + ], + "description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})." + }, + "input_arguments": { + "type": [ + "object", + "null" + ], + "description": "Arguments passed to the job." + }, + "definition_id": { + "type": [ + "string", + "null" + ], + "description": "Definition (Pipeline, Domain, Process) identifier." + }, + "definition_version": { + "type": [ + "string", + "null" + ], + "description": "Optional definition version." + }, + "status_type": { + "type": [ + "string", + "null" + ], + "enum": [ + "WAITING", + "RUNNING", + "SUCCEEDED", + "FAILED", + "KILLED", + null + ], + "description": "High-level status type for the current lifecycle event." + }, + "status_subtype": { + "type": [ + "string", + "null" + ], + "description": "Optional status subtype, e.g. NO_DATA or error code." + }, + "status_detail": { + "type": [ + "string", + "null" + ], + "description": "Optional human-readable status detail, e.g. short error message." + }, + "additional_context": { + "type": [ + "object", + "null" + ], + "description": "Additional context payload." + } + }, + "required": [ + "event_type", + "event_id", + "job_id", + "timestamp_event", + "environment" + ], + "allOf": [ + { + "if": { + "properties": { + "event_type": { + "enum": [ + "JobCreatedEvent", + "JobCreatedAndStartedEvent" + ] + } + } + }, + "then": { + "required": [ + "source_app", + "source_app_version", + "environment", + "platform", + "input_arguments" + ] + } + }, + { + "if": { + "properties": { + "event_type": { + "const": "JobFinishedEvent" + } + } + }, + "then": { + "required": [ + "status_type" + ], + "properties": { + "status_type": { + "enum": [ + "SUCCEEDED", + "FAILED", + "KILLED" + ] + } + } + } + } + ] +} diff --git a/src/handlers/handler_topic.py b/src/handlers/handler_topic.py index 5426805..6b8cd5d 100644 --- a/src/handlers/handler_topic.py +++ b/src/handlers/handler_topic.py @@ -29,7 +29,7 @@ from src.handlers.handler_token import HandlerToken from src.utils.conf_path import CONF_DIR from src.utils.config_loader import TopicAccessMap, load_access_config -from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST +from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST from src.utils.utils import build_error_response from src.writers.writer import WriteError, Writer @@ -75,6 +75,8 @@ def with_load_topic_schemas(self) -> "HandlerTopic": self.topics[TOPIC_DLCHANGE] = json.load(file) with open(os.path.join(topic_schemas_dir, "test.json"), "r", encoding="utf-8") as file: self.topics[TOPIC_TEST] = json.load(file) + with open(os.path.join(topic_schemas_dir, "status_change.json"), "r", encoding="utf-8") as file: + self.topics[TOPIC_STATUS_CHANGE] = json.load(file) logger.debug("Loaded topic schemas successfully.") return self diff --git a/src/utils/config_loader.py b/src/utils/config_loader.py index 673bcbf..f7c01b0 100644 --- a/src/utils/config_loader.py +++ b/src/utils/config_loader.py @@ -24,7 +24,7 @@ from boto3.resources.base import ServiceResource -from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST +from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST logger = logging.getLogger(__name__) @@ -150,6 +150,7 @@ def load_topic_names(conf_dir: str) -> list[str]: "runs.json": TOPIC_RUNS, "dlchange.json": TOPIC_DLCHANGE, "test.json": TOPIC_TEST, + "status_change.json": TOPIC_STATUS_CHANGE, } schemas_dir = os.path.join(conf_dir, "topic_schemas") topics: list[str] = [] diff --git a/src/utils/constants.py b/src/utils/constants.py index 425b376..0110dc8 100644 --- a/src/utils/constants.py +++ b/src/utils/constants.py @@ -37,6 +37,7 @@ TOPIC_RUNS = "public.cps.za.runs" TOPIC_DLCHANGE = "public.cps.za.dlchange" TOPIC_TEST = "public.cps.za.test" +TOPIC_STATUS_CHANGE = "public.cps.za.status-change" SUPPORTED_WRITE_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS, TOPIC_DLCHANGE, TOPIC_TEST}) SUPPORTED_STATS_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS}) diff --git a/tests/unit/handlers/test_handler_topic.py b/tests/unit/handlers/test_handler_topic.py index fc4a5b7..8fa292c 100644 --- a/tests/unit/handlers/test_handler_topic.py +++ b/tests/unit/handlers/test_handler_topic.py @@ -87,6 +87,7 @@ def test_load_topic_schemas_success(): "runs.json": {"type": "object", "properties": {"run_id": {"type": "string"}}}, "dlchange.json": {"type": "object", "properties": {"change_id": {"type": "string"}}}, "test.json": {"type": "object", "properties": {"event_id": {"type": "string"}}}, + "status_change.json": {"type": "object", "properties": {"execution_id": {"type": "string"}}}, } def mock_open_side_effect(file_path, *_args, **_kwargs): @@ -99,10 +100,11 @@ def mock_open_side_effect(file_path, *_args, **_kwargs): result = handler.with_load_topic_schemas() assert result is handler - assert 3 == len(handler.topics) + assert 4 == len(handler.topics) assert "public.cps.za.runs" in handler.topics assert "public.cps.za.dlchange" in handler.topics assert "public.cps.za.test" in handler.topics + assert "public.cps.za.status-change" in handler.topics ## get_topics_list() @@ -112,6 +114,7 @@ def test_get_topics(event_gate_module, make_event): assert 200 == resp["statusCode"] body = json.loads(resp["body"]) assert "public.cps.za.test" in body + assert "public.cps.za.status-change" in body ## get_topic_schema() diff --git a/tests/unit/utils/test_config_loader.py b/tests/unit/utils/test_config_loader.py index 82301f7..3bc2979 100644 --- a/tests/unit/utils/test_config_loader.py +++ b/tests/unit/utils/test_config_loader.py @@ -46,6 +46,7 @@ def conf_dir(tmp_path: Path) -> str: (schemas_dir / "runs.json").write_text('{"type": "object"}', encoding="utf-8") (schemas_dir / "dlchange.json").write_text('{"type": "object"}', encoding="utf-8") (schemas_dir / "test.json").write_text('{"type": "object"}', encoding="utf-8") + (schemas_dir / "status_change.json").write_text('{"type": "object"}', encoding="utf-8") return str(tmp_path) @@ -101,13 +102,14 @@ class TestLoadTopicNames: """Tests for load_topic_names().""" def test_discovers_all_topics(self, conf_dir: str) -> None: - """Test that all three topics are discovered from schema files.""" + """Test that all configured topics are discovered from schema files.""" result = load_topic_names(conf_dir) - assert 3 == len(result) + assert 4 == len(result) assert "public.cps.za.runs" in result assert "public.cps.za.dlchange" in result assert "public.cps.za.test" in result + assert "public.cps.za.status-change" in result def test_missing_schema_file_excluded(self, conf_dir: str) -> None: """Test that a missing schema file excludes that topic.""" @@ -115,7 +117,7 @@ def test_missing_schema_file_excluded(self, conf_dir: str) -> None: result = load_topic_names(conf_dir) - assert 2 == len(result) + assert 3 == len(result) assert "public.cps.za.test" not in result def test_empty_schemas_dir(self, tmp_path: Path) -> None: