Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion conf/access.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
}
},
"public.cps.za.dlchange": ["FooUser", "BarUser"],
"public.cps.za.test": ["TestUser"]
"public.cps.za.test": ["TestUser"],
"public.cps.za.status-change": ["TestUser"]
}
233 changes: 233 additions & 0 deletions conf/topic_schemas/status_change.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
{
"type": "object",
"properties": {
"event_type": {
"type": "string",
"enum": [
"JobCreatedEvent",
"JobCreatedAndStartedEvent",
"JobStartedEvent",
"JobUpdatedEvent",
"JobFinishedEvent"
],
"description": "Lifecycle event type for job status changes."
},
"event_id": {
"type": "string",
"format": "uuid",
"description": "Unique identifier for the event (UUID)"
},
"job_ref": {
"type": [
"string",
"null"
],
"description": "Identifier of the job in its respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)."
},
"tenant_id": {
"type": [
"string",
"null"
],
"description": "Application ID or ServiceNow identifier"
},
Comment on lines +20 to +33
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Schema contract does not enforce the required status-transition payload.

The objective says status_change must include an application identifier plus previous and new state. In this schema, tenant_id is optional, and there are no explicit previous_state / new_state fields, so producers can publish events that don’t carry a real transition.

Also applies to: 142-156, 179-185

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@conf/topic_schemas/status_change.json` around lines 20 - 33, The schema
currently treats tenant_id as optional and lacks explicit
previous_state/new_state fields; update status_change.json to make the
application identifier required (mark "tenant_id" as required in the root
"required" array or otherwise add a required "application_id" equivalent), add
"previous_state" and "new_state" properties (type "string", non-nullable) to the
schema, and include those two fields in the schema's "required" array so every
status_change event must carry tenant_id plus previous_state and new_state;
apply the same changes to the other schema blocks noted (lines referenced in the
review).

"source_app": {
"type": [
"string",
"null"
],
"description": "Standardized source application name (aqueduct, unify, lum, etc)"
},
"source_app_version": {
"type": [
"string",
"null"
],
"description": "Source application version (SemVer preferred)"
},
"environment": {
"type": "string",
"description": "Environment (dev, uat, pre-prod, prod, test or others)"
},
"timestamp_event": {
"type": "integer",
"minimum": 0,
"description": "Timestamp of the event in epoch milliseconds"
},
Comment thread
kevinwallimann marked this conversation as resolved.
"country": {
"type": [
"string",
"null"
],
"description": "The country the data is related to, e.g. za, ke, on-mu, nbc-tz, etc."
},
"job_id": {
"type": "string",
"format": "uuid",
"description": "Primary job identifier (UUID)."
},
"parent_job_id": {
"type": [
"string",
"null"
],
"format": "uuid",
"description": "Optional parent job identifier (UUID), to represent nested job hierarchies."
},
"initial_job_id": {
"type": [
"string",
"null"
],
"format": "uuid",
"description": "Optional initial job identifier (UUID), to represent retried or replayed jobs."
},
"job_group_id": {
"type": [
"string",
"null"
],
"format": "uuid",
"description": "Job group identifier (UUID), may or may not reference a job id."
},
"job_name": {
"type": [
"string",
"null"
],
"description": "Human-readable job name."
},
"attempt_number": {
"type": [
"integer",
"null"
],
"minimum": 1,
"description": "Attempt number for this job."
},
"platform": {
"type": [
"string",
"null"
],
"description": "Platform, e.g. aws.emr, aws.glue, aws.lambda."
},
"platform_metadata": {
"type": [
"object",
"null"
],
"description": "Platform-specific metadata (e.g. {\"cluster_id\": \"j-...\"})."
},
"input_arguments": {
"type": [
"object",
"null"
],
"description": "Arguments passed to the job."
},
"definition_id": {
"type": [
"string",
"null"
],
"description": "Definition (Pipeline, Domain, Process) identifier."
},
"definition_version": {
"type": [
"string",
"null"
],
"description": "Optional definition version."
},
"status_type": {
"type": [
"string",
"null"
],
"enum": [
"WAITING",
"RUNNING",
"SUCCEEDED",
"FAILED",
"KILLED",
null
],
"description": "High-level status type for the current lifecycle event."
},
"status_subtype": {
"type": [
"string",
"null"
],
"description": "Optional status subtype, e.g. NO_DATA or error code."
},
"status_detail": {
"type": [
"string",
"null"
],
"description": "Optional human-readable status detail, e.g. short error message."
},
"additional_context": {
"type": [
"object",
"null"
],
"description": "Additional context payload."
}
},
"required": [
"event_type",
"event_id",
"job_id",
"timestamp_event",
"environment"
],
"allOf": [
{
"if": {
"properties": {
"event_type": {
"enum": [
"JobCreatedEvent",
"JobCreatedAndStartedEvent"
]
}
}
},
"then": {
"required": [
"source_app",
"source_app_version",
"environment",
"platform",
"input_arguments"
]
}
},
{
"if": {
"properties": {
"event_type": {
"const": "JobFinishedEvent"
}
}
},
"then": {
"required": [
"status_type"
],
"properties": {
"status_type": {
"enum": [
"SUCCEEDED",
"FAILED",
"KILLED"
]
}
}
}
}
]
}
4 changes: 3 additions & 1 deletion src/handlers/handler_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from src.handlers.handler_token import HandlerToken
from src.utils.conf_path import CONF_DIR
from src.utils.config_loader import TopicAccessMap, load_access_config
from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST
from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST
from src.utils.utils import build_error_response
from src.writers.writer import WriteError, Writer

Expand Down Expand Up @@ -75,6 +75,8 @@ def with_load_topic_schemas(self) -> "HandlerTopic":
self.topics[TOPIC_DLCHANGE] = json.load(file)
with open(os.path.join(topic_schemas_dir, "test.json"), "r", encoding="utf-8") as file:
self.topics[TOPIC_TEST] = json.load(file)
with open(os.path.join(topic_schemas_dir, "status_change.json"), "r", encoding="utf-8") as file:
self.topics[TOPIC_STATUS_CHANGE] = json.load(file)

logger.debug("Loaded topic schemas successfully.")
return self
Expand Down
3 changes: 2 additions & 1 deletion src/utils/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from boto3.resources.base import ServiceResource

from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST
from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_STATUS_CHANGE, TOPIC_TEST

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -150,6 +150,7 @@ def load_topic_names(conf_dir: str) -> list[str]:
"runs.json": TOPIC_RUNS,
"dlchange.json": TOPIC_DLCHANGE,
"test.json": TOPIC_TEST,
"status_change.json": TOPIC_STATUS_CHANGE,
}
schemas_dir = os.path.join(conf_dir, "topic_schemas")
topics: list[str] = []
Expand Down
1 change: 1 addition & 0 deletions src/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
TOPIC_RUNS = "public.cps.za.runs"
TOPIC_DLCHANGE = "public.cps.za.dlchange"
TOPIC_TEST = "public.cps.za.test"
TOPIC_STATUS_CHANGE = "public.cps.za.status-change"

SUPPORTED_WRITE_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS, TOPIC_DLCHANGE, TOPIC_TEST})
SUPPORTED_STATS_TOPICS: frozenset[str] = frozenset({TOPIC_RUNS})
5 changes: 4 additions & 1 deletion tests/unit/handlers/test_handler_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def test_load_topic_schemas_success():
"runs.json": {"type": "object", "properties": {"run_id": {"type": "string"}}},
"dlchange.json": {"type": "object", "properties": {"change_id": {"type": "string"}}},
"test.json": {"type": "object", "properties": {"event_id": {"type": "string"}}},
"status_change.json": {"type": "object", "properties": {"execution_id": {"type": "string"}}},
}

def mock_open_side_effect(file_path, *_args, **_kwargs):
Expand All @@ -99,10 +100,11 @@ def mock_open_side_effect(file_path, *_args, **_kwargs):
result = handler.with_load_topic_schemas()

assert result is handler
assert 3 == len(handler.topics)
assert 4 == len(handler.topics)
assert "public.cps.za.runs" in handler.topics
assert "public.cps.za.dlchange" in handler.topics
assert "public.cps.za.test" in handler.topics
assert "public.cps.za.status-change" in handler.topics


## get_topics_list()
Expand All @@ -112,6 +114,7 @@ def test_get_topics(event_gate_module, make_event):
assert 200 == resp["statusCode"]
body = json.loads(resp["body"])
assert "public.cps.za.test" in body
assert "public.cps.za.status-change" in body


## get_topic_schema()
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/utils/test_config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def conf_dir(tmp_path: Path) -> str:
(schemas_dir / "runs.json").write_text('{"type": "object"}', encoding="utf-8")
(schemas_dir / "dlchange.json").write_text('{"type": "object"}', encoding="utf-8")
(schemas_dir / "test.json").write_text('{"type": "object"}', encoding="utf-8")
(schemas_dir / "status_change.json").write_text('{"type": "object"}', encoding="utf-8")

return str(tmp_path)

Expand Down Expand Up @@ -101,21 +102,22 @@ class TestLoadTopicNames:
"""Tests for load_topic_names()."""

def test_discovers_all_topics(self, conf_dir: str) -> None:
"""Test that all three topics are discovered from schema files."""
"""Test that all configured topics are discovered from schema files."""
result = load_topic_names(conf_dir)

assert 3 == len(result)
assert 4 == len(result)
assert "public.cps.za.runs" in result
assert "public.cps.za.dlchange" in result
assert "public.cps.za.test" in result
assert "public.cps.za.status-change" in result

def test_missing_schema_file_excluded(self, conf_dir: str) -> None:
"""Test that a missing schema file excludes that topic."""
os.remove(os.path.join(conf_dir, "topic_schemas", "test.json"))

result = load_topic_names(conf_dir)

assert 2 == len(result)
assert 3 == len(result)
assert "public.cps.za.test" not in result

def test_empty_schemas_dir(self, tmp_path: Path) -> None:
Expand Down