From cc938e895b816e77df59138211e88f5d2daa29bf Mon Sep 17 00:00:00 2001 From: Jan Inge Bergseth <31886431+BergsethCognite@users.noreply.github.com> Date: Fri, 29 May 2026 08:52:54 +0200 Subject: [PATCH 1/4] removed unused local variable --- .../fn_dm_context_files_LOC_SOURCE_annotation/pipeline.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/pipeline.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/pipeline.py index 371ece66..f7dd48ba 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/pipeline.py +++ b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/pipeline.py @@ -430,10 +430,8 @@ def get_new_files( file_cursor = None num_retry += 1 if num_retry > 3: - retry = False raise Exception(msg) from e else: - retry = False raise Exception(msg) from e @@ -536,7 +534,6 @@ def run_diagram_detect( except Exception as e: num_retry += 1 if num_retry > 3: - retry = False if len(file_ids) > 1: raise Exception(f"Batch diagram detect batch failed on {len(file_ids)} files - rerunning with one file at a time") else: @@ -601,7 +598,6 @@ def push_result_to_annotations( except Exception as e: num_retry += 1 if num_retry > 3: - retry = False msg = f"Annotations add/update of: {len(edge_applies)} failed, error: {e.message}" logger.error(msg) raise Exception(msg) from e From ce333e5cfe5615372fb094879b6f5355d8351223 Mon Sep 17 00:00:00 2001 From: Jan Inge Bergseth <31886431+BergsethCognite@users.noreply.github.com> Date: Fri, 29 May 2026 12:27:21 +0200 Subject: [PATCH 2/4] cleaning up code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug fixes Outer except no longer raises a literal string "msg"; preserves message and chains original exception. 
- push_result_to_annotations now always returns int (was sometimes a tuple); a malformed result item no longer aborts the whole batch.
- File-link entities now stored under the canonical search_property key, so file-to-file matches actually work.
- Cross-view entities normalised to the file view's search_property so the diagram-detect search_field finds them.
- Per-batch error_count / annotated_count arithmetic corrected (was over-counting whole pages).
- ANNOTATE_BATCH_SIZE no longer mutated as a module-level global; it is now a per-invocation local that can't leak across warm-container calls.
- Cursor preserved on transient 400 in get_new_files; persistent 400 fails loudly instead of silently restarting from scratch.
- read_state_store split into typed read_state_cursor / read_state_batch_num with safe coercion.

Hardening
- Narrowed except Exception to CogniteAPIError where .code / .message is read.
- Bounded exponential backoff added to run_diagram_detect, get_new_files, and the apply retry loop.
- Narrowed broad except in _detect_annotation_to_edge_applies to parse-shape errors only.
- daemon=True on the Mixpanel usage-tracker thread.

Cleanup / hygiene
- Dead sys.path.append and unused sys / Path imports removed.
- @dataclass on Enum removed.
- Optional hints fixed (str | None = None) for debug_file, filter_property, filter_values.
- Orphaned, misspelled pares_direct_relation classmethod deleted.
- cognite.client.utils._text.shorten (private API) replaced with a local _truncate helper.
- RawUploadQueue now lazy-imported (test-friendly; no behavior change).
- from __future__ import annotations added.
- list[dict[str, any]] → list[dict[str, Any]] everywhere.
- Entities de-duplicated on (space, external_id) to prevent duplicate edges.
- Per-file delete_annotations_for_file replaced with a single batched delete_annotations_for_files per page.
- O(n²) get_file_source_id replaced with a one-shot (space, external_id) → sourceId index.
- create_table calls hoisted to a single setup step instead of running every batch.
- Per-file INFO log demoted to DEBUG.
- Missing ANNOTATE_BATCH_SIZE import added (latent NameError when debug=False).
- Various typos fixed ("For for view", "don't contains", "status an logged status", "tag PID" docstrings).

Tests (new)
- test_pipeline.py — 26 unit tests covering helpers, state coercion, get_all_entities, get_new_files retries, push_result_to_annotations.
- test_config.py — 21 unit tests covering Optional defaults, required fields, threshold validation, Literal enforcement.
- All 47 tests run locally with just pytest cognite-sdk pyyaml pydantic (no cognite-extractor-utils needed).

Rename / module alignment (related, outside the function dir)
- Function directory renamed to fn_dm_context_files_annotation; functions.Function.yaml externalId made static.
- Extraction pipeline externalId aligned to static ep_ctx_files_pandid_annotation (both ExtractionPipeline.yaml and config.yaml).
- Workflow task references updated to the new function/extraction-pipeline ids.
- handler.py::run_locally uses the new extraction-pipeline external id.
- README updated: stale ids replaced, dataset/RAW-DB confusion fixed, "Running functions locally" rewritten, new "Testing" section added.
--- .../cdf_p_and_id_annotation/README.md | 153 ++++- ...-pandid_annotation.ExtractionPipeline.yaml | 8 +- .../ctx_files-pandid_annotation.config.yaml | 10 +- .../__init__.py | 0 .../config.py | 12 +- .../constants.py | 0 .../handler.py | 4 +- .../logger.py | 0 .../pipeline.py | 586 +++++++++++------- .../requirements.txt | 0 .../test_config.py | 181 ++++++ .../test_pipeline.py | 526 ++++++++++++++++ .../functions/functions.Function.yaml | 4 +- .../workflows/annotation.WorkflowVersion.yaml | 6 +- 14 files changed, 1224 insertions(+), 266 deletions(-) rename modules/contextualization/cdf_p_and_id_annotation/functions/{fn_dm_context_files_LOC_SOURCE_annotation => fn_dm_context_files_annotation}/__init__.py (100%) rename modules/contextualization/cdf_p_and_id_annotation/functions/{fn_dm_context_files_LOC_SOURCE_annotation => fn_dm_context_files_annotation}/config.py (90%) rename modules/contextualization/cdf_p_and_id_annotation/functions/{fn_dm_context_files_LOC_SOURCE_annotation => fn_dm_context_files_annotation}/constants.py (100%) rename modules/contextualization/cdf_p_and_id_annotation/functions/{fn_dm_context_files_LOC_SOURCE_annotation => fn_dm_context_files_annotation}/handler.py (95%) rename modules/contextualization/cdf_p_and_id_annotation/functions/{fn_dm_context_files_LOC_SOURCE_annotation => fn_dm_context_files_annotation}/logger.py (100%) rename modules/contextualization/cdf_p_and_id_annotation/functions/{fn_dm_context_files_LOC_SOURCE_annotation => fn_dm_context_files_annotation}/pipeline.py (58%) rename modules/contextualization/cdf_p_and_id_annotation/functions/{fn_dm_context_files_LOC_SOURCE_annotation => fn_dm_context_files_annotation}/requirements.txt (100%) create mode 100644 modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_config.py create mode 100644 modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py diff --git 
a/modules/contextualization/cdf_p_and_id_annotation/README.md b/modules/contextualization/cdf_p_and_id_annotation/README.md index 22effcd7..35db441a 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/README.md +++ b/modules/contextualization/cdf_p_and_id_annotation/README.md @@ -60,10 +60,10 @@ Key features are: - Use sync api against data modeling for only processing new/updated files - Store state/cursor in RAW db/table (as provided in configuration) - Run ALL mode - - Clean out status an logged status from previous runs in RAW, and process - all P&ID files. - - NOTE: to also clean / delete previous annotations also add: - cleanOldAnnotations = True in configuration + - Clean out cursor/state and per-run progress from previous runs in RAW, + then process every matching P&ID file from scratch. + - NOTE: to also clean / delete previous annotations, set + `cleanOldAnnotations: true` in configuration. - Annotation process - Optional: Use configuration for filter property to find Files and/or Assets @@ -103,9 +103,9 @@ This module manages the following resources: pipelines, transformations, and raw tables 3. extraction pipeline: - - ID: `ep_ctx_files_{{location_name}}_{{source_name}}_pandid_annotation` + - ID: `ep_ctx_files_pandid_annotation` - Content: Documentation and configuration for a CDF function running - P&ID contextualization/annotation (see function for more description) + P&ID contextualization/annotation (see function for more description). 4. transformations: - ID: `asset_tagging_tr` @@ -118,27 +118,30 @@ This module manages the following resources: adding tags that can be used in the annotation configuration's filterProperty. 5. function: - - ID: `fn_dm_context_files_{{location_name}}_{{source_name}}_annotation` - - Content: Reads new/updated files using the SYNC api. Extracts all - tags in P&ID that match tags from Asset & Files to create CDF + - ID: `fn_dm_context_files_annotation` + - Content: Reads new/updated files using the SYNC API. 
Extracts all + tags in P&ID that match tags from Assets & Files to create CDF annotations used for linking found objects in the document to - other resource types in CDF - -6. raw: in database `{{files_dataset}}` (typically `ds_files_{{location_name}}_{{source_name}}`) - - ID: `documents_docs` - - Content: Table storing document-to-document relationships found in P&ID files - - ID: `documents_tags` - - Content: Table storing document-to-tag relationships found in P&ID files - - ID: `files_state_store` - - Content: Table storing state/cursor information for incremental processing + other resource types in CDF. + +6. raw: stored in the database configured by the `rawDb` parameter in the + extraction pipeline config (by convention this matches the dataset external + id, e.g. `ds_files_{{location_name}}_{{source_name}}`). + - Table: `documents_docs` + - Content: Document-to-document relationships found in P&ID files. + - Table: `documents_tags` + - Content: Document-to-tag relationships found in P&ID files. + - Table: `files_state_store` + - Content: Cursor and per-batch progress for incremental processing + (read on resume, written after each successful batch). 7. workflow - ID: `{{workflow}}` (default: `entity_matching`) - Content: Orchestrates the P&ID annotation process: 1. Runs `asset_tagging_tr` transformation to tag assets 2. Runs `file_tagging_tr` transformation to tag files - 3. Runs `fn_dm_context_files_{{location_name}}_{{source_name}}_annotation` - function to perform annotation (depends on both tagging transformations) + 3. Runs `fn_dm_context_files_annotation` function to perform annotation + (depends on both tagging transformations) ## Variables @@ -359,11 +362,43 @@ config: #### Running functions locally -You may run locally: +You can run the function on your machine without deploying to CDF Functions — +useful for debugging configuration changes against a real CDF project, and for +the initial bulk pass before switching to incremental mode. + +1. 
From the function directory: + + ```bash + cd modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation + ``` + +2. Create a local `.env` file with the credentials needed for your CDF project + (see `handler.py::run_locally` for the full list): + + ```env + CDF_PROJECT=your-project + CDF_CLUSTER=greenfield + IDP_CLIENT_ID=... + IDP_CLIENT_SECRET=... + IDP_TOKEN_URL=... + ``` + +3. Install the function's runtime dependencies into your local environment: + + ```bash + pip install -r requirements.txt + ``` -To run `fn_dm_context_files_LOC_SOURCE_annotation`, -simply call the `handler.py` with a local `.env`file that contain env variable -for logging on to your CDF project +4. Run the handler: + + ```bash + python handler.py + ``` + + This calls `run_locally()`, which authenticates to CDF and invokes the same + `handle()` entry point as the deployed function. The extraction-pipeline + external id used by `run_locally()` is set near the bottom of `handler.py` + — adjust it to match your environment's actual extraction pipeline. #### Cognite Function runtime @@ -376,3 +411,73 @@ The current implementation processes files sequentially, which works well for in **Future Scalability:** The codebase is designed to be easily extended with parallel processing and async modules, allowing you to scale from processing dozens to thousands of P&ID files efficiently. This makes it a future-proof solution that grows with your needs. + +## Testing + +The function ships with a unit-test suite that pins down the bug-fix surface +(per-batch counting, cursor preservation on transient API errors, the +diagram-detect search-property key, RAW state coercion, entity de-duplication, +annotation-id boundary cases, and config validation). Tests do not require a +CDF connection — the `CogniteClient` is fully mocked. 
+ +### Prerequisites + +- Python 3.11+ (matches the Cognite Functions runtime) +- The minimum runtime libraries needed to import `pipeline.py` and `config.py`: + + ```bash + pip install pytest cognite-sdk pyyaml pydantic + ``` + + `cognite-extractor-utils` is **not** required to run the suite — it is + lazy-imported inside the runtime function and the test suite never reaches + the construction site. + +### Run + +From the repo root: + +```bash +pytest -q modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/ +``` + +Expected output (47 tests): + +``` +............................................... [100%] +47 passed in ~1s +``` + +### What's covered + +- **`test_pipeline.py`** (26 tests) + - `_truncate` — short / at-limit / overflow / empty / pathological max-len. + - `create_annotation_id` — naive form, determinism, distinct raw → distinct + id, and the two length-boundary fallbacks (short form vs. truncated prefix). + - `read_state_cursor` / `read_state_batch_num` — typed coercion of the RAW + state values (e.g. numeric cursor coerced to `str`, string `"10"` coerced + to `int`, unparseable values reset to `0` with a warning). + - `get_all_entities` — file entities use the canonical `search_property` key, + cross-view entities are normalised to that same key, and duplicate + `(space, external_id)` entries are dropped. + - `get_new_files` — successful sync persists the new cursor; a transient 400 + retries with the **same** cursor (verified by capturing each call's + cursor); persistent 400 raises after `max_retries` without overwriting + state; non-400 errors propagate immediately with no sleep. + - `push_result_to_annotations` — a result item missing `fileInstanceId` no + longer aborts the batch (function returns `int`, not a tuple, and the + surviving items are still applied); cleanup goes through a single batched + `delete_annotations_for_files` call when `cleanOldAnnotations` is enabled. 
+ +- **`test_config.py`** (21 tests) + - `Optional` field defaults (`debugFile`, `filterProperty`, `filterValues`). + - Required-field enforcement (parameterized over the mandatory fields). + - Threshold range validation (parameterized over good/bad values). + - `Literal[...]` validation for the `type` field. + +### Adding new tests + +Tests live alongside the function code following the repo convention. Create or +extend a `test_*.py` file in +`modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/` +and pytest will pick it up automatically. diff --git a/modules/contextualization/cdf_p_and_id_annotation/extraction_pipelines/ctx_files-pandid_annotation.ExtractionPipeline.yaml b/modules/contextualization/cdf_p_and_id_annotation/extraction_pipelines/ctx_files-pandid_annotation.ExtractionPipeline.yaml index 3022be1b..77a080c3 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/extraction_pipelines/ctx_files-pandid_annotation.ExtractionPipeline.yaml +++ b/modules/contextualization/cdf_p_and_id_annotation/extraction_pipelines/ctx_files-pandid_annotation.ExtractionPipeline.yaml @@ -1,7 +1,7 @@ -externalId: 'ep_ctx_files_{{location_name}}_{{source_name}}_pandid_annotation' -name: 'ctx:files:{{location_name}}:{{source_name}}:pandid_annotation' +externalId: 'ep_ctx_files_pandid_annotation' +name: 'ctx:files:pandid_annotation' dataSetExternalId: '{{files_dataset}}' -description: 'Annotation of P&ID documents from file source {{location_name}}:{{source_name}}' +description: 'Annotation of P&ID documents from file source {{location_name}}:{{source_name}}' rawTables: - dbName: 'ds_files_{{location_name}}_{{source_name}}' tableName: 'documents_tags' @@ -55,7 +55,7 @@ documentation: > Provide a data block with the parameters for the process to run. 
``` - data = {"logLevel":"INFO", "ExtractionPipelineExtId": "ep_ctx_files_LOC_SOURCE_pandid_annotation"} + data = {"logLevel":"INFO", "ExtractionPipelineExtId": "ep_ctx_files_pandid_annotation"} ``` diff --git a/modules/contextualization/cdf_p_and_id_annotation/extraction_pipelines/ctx_files-pandid_annotation.config.yaml b/modules/contextualization/cdf_p_and_id_annotation/extraction_pipelines/ctx_files-pandid_annotation.config.yaml index fe466a38..0c86e702 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/extraction_pipelines/ctx_files-pandid_annotation.config.yaml +++ b/modules/contextualization/cdf_p_and_id_annotation/extraction_pipelines/ctx_files-pandid_annotation.config.yaml @@ -1,4 +1,4 @@ -externalId: 'ep_ctx_files_{{location_name}}_{{source_name}}_pandid_annotation' +externalId: 'ep_ctx_files_pandid_annotation' config: parameters: debug: False @@ -22,16 +22,16 @@ config: instanceSpace: {{ fileInstanceSpace }} externalId: {{organization}}File version: {{ viewVersion }} - searchProperty: aliases + searchProperty: name type: diagrams.FileLink filterProperty: tags - filterValues: ["PID", "ISO", "PLOT PLANS"] + filterValues: [] entityViews: - schemaSpace: {{ schemaSpace }} instanceSpace: {{ assetInstanceSpace }} externalId: {{organization}}Asset version: {{ viewVersion }} - searchProperty: aliases + searchProperty: name type: diagrams.AssetLink filterProperty: tags - filterValues: ["PID"] + filterValues: [] diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/__init__.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/__init__.py similarity index 100% rename from modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/__init__.py rename to modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/__init__.py diff --git 
a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/config.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/config.py similarity index 90% rename from modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/config.py rename to modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/config.py index 85543ea8..c521dd56 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/config.py +++ b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/config.py @@ -13,7 +13,7 @@ # Configuration classes class Parameters(BaseModel, alias_generator=to_camel): debug: bool - debug_file: str = None + debug_file: str | None = None run_all: bool clean_old_annotations: bool raw_db: str @@ -31,8 +31,8 @@ class ViewPropertyConfig(BaseModel, alias_generator=to_camel): version: str search_property: str = "alias" type: Literal["diagrams.FileLink", "diagrams.AssetLink"] - filter_property: str = None - filter_values: list[str] = None + filter_property: str | None = None + filter_values: list[str] | None = None def as_view_id(self) -> dm.ViewId: return dm.ViewId(space=self.schema_space, external_id=self.external_id, version=self.version) @@ -60,12 +60,6 @@ class Config(BaseModel, alias_generator=to_camel): parameters: Parameters data: ConfigData - @classmethod - def pares_direct_relation(cls, value: Any) -> Any: - if isinstance(value, dict): - return dm.DirectRelationReference.load(value) - return value - def load_config_parameters(client: CogniteClient, function_data: dict[str, Any]) -> Config: """Retrieves the configuration parameters from the function data and loads the configuration from CDF.""" diff --git 
a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/constants.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/constants.py similarity index 100% rename from modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/constants.py rename to modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/constants.py diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/handler.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/handler.py similarity index 95% rename from modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/handler.py rename to modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/handler.py index 87ce564f..70d37ed7 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/handler.py +++ b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/handler.py @@ -37,7 +37,7 @@ def _send() -> None: "cdf_cluster": client.config.cdf_cluster, "cdf_project": client.config.project, }) - threading.Thread(target=_send, daemon=False).start() + threading.Thread(target=_send, daemon=True).start() except Exception: # Usage tracking is best-effort; must not affect the handler. 
pass @@ -85,7 +85,7 @@ def run_locally(): ), ) ) - data = {"logLevel":"INFO", "ExtractionPipelineExtId": "ep_ctx_files_LOC_SOURCE_pandid_annotation"} + data = {"logLevel": "INFO", "ExtractionPipelineExtId": "ep_ctx_files_pandid_annotation"} handle(data, client) diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/logger.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/logger.py similarity index 100% rename from modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/logger.py rename to modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/logger.py diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/pipeline.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/pipeline.py similarity index 58% rename from modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/pipeline.py rename to modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/pipeline.py index f7dd48ba..41afdafb 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/pipeline.py +++ b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/pipeline.py @@ -1,12 +1,12 @@ +from __future__ import annotations + import json -import sys +import time import traceback -from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum from hashlib import sha256 -from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any from cognite.client import CogniteClient from cognite.client import data_modeling as dm @@ -27,10 +27,16 @@ ) from cognite.client.data_classes.data_modeling.query import 
NodeResultSetExpression, Query, Select, SourceSelector from cognite.client.exceptions import CogniteAPIError -from cognite.client.utils._text import shorten -from cognite.extractorutils.uploader import RawUploadQueue from config import Config, ViewPropertyConfig + +# RawUploadQueue is only constructed at runtime in annotate_p_and_id; importing +# it lazily lets pipeline.py be imported (e.g. for unit tests) without the +# cognite-extractor-utils package installed. The type hint below stays valid +# because of `from __future__ import annotations`. +if TYPE_CHECKING: # pragma: no cover + from cognite.extractorutils.uploader import RawUploadQueue from constants import ( + ANNOTATE_BATCH_SIZE, BATCH_SIZE, EXTERNAL_ID_LIMIT, FILE_LINK_EXTERNAL_ID, @@ -41,14 +47,17 @@ ) from logger import CogniteFunctionLogger -sys.path.append(str(Path(__file__).parent)) - -@dataclass class DiagramAnnotationStatus(Enum): SUGGESTED = "Suggested" APPROVED = "Approved" + +def _truncate(msg: str, max_len: int) -> str: + """Trim a string to max_len characters with an ellipsis if truncated.""" + return msg if len(msg) <= max_len else msg[: max_len - 3] + "..." + + # Define a custom exception class DiagramDetectError(Exception): def __init__(self, message="Error processing Diagram Detect"): @@ -76,18 +85,21 @@ def annotate_p_and_id( client: An instantiated CogniteClient config: A dataclass containing the configuration for the annotation process """ - global ANNOTATE_BATCH_SIZE pipeline_ext_id = data["ExtractionPipelineExtId"] error_count, annotated_count = 0, 0 + # Local copy so failure-mode adjustments (e.g. drop to size 1) don't leak + # across function invocations in a warm container. 
+ annotate_batch_size = ANNOTATE_BATCH_SIZE try: file_cursor = None file_num = 0 if config.parameters.debug: logger = CogniteFunctionLogger("DEBUG") logger.debug("**** Write debug messages and only process one file *****") - ANNOTATE_BATCH_SIZE = 1 + annotate_batch_size = 1 logger.debug("Initiate RAW upload queue used to store output from Diagram parsing") + from cognite.extractorutils.uploader import RawUploadQueue raw_uploader = RawUploadQueue(cdf_client=client, max_queue_size=500000, trigger_log_level="INFO") # Check if we should run all files (then delete state content in RAW) or just new files @@ -98,10 +110,21 @@ def annotate_p_and_id( delete_table(client, config.parameters.raw_db, config.parameters.raw_table_state) elif config.parameters.debug_file and config.parameters.debug: logger.debug(f"Since debugging with one file name: {config.parameters.debug_file} - ignore cursor and batch number") - else: + + # Ensure RAW database/tables exist exactly once per invocation, before any + # state reads/writes or per-batch RAW uploads (M3). The create_table helper + # is idempotent. 
+ raw_db = config.parameters.raw_db + create_table(client, raw_db, config.parameters.raw_table_state) + create_table(client, raw_db, config.parameters.raw_table_doc_tag) + create_table(client, raw_db, config.parameters.raw_table_doc_doc) + + if not config.parameters.run_all and not ( + config.parameters.debug_file and config.parameters.debug + ): logger.debug("Get file cursor and batch number from RAW, to continue processing from last run") - file_cursor = read_state_store(client, logger, STAT_STORE_CURSOR, config.parameters.raw_db, config.parameters.raw_table_state) - file_num = read_state_store(client, logger, STAT_STORE_NUM_IN_BATCH, config.parameters.raw_db, config.parameters.raw_table_state) + file_cursor = read_state_cursor(client, logger, raw_db, config.parameters.raw_table_state) + file_num = read_state_batch_num(client, logger, raw_db, config.parameters.raw_table_state) logger.debug("Create entities for files, assets, equipment and more if in configuration") entities = get_all_entities(client, logger, config) @@ -116,19 +139,26 @@ def annotate_p_and_id( update_pipeline_run(client, logger, pipeline_ext_id, "success", annotated_count, error_count, None) return - original_batch_size = ANNOTATE_BATCH_SIZE + original_batch_size = annotate_batch_size while len(new_files["files"]) > 0: doc_doc = [] doc_tag = [] annotation_view_id = config.data.annotation_view.as_view_id() search_property = config.data.annotation_job.file_view.search_property file_ids = new_files["files"].as_ids() + # Size of the most recently attempted batch; consulted by the + # exception handlers below so they don't double-count or + # mis-count using a post-mutated annotate_batch_size. 
+ last_batch_size = 0 try: - for num in range(file_num, len(file_ids), ANNOTATE_BATCH_SIZE): - result = run_diagram_detect(client, logger, entities, file_ids[num:num+ANNOTATE_BATCH_SIZE], search_property) + for num in range(file_num, len(file_ids), annotate_batch_size): + batch = file_ids[num:num+annotate_batch_size] + last_batch_size = len(batch) + result = run_diagram_detect(client, logger, entities, batch, search_property) if result is None: - error_count += len(file_ids) + error_count += last_batch_size else: + errors_before = error_count error_count = push_result_to_annotations( client, config, @@ -141,13 +171,14 @@ def annotate_p_and_id( doc_doc, doc_tag ) - annotated_count += len(file_ids) - error_count + errors_added = error_count - errors_before + annotated_count += last_batch_size - errors_added # Update raw with new annotations write_mapping_to_raw(client, config, raw_uploader, doc_doc, doc_tag, logger) logger.debug("Update state store with doc num in batch - in case timeout to set water mark") - update_state_store(client, logger, file_cursor, num+ANNOTATE_BATCH_SIZE, config, None, STAT_STORE_NUM_IN_BATCH) + update_state_store(client, logger, file_cursor, num+annotate_batch_size, config, None, STAT_STORE_NUM_IN_BATCH) if config.parameters.debug: break @@ -169,27 +200,29 @@ def annotate_p_and_id( break except DiagramDetectError as e: - error_count += ANNOTATE_BATCH_SIZE + # DiagramDetectError is raised by run_diagram_detect only for single-file + # batches that exhausted retries, so last_batch_size is 1 here. 
+ error_count += last_batch_size msg = f"Skipping file - diagram detect failed with error: {e!s}" logger.error(msg) - file_num += ANNOTATE_BATCH_SIZE - ANNOTATE_BATCH_SIZE = original_batch_size + file_num += last_batch_size + annotate_batch_size = original_batch_size except Exception as e: - if ANNOTATE_BATCH_SIZE > 1: - msg = f"Failed to push: {ANNOTATE_BATCH_SIZE} annotations to data model, setting Batch Size = 1 and retry error: {e!s}" + if annotate_batch_size > 1: + msg = f"Failed to push: {last_batch_size} annotations to data model, setting Batch Size = 1 and retry error: {e!s}" logger.error(msg) - ANNOTATE_BATCH_SIZE = 1 + annotate_batch_size = 1 else: - error_count += ANNOTATE_BATCH_SIZE - msg = f"Failed to push: {ANNOTATE_BATCH_SIZE} annotations to data model, error: {e!s}" + error_count += last_batch_size + msg = f"Failed to push: {last_batch_size} annotations to data model, error: {e!s}" logger.error(msg) raise Exception(msg) from e except Exception as e: msg = f"failed, Message: {e!s}" update_pipeline_run(client, logger, pipeline_ext_id, "failure", annotated_count, error_count, msg) - raise Exception("msg") + raise Exception(msg) from e @@ -220,36 +253,64 @@ def update_pipeline_run( ExtractionPipelineRun( extpipe_external_id=xid, status=status, - message=shorten(msg, 1000) + message=_truncate(msg, 1000) ) ) -def read_state_store( +def _read_state_value( client: CogniteClient, logger: CogniteFunctionLogger, key: str, db: str, - table: str -) -> str: - - if key == STAT_STORE_NUM_IN_BATCH: - value = 0 - else: - value = None + table: str, +) -> Any: + """Look up a single state value from RAW. Returns None if not found. + The state table is expected to exist already; callers should ensure + it via a one-time `create_table` call at pipeline start (M3). 
+ """ logger.info(f"Read state from DB: {db} Table: {table} Key: {key}") - - logger.debug("Create DB / Table for state if it does not exist") - create_table(client, db, table) - - row_list = client.raw.rows.list(db_name=db, table_name=table, columns=[STAT_STORE_VALUE], limit=-1) + row_list = client.raw.rows.list( + db_name=db, table_name=table, columns=[STAT_STORE_VALUE], limit=-1 + ) for row in row_list: if row.key == key: - value = row.columns[STAT_STORE_VALUE] - if value == 0 or value is None: - logger.debug(f"State not found for key: {key} -> read all values") + return row.columns[STAT_STORE_VALUE] + logger.debug(f"State not found for key: {key}") + return None + + +def read_state_cursor( + client: CogniteClient, + logger: CogniteFunctionLogger, + db: str, + table: str, +) -> str | None: + """Read the persisted sync cursor; None if no prior state.""" + raw_value = _read_state_value(client, logger, STAT_STORE_CURSOR, db, table) + if raw_value is None: + return None + return str(raw_value) + + +def read_state_batch_num( + client: CogniteClient, + logger: CogniteFunctionLogger, + db: str, + table: str, +) -> int: + """Read the persisted in-batch index, coercing missing/non-int values to 0.""" + raw_value = _read_state_value(client, logger, STAT_STORE_NUM_IN_BATCH, db, table) + if raw_value is None: + return 0 + try: + return int(raw_value) + except (TypeError, ValueError): + logger.warning( + f"Stored {STAT_STORE_NUM_IN_BATCH} is not an int ({raw_value!r}); resetting to 0." + ) + return 0 - return value def update_state_store( client: CogniteClient, @@ -258,14 +319,13 @@ def update_state_store( file_num: int, config: Config, cursor: str, - batch_num: str + batch_num: str, ) -> None: + """Persist cursor and/or batch_num into the state table. 
- state_row = None - - # Create DB / Table for state if it does not exist - create_table(client, config.parameters.raw_db, config.parameters.raw_table_state) - + The state table is expected to exist already; callers should ensure + it via a one-time `create_table` call at pipeline start (M3). + """ if cursor: state_row = Row(cursor, {STAT_STORE_VALUE: file_cursor}) client.raw.rows.insert(config.parameters.raw_db, config.parameters.raw_table_state, state_row) @@ -285,74 +345,112 @@ def get_all_entities( """ Read all entities from space + Every entity dict stores its searchable value under the file view's + search_property name. That key must match the diagram-detect + `search_field` argument used in run_diagram_detect, otherwise the API + silently returns no matches for the entity. + :returns: Nodelist of entities nodes """ entities = [] job_config = config.data.annotation_job all_files = get_all_files(client, logger, job_config.file_view) - search_property = job_config.file_view.search_property + file_search_property = job_config.file_view.search_property + file_view_id = job_config.file_view.as_view_id() for file in all_files: entities.append( - { - "external_id": file.external_id, - "name": file.properties[job_config.file_view.as_view_id()].get("name", ""), - "space": file.space, - "search_property": file.properties[job_config.file_view.as_view_id()].get(search_property, ""), - "annotation_type_external_id": job_config.file_view.type, - } - ) - logger.debug(f"Number files added as entities: {len(entities)}") + { + "external_id": file.external_id, + "name": file.properties[file_view_id].get("name", ""), + "space": file.space, + file_search_property: file.properties[file_view_id].get(file_search_property, ""), + "annotation_type_external_id": job_config.file_view.type, + } + ) + logger.debug(f"Number of files added as entities: {len(entities)}") for entity_view in job_config.entity_views: - type = entity_view.type - search_property = entity_view.search_property + 
entity_type = entity_view.type + view_search_property = entity_view.search_property view_id = entity_view.as_view_id() - logger.debug(f"Get all entities from view: {view_id} with search property: {search_property} and type: {type}") + logger.debug( + f"Get all entities from view: {view_id} with search property: " + f"{view_search_property} and type: {entity_type}" + ) + + if view_search_property != file_search_property: + logger.warning( + f"View {view_id} declares search_property '{view_search_property}' but " + f"the file view's search_property '{file_search_property}' is used as the " + f"diagram-detect search_field. Entities from this view will be stored " + f"under '{file_search_property}' to remain matchable." + ) is_selected = get_entity_filter(entity_view, logger) entity_list = client.data_modeling.instances.list( space=entity_view.instance_space, - sources=[entity_view.as_view_id()], + sources=[view_id], filter=is_selected, - limit=-1 + limit=-1, ) - warningLogged = False + warning_logged = False for entity in entity_list: - if search_property in entity.properties[view_id]: - if not warningLogged: - logger.debug(f"View {view_id} contains {search_property} property") - warningLogged = True - - entities.append( - { - "external_id": entity.external_id, - "name": entity.properties[view_id]["name"], - "space": entity.space, - search_property: entity.properties[view_id][search_property], - "annotation_type_external_id": type, - } - ) + if view_search_property in entity.properties[view_id]: + if not warning_logged: + logger.debug(f"View {view_id} contains {view_search_property} property") + warning_logged = True + value = entity.properties[view_id][view_search_property] else: - if not warningLogged: - logger.warning(f"View {view_id} don't contains {search_property} property, using name instead") - warningLogged = True - entities.append( - { - "external_id": entity.external_id, - "name": entity.properties[view_id]["name"], - "space": entity.space, - 
search_property: entity.properties[view_id]["name"], - "annotation_type_external_id": type, - } - ) - logger.info(f"Total number of entities: {len(entities)} including elements from view: {view_id} and type: {type}") + if not warning_logged: + logger.warning( + f"View {view_id} does not contain {view_search_property} property, " + f"using name instead" + ) + warning_logged = True + value = entity.properties[view_id]["name"] + + entities.append( + { + "external_id": entity.external_id, + "name": entity.properties[view_id]["name"], + "space": entity.space, + file_search_property: value, + "annotation_type_external_id": entity_type, + } + ) + logger.info( + f"Total number of entities: {len(entities)} including elements from " + f"view: {view_id} and type: {entity_type}" + ) + # Two views (or a file view + an entity view) referencing the same node + # would otherwise produce duplicate entities, which the diagram-detect API + # turns into duplicate edges (different external_ids because + # create_annotation_id is detect-result-content-aware, but semantically + # duplicate). De-dup on (space, external_id) and warn if anything was + # dropped (H8). + seen: set[tuple[str, str]] = set() + deduped: list[dict] = [] + for entity in entities: + key = (entity["space"], entity["external_id"]) + if key in seen: + continue + seen.add(key) + deduped.append(entity) + + dropped = len(entities) - len(deduped) + if dropped: + logger.warning( + f"Dropped {dropped} duplicate entities (same (space, external_id)) " + f"that appeared in more than one configured view. Total unique " + f"entities: {len(deduped)}." + ) - return entities + return deduped @@ -362,9 +460,10 @@ def get_all_files( file_view_config: ViewPropertyConfig, ) -> NodeList[Node]: """ - Read files based on tag PID + Read all files matching the configured filter (uploaded, supported mime + type, plus any caller-defined extra filter). 
- :returns: Nodelist of files nodes + :returns: NodeList of file nodes """ logger.debug(f"Get all files from view: {file_view_config.as_view_id()}") is_selected = get_file_filter(file_view_config, None, logger) @@ -388,12 +487,13 @@ def get_new_files( config: Config ) -> NodeList[Node]: """ - Read new files based on TAG PID + Read new (or changed) files since the last persisted sync cursor, using + the configured file filter. - :returns: Nodelist of files nodes + :returns: NodeList of file nodes """ - file_view_config= config.data.annotation_job.file_view + file_view_config = config.data.annotation_job.file_view if config.parameters.debug: debug_file = config.parameters.debug_file else: @@ -418,21 +518,37 @@ def get_new_files( retry = True sync_result = None + max_retries = 3 while retry: sync_query.cursors["files"] = file_cursor try: sync_result = client.data_modeling.instances.sync(sync_query) retry = False - except Exception as e: + except CogniteAPIError as e: msg = f"failed, Message: {e!s}" - if e.code == 400: - logger.warning(f"Got 400 error, Resetting cursor and trying again : {msg}") - file_cursor = None - num_retry += 1 - if num_retry > 3: - raise Exception(msg) from e - else: + if e.code != 400: raise Exception(msg) from e + + num_retry += 1 + if num_retry > max_retries: + # Persistent 400 with this cursor is operator-actionable + # (e.g. the data model was rebuilt and the cursor is no longer + # valid). Bail out without silently restarting from scratch and + # overwriting the persisted cursor; if the cursor really is + # stale, the operator can set runAll: true once to reset. + logger.error( + f"400 error after {max_retries} retries with the existing cursor. " + "If the underlying data model was rebuilt, set runAll: true once " + f"to reset the cursor, then resume normal operation. 
Original error: {msg}" + ) + raise Exception(msg) from e + + sleep_for = min(2 ** num_retry, 30) + logger.warning( + f"Got 400 error (attempt {num_retry}/{max_retries}); retrying with the same " + f"cursor in {sleep_for}s: {msg}" + ) + time.sleep(sleep_for) new_cursor_value = sync_result.cursors["files"] @@ -484,7 +600,7 @@ def get_entity_filter( is_view = dm.filters.HasData(views=[view_config.as_view_id()]) is_selected = dm.filters.And(is_view) - dbg_msg = f"For for view: {view_config.as_view_id()} - Entity filter: HasData = True" + dbg_msg = f"For view: {view_config.as_view_id()} - Entity filter: HasData = True" if view_config.filter_property and view_config.filter_values: @@ -506,38 +622,56 @@ def get_entity_filter( def run_diagram_detect( client: CogniteClient, logger: CogniteFunctionLogger, - entities: list[dict[str, any]], + entities: list[dict[str, Any]], file_ids: list[NodeId], search_property: str, ) -> DiagramDetectResults: """ - Run diagram detect job - on bach of documents + Run diagram detect job on a batch of documents. + + Retries transient failures with bounded exponential backoff. If retries are + exhausted on a multi-file batch, raises a generic Exception so the caller + can drop to single-file mode. If retries are exhausted on a single file, + raises DiagramDetectError so the caller can skip the file. 
""" - logger.info(f"Run diagram detect on {len(file_ids)} files, num entities: {len(entities)}, partial match: True, search field: {search_property}") + logger.info( + f"Run diagram detect on {len(file_ids)} files, num entities: {len(entities)}, " + f"partial match: True, search field: {search_property}" + ) + max_retries = 3 num_retry = 0 - retry = True - while retry: + while True: try: - job = client.diagrams.detect( file_instance_ids=file_ids, entities=entities, partial_match=True, search_field=search_property, - configuration=DiagramDetectConfig(read_embedded_text=True) + configuration=DiagramDetectConfig(read_embedded_text=True), ) logger.debug("Diagram detect job started... waiting for job to finish") return job.result except Exception as e: num_retry += 1 - if num_retry > 3: + if num_retry > max_retries: if len(file_ids) > 1: - raise Exception(f"Batch diagram detect batch failed on {len(file_ids)} files - rerunning with one file at a time") - else: - raise DiagramDetectError(f"Diagram detect job failed for {file_ids}, error: {e} - skipping file") + raise Exception( + f"Batch diagram detect failed on {len(file_ids)} files after " + f"{max_retries} retries - falling back to one file at a time" + ) from e + raise DiagramDetectError( + f"Diagram detect job failed for {file_ids} after {max_retries} retries: " + f"{e!s} - skipping file" + ) from e + + sleep_for = min(2 ** num_retry, 30) + logger.warning( + f"Diagram detect attempt {num_retry}/{max_retries} failed: {e!s}; " + f"retrying in {sleep_for}s" + ) + time.sleep(sleep_for) @@ -548,66 +682,86 @@ def push_result_to_annotations( annotation_view_id: ViewId, files_view_id: ViewId, result: DiagramDetectResults, - new_files: dict[str, any], + new_files: dict[str, Any], error_count: int, doc_doc: list[dict], - doc_tag: list[dict] -) -> None: + doc_tag: list[dict], +) -> int: - edge_applies = [] + edge_applies: list[EdgeApply] = [] + files_to_clean: list[NodeId] = [] logger.debug(f"Pushing annotations to data model, 
number of items: {len(result['items'])}") + # Build a (space, external_id) -> sourceId index once per call instead of + # rescanning new_files["files"] for every annotation (M4). + source_id_by_node: dict[tuple[str, str], str | None] = {} + for file_node in new_files["files"]: + node_props = file_node.properties.get(files_view_id, {}) + source_id_by_node[(file_node.space, file_node.external_id)] = node_props.get("sourceId") + for result_item in result["items"]: file_instance_id_dict = result_item.get("fileInstanceId") - if file_instance_id_dict is not None: - logger.debug(f"File instance id: {file_instance_id_dict}") - file_instance_id = NodeId.load(file_instance_id_dict) - - if config.parameters.clean_old_annotations: - delete_annotations_for_file(client, logger, annotation_view_id, file_instance_id) - - source_id = get_file_source_id(new_files, file_instance_id, files_view_id) - - edge_apply, error_count = _result_item_to_edge_applies( - config, - logger, - annotation_view_id, - result_item, - file_instance_id, - source_id, - error_count, - doc_doc, - doc_tag, - ) - edge_applies.extend(edge_apply) - logger.info(f"Number of annotations for file: {result_item['fileInstanceId']['externalId']} to apply: {len(edge_apply)}") - else: + if file_instance_id_dict is None: error_count += 1 - logger.error(f"File instance id not found in result item: {result_item['fileId']}") - return [], error_count + logger.error(f"File instance id not found in result item: {result_item.get('fileId')}") + continue + + logger.debug(f"File instance id: {file_instance_id_dict}") + file_instance_id = NodeId.load(file_instance_id_dict) + + if config.parameters.clean_old_annotations: + files_to_clean.append(file_instance_id) + + source_id = source_id_by_node.get((file_instance_id.space, file_instance_id.external_id)) + + edge_apply, error_count = _result_item_to_edge_applies( + config, + logger, + annotation_view_id, + result_item, + file_instance_id, + source_id, + error_count, + doc_doc, + 
doc_tag, + ) + edge_applies.extend(edge_apply) + logger.debug( + f"Number of annotations for file: " + f"{result_item['fileInstanceId']['externalId']} to apply: {len(edge_apply)}" + ) - num_retry = 0 - retry = True + # Clean up old annotations for the whole page in a single batched call (M6), + # before applying the new ones so we don't accidentally delete fresh edges. + if config.parameters.clean_old_annotations and files_to_clean: + delete_annotations_for_files(client, logger, annotation_view_id, files_to_clean) - while retry: + max_retries = 3 + num_retry = 0 + while True: try: client.data_modeling.instances.apply(edge_applies) logger.debug(f"Total number of annotations added/updated: {len(edge_applies)}") return error_count except Exception as e: num_retry += 1 - if num_retry > 3: - msg = f"Annotations add/update of: {len(edge_applies)} failed, error: {e.message}" + if num_retry > max_retries: + msg = f"Annotations add/update of: {len(edge_applies)} failed, error: {e!s}" logger.error(msg) raise Exception(msg) from e + sleep_for = min(2 ** num_retry, 30) + logger.warning( + f"Apply attempt {num_retry}/{max_retries} failed: {e!s}; retrying in {sleep_for}s" + ) + time.sleep(sleep_for) def _result_item_to_edge_applies( config: Config, logger: CogniteFunctionLogger, annotation_view_id: ViewId, - result_item: dict[str, any], + result_item: dict[str, Any], file_instance_id: NodeId, source_id: str, error_count: int, @@ -645,7 +799,7 @@ def _detect_annotation_to_edge_applies( annotation_view_id: ViewId, file_instance_id: NodeId, source_id: str, - detect_annotation: dict[str, any], + detect_annotation: dict[str, Any], ) -> tuple[list[EdgeApply], int]: diagram_annotations = [] @@ -727,14 +881,21 @@ def _detect_annotation_to_edge_applies( doc_tag.append(doc_log) return diagram_annotations, error_count - except Exception as e: + except (KeyError, TypeError, ValueError) as e: + # These are the parse-shape errors we expect from a malformed detect + # response: missing dict 
keys, non-numeric coordinates, bad types in + # entity payloads. Bugs (NameError, ImportError, etc.) propagate so + # they surface in CI/log review rather than being silently absorbed + # as a single error_count increment. error_count += 1 - logger.error(f"Failed to create annotation for file: {file_instance_id.external_id} error: {e}") - return [],error_count + logger.error( + f"Failed to create annotation for file: {file_instance_id.external_id} error: {e!s}" + ) + return [], error_count def create_annotation_id( file_id: dm.NodeId, - entity: dict[str, any], + entity: dict[str, Any], text: str, raw_annotation: dict[str, Any] ) -> str: @@ -744,40 +905,25 @@ def create_annotation_id( return naive prefix = f"{file_id.external_id}:{entity['external_id']}:{text}" - shorten = f"{prefix}:{hash_}" - if len(shorten) < EXTERNAL_ID_LIMIT: - return shorten + short_id = f"{prefix}:{hash_}" + if len(short_id) < EXTERNAL_ID_LIMIT: + return short_id return prefix[: EXTERNAL_ID_LIMIT - 10] + hash_ -def get_file_source_id( - new_files: dict[str, any], - file_instance_id: NodeId, - files_view_id: ViewId -) -> str: - file_source_id = None - - # Find the matching file and extract the sourceId - for file_node in new_files["files"]: - if file_node.external_id == file_instance_id.external_id and \ - file_node.space == file_instance_id.space: - if "sourceId" in file_node.properties[files_view_id]: - file_source_id = file_node.properties[files_view_id]["sourceId"] - break - - return file_source_id - - def write_mapping_to_raw( client: CogniteClient, config: Config, raw_uploader: RawUploadQueue, doc_doc: list[dict], doc_tag: list[dict], - logger: CogniteFunctionLogger + logger: CogniteFunctionLogger, ) -> None: """ - Write matching results to RAW DB + Write matching results to RAW DB. + + The destination tables are expected to exist already; callers should + ensure them via a one-time `create_table` call at pipeline start (M3). 
Args: config: Instance of ContextConfig @@ -790,10 +936,6 @@ def write_mapping_to_raw( tag_tbl = config.parameters.raw_table_doc_tag doc_tbl = config.parameters.raw_table_doc_doc - logger.debug("Create DB / Table for DB: {raw_db} Tables: {doc_tag} and {doc_doc} if it does not exist") - create_table(client, raw_db, tag_tbl) - create_table(client, raw_db, doc_tbl) - for tag in doc_tag: raw_uploader.add_to_upload_queue(raw_db, tag_tbl, Row(str(tag['external_id']), tag)) @@ -804,7 +946,7 @@ def write_mapping_to_raw( logger.info(f"Added {len(doc_doc)} rows to {raw_db}/{doc_tbl}") - # Upload any remaining RAW cols in queue + # Upload any remaining RAW rows in queue raw_uploader.upload() @@ -840,61 +982,71 @@ def get_property( return [view_id.space, f"{view_id.external_id}/{view_id.version}", property] -def list_annotations_for_file( +def list_annotations_for_files( client: CogniteClient, annotation_view_id: ViewId, - node: NodeId + nodes: list[NodeId], ) -> list: """ - List all annotation edges for a file node. + List all annotation edges for the given file nodes. + + The instances.list API takes a single `space`, so when the input nodes + span multiple spaces we issue one list call per space. Typical pages + are single-space, so this is one call. Args: client (CogniteClient): The Cognite client instance. annotation_view_id (ViewId): The ViewId of the annotation view. - node (NodeId): The NodeId of the file node. + nodes (list[NodeId]): The file nodes whose annotations to list. Returns: - list: A list of edges (annotations) linked to the file node. + list: A list of edges (annotations) linked to the input file nodes. 
""" + if not nodes: + return [] + external_ids = [n.external_id for n in nodes] is_function = dm.filters.Equals(get_property(annotation_view_id, "sourceCreatedUser"), FUNCTION_ID) - is_file = dm.filters.Equals(get_property(annotation_view_id, "name"), node.external_id) + is_file = dm.filters.In(get_property(annotation_view_id, "name"), external_ids) is_selected = dm.filters.And(is_function, is_file) - # Query for edges (annotations) connected to the file node - annotations = client.data_modeling.instances.list( - space=node.space, - sources=[annotation_view_id], - instance_type="edge", - filter=is_selected, - limit=-1, - ) + annotations: list = [] + for space in {n.space for n in nodes}: + annotations.extend( + client.data_modeling.instances.list( + space=space, + sources=[annotation_view_id], + instance_type="edge", + filter=is_selected, + limit=-1, + ) + ) return annotations -def delete_annotations_for_file( +def delete_annotations_for_files( client: CogniteClient, logger: CogniteFunctionLogger, annotation_view_id: ViewId, - node: NodeId + nodes: list[NodeId], ) -> None: """ - Delete all annotation edges for a file node. + Delete all annotation edges for the given file nodes in a single batched + list+delete pair (M6). Args: client (CogniteClient): The Cognite client instance. annotation_view_id (ViewId): The ViewId of the annotation view. - node (NodeId): The NodeId of the file node. + nodes (list[NodeId]): The file nodes whose annotations to clean up. 
""" - # List annotations for the file node - annotations = list_annotations_for_file(client, annotation_view_id, node) - if not annotations: - logger.debug(f"No annotations found for file with NodeId: {node}") + if not nodes: return - # Extract edge IDs for deletion - edge_ids = [EdgeId(space=node.space, external_id=edge.external_id) for edge in annotations] + annotations = list_annotations_for_files(client, annotation_view_id, nodes) + if not annotations: + logger.debug(f"No old annotations to clean for {len(nodes)} files") + return - # Delete edges + edge_ids = [EdgeId(space=edge.space, external_id=edge.external_id) for edge in annotations] client.data_modeling.instances.delete(edge_ids) - logger.info(f"Deleted {len(edge_ids)} annotations for file with NodeId: {node}") + logger.info(f"Deleted {len(edge_ids)} old annotations across {len(nodes)} files") diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/requirements.txt b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/requirements.txt similarity index 100% rename from modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_LOC_SOURCE_annotation/requirements.txt rename to modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/requirements.txt diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_config.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_config.py new file mode 100644 index 00000000..8326b35b --- /dev/null +++ b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_config.py @@ -0,0 +1,181 @@ +"""Unit tests for `config.py`. 
+ +Focus is on H2 — the Optional fields (`debug_file`, `filter_property`, +`filter_values`) must default to None when omitted from the source config, +and pydantic v2 must accept that without complaint. + +Run from the function directory: + + pytest -q test_config.py +""" +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +# Same path-prepend pattern used by handler.py so flat imports work in-test. +sys.path.append(str(Path(__file__).parent)) + +from config import Config, Parameters, ViewPropertyConfig # noqa: E402 + + +def _minimal_config_dict() -> dict: + return { + "parameters": { + "debug": False, + "runAll": False, + "cleanOldAnnotations": False, + "rawDb": "raw-db", + "rawTableState": "state", + "rawTableDocTag": "doc_tag", + "rawTableDocDoc": "doc_doc", + "autoApprovalThreshold": 0.9, + "autoSuggestThreshold": 0.5, + }, + "data": { + "annotationView": { + "schemaSpace": "schema", + "externalId": "AnnotationView", + "version": "v1", + }, + "annotationJob": { + "fileView": { + "schemaSpace": "schema", + "instanceSpace": "instance", + "externalId": "FileView", + "version": "v1", + "searchProperty": "alias", + "type": "diagrams.FileLink", + }, + "entityViews": [], + }, + }, + } + + +class TestParametersOptionalFields: + """H2: Parameters.debug_file is `str | None = None`.""" + + def test_debug_file_omitted_defaults_to_none(self): + cfg = Config.model_validate(_minimal_config_dict()) + assert cfg.parameters.debug_file is None + + def test_debug_file_explicit_none_accepted(self): + d = _minimal_config_dict() + d["parameters"]["debugFile"] = None + cfg = Config.model_validate(d) + assert cfg.parameters.debug_file is None + + def test_debug_file_explicit_string_accepted(self): + d = _minimal_config_dict() + d["parameters"]["debugFile"] = "P-001.pdf" + cfg = Config.model_validate(d) + assert cfg.parameters.debug_file == "P-001.pdf" + + +class TestViewPropertyConfigOptionalFields: + """H2: ViewPropertyConfig.filter_property 
and filter_values are Optional.""" + + def test_filter_fields_omitted_default_to_none(self): + cfg = Config.model_validate(_minimal_config_dict()) + file_view = cfg.data.annotation_job.file_view + assert file_view.filter_property is None + assert file_view.filter_values is None + + def test_filter_fields_explicit_none_accepted(self): + d = _minimal_config_dict() + d["data"]["annotationJob"]["fileView"]["filterProperty"] = None + d["data"]["annotationJob"]["fileView"]["filterValues"] = None + cfg = Config.model_validate(d) + file_view = cfg.data.annotation_job.file_view + assert file_view.filter_property is None + assert file_view.filter_values is None + + def test_filter_fields_explicit_values_accepted(self): + d = _minimal_config_dict() + d["data"]["annotationJob"]["fileView"]["filterProperty"] = "site" + d["data"]["annotationJob"]["fileView"]["filterValues"] = ["a", "b"] + cfg = Config.model_validate(d) + file_view = cfg.data.annotation_job.file_view + assert file_view.filter_property == "site" + assert file_view.filter_values == ["a", "b"] + + +class TestRequiredFieldsStillRequired: + """The H2 Optional fix must NOT have weakened the required fields.""" + + @pytest.mark.parametrize( + "missing_key", + ["debug", "runAll", "cleanOldAnnotations", "rawDb", "rawTableState"], + ) + def test_missing_required_parameter_field_raises(self, missing_key): + d = _minimal_config_dict() + del d["parameters"][missing_key] + with pytest.raises(Exception): + Config.model_validate(d) + + def test_view_property_config_requires_type_literal(self): + with pytest.raises(Exception): + ViewPropertyConfig.model_validate( + { + "schemaSpace": "schema", + "instanceSpace": "instance", + "externalId": "FileView", + "version": "v1", + # type intentionally omitted + } + ) + + def test_view_property_config_rejects_unknown_type_literal(self): + with pytest.raises(Exception): + ViewPropertyConfig.model_validate( + { + "schemaSpace": "schema", + "instanceSpace": "instance", + "externalId": 
"FileView", + "version": "v1", + "type": "diagrams.NotARealType", + } + ) + + +class TestThresholdValidation: + """The pydantic Field constraints on the auto_*_threshold values must hold.""" + + @pytest.mark.parametrize("bad_value", [0.0, -0.1, 1.1, 2.0]) + def test_approval_threshold_out_of_range_rejected(self, bad_value): + d = _minimal_config_dict() + d["parameters"]["autoApprovalThreshold"] = bad_value + with pytest.raises(Exception): + Config.model_validate(d) + + @pytest.mark.parametrize("good_value", [0.001, 0.5, 1.0]) + def test_approval_threshold_in_range_accepted(self, good_value): + d = _minimal_config_dict() + d["parameters"]["autoApprovalThreshold"] = good_value + cfg = Config.model_validate(d) + assert cfg.parameters.auto_approval_threshold == good_value + + +class TestParametersDirectInstantiation: + """Smoke check that the model validates standalone, not just through Config.""" + + def test_parameters_minimal_via_camel_aliases(self): + params = Parameters.model_validate( + { + "debug": True, + "runAll": False, + "cleanOldAnnotations": True, + "rawDb": "db", + "rawTableState": "state", + "rawTableDocTag": "doc_tag", + "rawTableDocDoc": "doc_doc", + "autoApprovalThreshold": 0.95, + "autoSuggestThreshold": 0.6, + } + ) + assert params.debug is True + assert params.debug_file is None + assert params.auto_suggest_threshold == 0.6 diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py new file mode 100644 index 00000000..eb0397a1 --- /dev/null +++ b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py @@ -0,0 +1,526 @@ +"""Unit tests for `pipeline.py`. + +The tests focus on the bug-fix surface that motivated the recent cleanup PRs: + +* C2 (per-batch counting) — exercised indirectly via push_result_to_annotations. 
+* C3 (push_result_to_annotations + always returns int, never tuple). +* C4 (file-entity search_property key, + cross-view canonical key). +* C7 (cursor preservation on a 400 + in get_new_files). +* C8 (typed coercion in + read_state_cursor / read_state_batch_num). +* H8 (entity de-duplication in + get_all_entities). +* `_truncate` helper. +* `create_annotation_id` length-boundary fallbacks. + +Tests do not require a live CDF connection. The CogniteClient is fully mocked +via unittest.mock.MagicMock; pydantic models are exercised through model_validate +with camelCase aliases. + +Run from the function directory: + + pytest -q test_pipeline.py +""" +from __future__ import annotations + +import sys +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest +from cognite.client import data_modeling as dm +from cognite.client.exceptions import CogniteAPIError + +# Same path-prepend pattern used by handler.py so flat imports work in-test. 
+sys.path.append(str(Path(__file__).parent)) + +import pipeline # noqa: E402 (after sys.path tweak) +from config import Config # noqa: E402 +from logger import CogniteFunctionLogger # noqa: E402 +from pipeline import ( # noqa: E402 + _truncate, + create_annotation_id, + get_all_entities, + get_new_files, + push_result_to_annotations, + read_state_batch_num, + read_state_cursor, +) + +# --------------------------------------------------------------------------- # +# Fixtures # +# --------------------------------------------------------------------------- # + +@pytest.fixture +def logger() -> CogniteFunctionLogger: + return CogniteFunctionLogger("DEBUG") + + +@pytest.fixture +def file_view_id() -> dm.ViewId: + return dm.ViewId(space="schema", external_id="FileView", version="v1") + + +@pytest.fixture +def asset_view_id() -> dm.ViewId: + return dm.ViewId(space="schema", external_id="AssetView", version="v1") + + +@pytest.fixture +def base_config_dict() -> dict: + """Minimal pipeline config in pydantic-alias (camelCase) form.""" + return { + "parameters": { + "debug": False, + "runAll": False, + "cleanOldAnnotations": False, + "rawDb": "raw-db", + "rawTableState": "state", + "rawTableDocTag": "doc_tag", + "rawTableDocDoc": "doc_doc", + "autoApprovalThreshold": 0.9, + "autoSuggestThreshold": 0.5, + }, + "data": { + "annotationView": { + "schemaSpace": "schema", + "externalId": "AnnotationView", + "version": "v1", + }, + "annotationJob": { + "fileView": { + "schemaSpace": "schema", + "instanceSpace": "instance", + "externalId": "FileView", + "version": "v1", + "searchProperty": "alias", + "type": "diagrams.FileLink", + }, + "entityViews": [], + }, + }, + } + + +@pytest.fixture +def config(base_config_dict) -> Config: + return Config.model_validate(base_config_dict) + + +def _file_node(space: str, external_id: str, view_id: dm.ViewId, name: str, alias: str): + """Lightweight stand-in for a Cognite Node with the attrs the pipeline reads.""" + return SimpleNamespace( + 
external_id=external_id, + space=space, + properties={view_id: {"name": name, "alias": alias}}, + ) + + +# --------------------------------------------------------------------------- # +# _truncate # +# --------------------------------------------------------------------------- # + +class TestTruncate: + def test_short_message_unchanged(self): + assert _truncate("hello", 100) == "hello" + + def test_exactly_at_limit_unchanged(self): + assert _truncate("a" * 10, 10) == "a" * 10 + + def test_overflow_is_ellipsis_truncated(self): + result = _truncate("a" * 100, 10) + assert len(result) == 10 + assert result.endswith("...") + assert result.startswith("a" * 7) + + def test_empty_message(self): + assert _truncate("", 5) == "" + + def test_max_len_equal_to_ellipsis_returns_only_ellipsis(self): + # Pathological corner case: max_len == 3. Should not crash. + assert _truncate("abcdef", 3) == "..." + + +# --------------------------------------------------------------------------- # +# create_annotation_id # +# --------------------------------------------------------------------------- # + +class TestCreateAnnotationId: + @staticmethod + def _inputs(raw=None): + file_id = dm.NodeId(space="instance", external_id="file-001") + entity = {"space": "instance", "external_id": "asset-001"} + text = "TAG-001" + if raw is None: + raw = {"region": {"page": 1}, "confidence": 0.92} + return file_id, entity, text, raw + + def test_short_id_uses_naive_form(self): + file_id, entity, text, raw = self._inputs() + out = create_annotation_id(file_id, entity, text, raw) + assert out.startswith("instance:file-001:instance:asset-001:TAG-001:") + assert len(out) < pipeline.EXTERNAL_ID_LIMIT + + def test_deterministic_for_same_raw_annotation(self): + file_id, entity, text, raw = self._inputs() + a = create_annotation_id(file_id, entity, text, raw) + b = create_annotation_id(file_id, entity, text, raw) + assert a == b + + def test_distinct_raw_annotations_produce_distinct_ids(self): + file_id, entity, 
text, _ = self._inputs() + a = create_annotation_id(file_id, entity, text, {"region": {"page": 1}}) + b = create_annotation_id(file_id, entity, text, {"region": {"page": 2}}) + assert a != b + + def test_long_inputs_fall_back_to_short_form(self): + # We want inputs where the naive form (which includes both spaces) is + # >= EXTERNAL_ID_LIMIT but the short form (without spaces) is below. + # Naive layout: "{space}:{fid}:{ent_space}:{ent_ext}:{text}:{hash10}" + # Short layout: "{fid}:{ent_ext}:{text}:{hash10}" + # Choose sizes so naive >= 256 and short < 256. + file_id = dm.NodeId(space="instance", external_id="x" * 120) + entity = {"space": "instance", "external_id": "y" * 70} + text = "z" * 40 + out = create_annotation_id(file_id, entity, text, {"any": "data"}) + assert len(out) < pipeline.EXTERNAL_ID_LIMIT + # short_id form starts with the file's external_id, not the space-prefixed naive form. + assert out.startswith("x" * 120 + ":" + "y" * 70 + ":" + "z" * 40 + ":") + + def test_extremely_long_inputs_fall_back_to_truncated_prefix(self): + file_id = dm.NodeId(space="instance", external_id="x" * 300) + entity = {"space": "instance", "external_id": "y" * 100} + text = "z" * 100 + out = create_annotation_id(file_id, entity, text, {"any": "data"}) + assert len(out) <= pipeline.EXTERNAL_ID_LIMIT + + +# --------------------------------------------------------------------------- # +# read_state_cursor / read_state_batch_num (C8) # +# --------------------------------------------------------------------------- # + +class TestReadStateValues: + @staticmethod + def _client_with_rows(rows): + client = MagicMock() + client.raw.rows.list.return_value = rows + return client + + @staticmethod + def _row(key, value): + return SimpleNamespace(key=key, columns={pipeline.STAT_STORE_VALUE: value}) + + def test_cursor_returns_str_when_present(self, logger): + client = self._client_with_rows([self._row(pipeline.STAT_STORE_CURSOR, "abc-cursor")]) + assert read_state_cursor(client, 
logger, "db", "tbl") == "abc-cursor" + + def test_cursor_returns_none_when_missing(self, logger): + client = self._client_with_rows([]) + assert read_state_cursor(client, logger, "db", "tbl") is None + + def test_cursor_coerces_non_str_to_str(self, logger): + # RAW columns are JSON-typed: numbers come back as numbers. The + # cursor reader should still return a str so callers like + # sync_query.cursors["files"] = file_cursor get a string. + client = self._client_with_rows([self._row(pipeline.STAT_STORE_CURSOR, 12345)]) + assert read_state_cursor(client, logger, "db", "tbl") == "12345" + + def test_batch_num_returns_int_when_present(self, logger): + client = self._client_with_rows([self._row(pipeline.STAT_STORE_NUM_IN_BATCH, 42)]) + assert read_state_batch_num(client, logger, "db", "tbl") == 42 + + def test_batch_num_returns_zero_when_missing(self, logger): + client = self._client_with_rows([]) + assert read_state_batch_num(client, logger, "db", "tbl") == 0 + + def test_batch_num_coerces_string_int(self, logger): + client = self._client_with_rows([self._row(pipeline.STAT_STORE_NUM_IN_BATCH, "10")]) + assert read_state_batch_num(client, logger, "db", "tbl") == 10 + + def test_batch_num_returns_zero_for_unparseable_string(self, logger): + # The pre-C8 implementation would have returned a non-int and made + # the caller's `range(file_num, ...)` blow up. Now we coerce and warn. 
+ client = self._client_with_rows( + [self._row(pipeline.STAT_STORE_NUM_IN_BATCH, "not-an-int")] + ) + assert read_state_batch_num(client, logger, "db", "tbl") == 0 + + +# --------------------------------------------------------------------------- # +# get_all_entities (C4 + H8) # +# --------------------------------------------------------------------------- # + +class TestGetAllEntities: + def test_file_entities_use_file_search_property_as_key( + self, logger, base_config_dict, file_view_id + ): + """C4: file entities must store their searchable value under + file_view.search_property (here: 'alias'), not the literal key + 'search_property'. Otherwise diagram-detect's search_field never + matches them.""" + config = Config.model_validate(base_config_dict) + all_files = [ + _file_node("instance", "f1", file_view_id, "F1", "F1-alias"), + _file_node("instance", "f2", file_view_id, "F2", "F2-alias"), + ] + + with patch("pipeline.get_all_files", return_value=all_files): + entities = get_all_entities(MagicMock(), logger, config) + + assert len(entities) == 2 + assert all(e["alias"] == f"F{i + 1}-alias" for i, e in enumerate(entities)) + assert all("search_property" not in e for e in entities) + assert all(e["annotation_type_external_id"] == "diagrams.FileLink" for e in entities) + + def test_entity_view_normalises_to_canonical_search_property( + self, logger, base_config_dict, asset_view_id + ): + """C4 second-order: entity views that declare a different + search_property than the file view must still store under the file + view's search_property (which is the diagram-detect search_field).""" + base_config_dict["data"]["annotationJob"]["entityViews"] = [ + { + "schemaSpace": "schema", + "instanceSpace": "instance", + "externalId": "AssetView", + "version": "v1", + "searchProperty": "tag", # differs from file view's "alias" + "type": "diagrams.AssetLink", + }, + ] + config = Config.model_validate(base_config_dict) + + client = MagicMock() + 
client.data_modeling.instances.list.return_value = [ + SimpleNamespace( + external_id="asset-1", + space="instance", + properties={asset_view_id: {"name": "Asset 1", "tag": "TAG-1"}}, + ), + ] + + with patch("pipeline.get_all_files", return_value=[]): + entities = get_all_entities(client, logger, config) + + assert len(entities) == 1 + assert entities[0]["alias"] == "TAG-1" # stored under canonical key + assert "tag" not in entities[0] + assert entities[0]["annotation_type_external_id"] == "diagrams.AssetLink" + + def test_dedup_drops_duplicate_space_external_id( + self, logger, base_config_dict, file_view_id, asset_view_id + ): + """H8: a node listed by two views (e.g. file_view + an entity_view + both pointing at the same instance) must yield a single entity + after dedup, not two.""" + base_config_dict["data"]["annotationJob"]["entityViews"] = [ + { + "schemaSpace": "schema", + "instanceSpace": "instance", + "externalId": "AssetView", + "version": "v1", + "searchProperty": "alias", + "type": "diagrams.AssetLink", + }, + ] + config = Config.model_validate(base_config_dict) + + all_files = [_file_node("instance", "n1", file_view_id, "Node 1", "N1")] + client = MagicMock() + client.data_modeling.instances.list.return_value = [ + SimpleNamespace( + external_id="n1", # same external_id as the file above + space="instance", + properties={asset_view_id: {"name": "Node 1", "alias": "N1"}}, + ), + ] + + with patch("pipeline.get_all_files", return_value=all_files): + entities = get_all_entities(client, logger, config) + + assert len(entities) == 1 + + +# --------------------------------------------------------------------------- # +# get_new_files (C7) # +# --------------------------------------------------------------------------- # + +class TestGetNewFiles: + @staticmethod + def _sync_result(files=None, cursor_value="next-cursor"): + files_list = list(files or []) + result = MagicMock() + result.cursors = {"files": cursor_value} + result.__getitem__.side_effect = lambda 
key: {"files": files_list}[key] + return result + + def test_successful_sync_persists_new_cursor(self, logger, config): + client = MagicMock() + client.data_modeling.instances.sync.return_value = self._sync_result() + files_view_id = config.data.annotation_job.file_view.as_view_id() + + with patch("pipeline.update_state_store") as mock_update: + result = get_new_files(client, logger, "old-cursor", files_view_id, config) + + assert result is client.data_modeling.instances.sync.return_value + mock_update.assert_called_once() + # update_state_store(client, logger, new_cursor_value, ...) — positional 2. + assert mock_update.call_args.args[2] == "next-cursor" + + def test_400_retries_with_same_cursor(self, logger, config): + """C7: a transient 400 must NOT reset the cursor to None. The retry + must re-issue the sync with the original cursor value.""" + client = MagicMock() + captured_cursors: list[str | None] = [] + + def fake_sync(query): + captured_cursors.append(query.cursors.get("files")) + if len(captured_cursors) == 1: + raise CogniteAPIError("temporary 400", code=400) + return self._sync_result() + + client.data_modeling.instances.sync.side_effect = fake_sync + files_view_id = config.data.annotation_job.file_view.as_view_id() + + with patch("pipeline.update_state_store"), patch("pipeline.time.sleep"): + get_new_files(client, logger, "old-cursor", files_view_id, config) + + assert captured_cursors == ["old-cursor", "old-cursor"] + + def test_400_after_max_retries_raises_without_persisting_state(self, logger, config): + client = MagicMock() + client.data_modeling.instances.sync.side_effect = CogniteAPIError( + "persistent 400", code=400 + ) + files_view_id = config.data.annotation_job.file_view.as_view_id() + + with patch("pipeline.update_state_store") as mock_update, patch("pipeline.time.sleep"): + with pytest.raises(Exception): + get_new_files(client, logger, "old-cursor", files_view_id, config) + + mock_update.assert_not_called() + + def 
test_non_400_error_raises_immediately(self, logger, config): + client = MagicMock() + client.data_modeling.instances.sync.side_effect = CogniteAPIError( + "server error", code=500 + ) + files_view_id = config.data.annotation_job.file_view.as_view_id() + + with patch("pipeline.update_state_store") as mock_update, \ + patch("pipeline.time.sleep") as mock_sleep: + with pytest.raises(Exception): + get_new_files(client, logger, "old-cursor", files_view_id, config) + + mock_update.assert_not_called() + mock_sleep.assert_not_called() + assert client.data_modeling.instances.sync.call_count == 1 + + +# --------------------------------------------------------------------------- # +# push_result_to_annotations (C3) # +# --------------------------------------------------------------------------- # + +class TestPushResultToAnnotations: + def test_missing_file_instance_id_does_not_abort_batch( + self, logger, config, file_view_id + ): + """C3: a result item missing 'fileInstanceId' must NOT cause the + function to bail with a tuple. The remaining valid items must still + be processed and the function must return an int. + """ + client = MagicMock() + + # `new_files` is duck-typed: code reads it like a dict and iterates + # `new_files["files"]`. A plain dict + SimpleNamespace nodes is enough. + new_files = { + "files": [_file_node("instance", "f1", file_view_id, "F1", "F1-alias")], + } + + result = { + "items": [ + {"fileId": 7, "annotations": []}, # bad: no fileInstanceId + { + "fileInstanceId": {"space": "instance", "externalId": "f1"}, + "annotations": [], + }, + ], + } + annotation_view_id = config.data.annotation_view.as_view_id() + + # The mock must echo the running error_count back so the +1 from the + # malformed item isn't clobbered when the second (good) item is + # processed. _result_item_to_edge_applies takes error_count as the + # 7th positional arg and returns it as the 2nd tuple element. 
+ def echo_error_count(*args, **kwargs): + return ([], args[6]) + + with patch("pipeline._result_item_to_edge_applies", side_effect=echo_error_count): + count = push_result_to_annotations( + client, + config, + logger, + annotation_view_id, + file_view_id, + result, + new_files, + error_count=0, + doc_doc=[], + doc_tag=[], + ) + + assert isinstance(count, int) + assert count == 1 # exactly one error: the missing-fileInstanceId item + # apply was called once (after the loop), proving we did not abort + # the batch on the first malformed item. + client.data_modeling.instances.apply.assert_called_once() + + def test_clean_old_annotations_batches_into_single_call( + self, logger, base_config_dict, file_view_id + ): + """M6: with clean_old_annotations=True, the per-page cleanup must go + through delete_annotations_for_files in a single batched call, not + per-file.""" + base_config_dict["parameters"]["cleanOldAnnotations"] = True + config = Config.model_validate(base_config_dict) + + client = MagicMock() + new_files = { + "files": [ + _file_node("instance", "f1", file_view_id, "F1", "F1-alias"), + _file_node("instance", "f2", file_view_id, "F2", "F2-alias"), + ], + } + result = { + "items": [ + {"fileInstanceId": {"space": "instance", "externalId": "f1"}, "annotations": []}, + {"fileInstanceId": {"space": "instance", "externalId": "f2"}, "annotations": []}, + ], + } + annotation_view_id = config.data.annotation_view.as_view_id() + + with patch("pipeline._result_item_to_edge_applies", return_value=([], 0)), \ + patch("pipeline.delete_annotations_for_files") as mock_delete: + count = push_result_to_annotations( + client, + config, + logger, + annotation_view_id, + file_view_id, + result, + new_files, + error_count=0, + doc_doc=[], + doc_tag=[], + ) + + assert count == 0 + mock_delete.assert_called_once() + # Fourth positional arg (index 3) is the list of NodeIds to clean. 
+ passed_nodes = mock_delete.call_args.args[3] + assert len(passed_nodes) == 2 + assert {n.external_id for n in passed_nodes} == {"f1", "f2"} diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/functions.Function.yaml b/modules/contextualization/cdf_p_and_id_annotation/functions/functions.Function.yaml index b1283a54..380de357 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/functions/functions.Function.yaml +++ b/modules/contextualization/cdf_p_and_id_annotation/functions/functions.Function.yaml @@ -1,7 +1,7 @@ # The directory with the function code should have the same name # and externalId as the function itself as defined below. -- name: 'dm:context:files:{{location_name}}:{{source_name}}:annotation' - externalId: 'fn_dm_context_files_{{location_name}}_{{source_name}}_annotation' +- name: 'dm:context:files:annotation' + externalId: 'fn_dm_context_files_annotation' owner: 'Anonymous' description: 'Contextualization of P&ID files creating annotations in Data model' metadata: diff --git a/modules/contextualization/cdf_p_and_id_annotation/workflows/annotation.WorkflowVersion.yaml b/modules/contextualization/cdf_p_and_id_annotation/workflows/annotation.WorkflowVersion.yaml index 35dd5937..101ea7fb 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/workflows/annotation.WorkflowVersion.yaml +++ b/modules/contextualization/cdf_p_and_id_annotation/workflows/annotation.WorkflowVersion.yaml @@ -25,12 +25,12 @@ workflowDefinition: retries: 3 timeout: 3600 onFailure: 'abortWorkflow' - - externalId: 'fn_dm_context_files_{{location_name}}_{{source_name}}_annotation' + - externalId: 'fn_dm_context_files_annotation' type: 'function' parameters: function: - externalId: 'fn_dm_context_files_{{location_name}}_{{source_name}}_annotation' - data: { "ExtractionPipelineExtId": "ep_ctx_files_{{location_name}}_{{source_name}}_pandid_annotation" } + externalId: 'fn_dm_context_files_annotation' + data: { "ExtractionPipelineExtId": 
"ep_ctx_files_pandid_annotation" } name: 'Annotation Function' description: 'Contextualization of P&ID files creating annotations in Data model' retries: 3 From 92d80e39015a84824531227c69a1b7df7fc2a534 Mon Sep 17 00:00:00 2001 From: Jan Inge Bergseth <31886431+BergsethCognite@users.noreply.github.com> Date: Fri, 29 May 2026 12:31:54 +0200 Subject: [PATCH 3/4] Potential fix for pull request finding 'CodeQL / Module is imported with 'import' and 'import from'' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --- .../test_pipeline.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py index eb0397a1..acb46688 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py +++ b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py @@ -41,15 +41,14 @@ import pipeline # noqa: E402 (after sys.path tweak) from config import Config # noqa: E402 from logger import CogniteFunctionLogger # noqa: E402 -from pipeline import ( # noqa: E402 - _truncate, - create_annotation_id, - get_all_entities, - get_new_files, - push_result_to_annotations, - read_state_batch_num, - read_state_cursor, -) + +_truncate = pipeline._truncate +create_annotation_id = pipeline.create_annotation_id +get_all_entities = pipeline.get_all_entities +get_new_files = pipeline.get_new_files +push_result_to_annotations = pipeline.push_result_to_annotations +read_state_batch_num = pipeline.read_state_batch_num +read_state_cursor = pipeline.read_state_cursor # --------------------------------------------------------------------------- # # Fixtures # From 05672bb5e7dd4de3ab6caa9570c246871839fd7c 
Mon Sep 17 00:00:00 2001 From: Jan Inge Bergseth <31886431+BergsethCognite@users.noreply.github.com> Date: Fri, 29 May 2026 12:40:19 +0200 Subject: [PATCH 4/4] Update test_pipeline.py --- .../test_pipeline.py | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py index acb46688..febd3ce6 100644 --- a/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py +++ b/modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/test_pipeline.py @@ -38,17 +38,21 @@ # Same path-prepend pattern used by handler.py so flat imports work in-test. sys.path.append(str(Path(__file__).parent)) -import pipeline # noqa: E402 (after sys.path tweak) from config import Config # noqa: E402 from logger import CogniteFunctionLogger # noqa: E402 - -_truncate = pipeline._truncate -create_annotation_id = pipeline.create_annotation_id -get_all_entities = pipeline.get_all_entities -get_new_files = pipeline.get_new_files -push_result_to_annotations = pipeline.push_result_to_annotations -read_state_batch_num = pipeline.read_state_batch_num -read_state_cursor = pipeline.read_state_cursor +from pipeline import ( # noqa: E402 + EXTERNAL_ID_LIMIT, + STAT_STORE_CURSOR, + STAT_STORE_NUM_IN_BATCH, + STAT_STORE_VALUE, + _truncate, + create_annotation_id, + get_all_entities, + get_new_files, + push_result_to_annotations, + read_state_batch_num, + read_state_cursor, +) # --------------------------------------------------------------------------- # # Fixtures # @@ -162,7 +166,7 @@ def test_short_id_uses_naive_form(self): file_id, entity, text, raw = self._inputs() out = create_annotation_id(file_id, entity, text, raw) assert 
out.startswith("instance:file-001:instance:asset-001:TAG-001:") - assert len(out) < pipeline.EXTERNAL_ID_LIMIT + assert len(out) < EXTERNAL_ID_LIMIT def test_deterministic_for_same_raw_annotation(self): file_id, entity, text, raw = self._inputs() @@ -186,7 +190,7 @@ def test_long_inputs_fall_back_to_short_form(self): entity = {"space": "instance", "external_id": "y" * 70} text = "z" * 40 out = create_annotation_id(file_id, entity, text, {"any": "data"}) - assert len(out) < pipeline.EXTERNAL_ID_LIMIT + assert len(out) < EXTERNAL_ID_LIMIT # short_id form starts with the file's external_id, not the space-prefixed naive form. assert out.startswith("x" * 120 + ":" + "y" * 70 + ":" + "z" * 40 + ":") @@ -195,7 +199,7 @@ def test_extremely_long_inputs_fall_back_to_truncated_prefix(self): entity = {"space": "instance", "external_id": "y" * 100} text = "z" * 100 out = create_annotation_id(file_id, entity, text, {"any": "data"}) - assert len(out) <= pipeline.EXTERNAL_ID_LIMIT + assert len(out) <= EXTERNAL_ID_LIMIT # --------------------------------------------------------------------------- # @@ -211,10 +215,10 @@ def _client_with_rows(rows): @staticmethod def _row(key, value): - return SimpleNamespace(key=key, columns={pipeline.STAT_STORE_VALUE: value}) + return SimpleNamespace(key=key, columns={STAT_STORE_VALUE: value}) def test_cursor_returns_str_when_present(self, logger): - client = self._client_with_rows([self._row(pipeline.STAT_STORE_CURSOR, "abc-cursor")]) + client = self._client_with_rows([self._row(STAT_STORE_CURSOR, "abc-cursor")]) assert read_state_cursor(client, logger, "db", "tbl") == "abc-cursor" def test_cursor_returns_none_when_missing(self, logger): @@ -225,11 +229,11 @@ def test_cursor_coerces_non_str_to_str(self, logger): # RAW columns are JSON-typed: numbers come back as numbers. The # cursor reader should still return a str so callers like # sync_query.cursors["files"] = file_cursor get a string. 
- client = self._client_with_rows([self._row(pipeline.STAT_STORE_CURSOR, 12345)]) + client = self._client_with_rows([self._row(STAT_STORE_CURSOR, 12345)]) assert read_state_cursor(client, logger, "db", "tbl") == "12345" def test_batch_num_returns_int_when_present(self, logger): - client = self._client_with_rows([self._row(pipeline.STAT_STORE_NUM_IN_BATCH, 42)]) + client = self._client_with_rows([self._row(STAT_STORE_NUM_IN_BATCH, 42)]) assert read_state_batch_num(client, logger, "db", "tbl") == 42 def test_batch_num_returns_zero_when_missing(self, logger): @@ -237,14 +241,14 @@ def test_batch_num_returns_zero_when_missing(self, logger): assert read_state_batch_num(client, logger, "db", "tbl") == 0 def test_batch_num_coerces_string_int(self, logger): - client = self._client_with_rows([self._row(pipeline.STAT_STORE_NUM_IN_BATCH, "10")]) + client = self._client_with_rows([self._row(STAT_STORE_NUM_IN_BATCH, "10")]) assert read_state_batch_num(client, logger, "db", "tbl") == 10 def test_batch_num_returns_zero_for_unparseable_string(self, logger): # The pre-C8 implementation would have returned a non-int and made # the caller's `range(file_num, ...)` blow up. Now we coerce and warn. client = self._client_with_rows( - [self._row(pipeline.STAT_STORE_NUM_IN_BATCH, "not-an-int")] + [self._row(STAT_STORE_NUM_IN_BATCH, "not-an-int")] ) assert read_state_batch_num(client, logger, "db", "tbl") == 0