Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 129 additions & 24 deletions modules/contextualization/cdf_p_and_id_annotation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ Key features are:
- Use sync api against data modeling for only processing new/updated files
- Store state/cursor in RAW db/table (as provided in configuration)
- Run ALL mode
- Clean out status an logged status from previous runs in RAW, and process
all P&ID files.
- NOTE: to also clean / delete previous annotations also add:
cleanOldAnnotations = True in configuration
- Clean out cursor/state and per-run progress from previous runs in RAW,
then process every matching P&ID file from scratch.
- NOTE: to also clean / delete previous annotations, set
`cleanOldAnnotations: true` in configuration.
- Annotation process
- Optional: Use configuration for filter property to find Files and/or
Assets
Expand Down Expand Up @@ -103,9 +103,9 @@ This module manages the following resources:
pipelines, transformations, and raw tables

3. extraction pipeline:
- ID: `ep_ctx_files_{{location_name}}_{{source_name}}_pandid_annotation`
- ID: `ep_ctx_files_pandid_annotation`
- Content: Documentation and configuration for a CDF function running
P&ID contextualization/annotation (see function for more description)
P&ID contextualization/annotation (see function for more description).

4. transformations:
- ID: `asset_tagging_tr`
Expand All @@ -118,27 +118,30 @@ This module manages the following resources:
adding tags that can be used in the annotation configuration's filterProperty.

5. function:
- ID: `fn_dm_context_files_{{location_name}}_{{source_name}}_annotation`
- Content: Reads new/updated files using the SYNC api. Extracts all
tags in P&ID that match tags from Asset & Files to create CDF
- ID: `fn_dm_context_files_annotation`
- Content: Reads new/updated files using the SYNC API. Extracts all
tags in P&ID that match tags from Assets & Files to create CDF
annotations used for linking found objects in the document to
other resource types in CDF

6. raw: in database `{{files_dataset}}` (typically `ds_files_{{location_name}}_{{source_name}}`)
- ID: `documents_docs`
- Content: Table storing document-to-document relationships found in P&ID files
- ID: `documents_tags`
- Content: Table storing document-to-tag relationships found in P&ID files
- ID: `files_state_store`
- Content: Table storing state/cursor information for incremental processing
other resource types in CDF.

6. raw: stored in the database configured by the `rawDb` parameter in the
extraction pipeline config (by convention this matches the dataset external
id, e.g. `ds_files_{{location_name}}_{{source_name}}`).
- Table: `documents_docs`
- Content: Document-to-document relationships found in P&ID files.
- Table: `documents_tags`
- Content: Document-to-tag relationships found in P&ID files.
- Table: `files_state_store`
- Content: Cursor and per-batch progress for incremental processing
(read on resume, written after each successful batch).

7. workflow
- ID: `{{workflow}}` (default: `entity_matching`)
- Content: Orchestrates the P&ID annotation process:
1. Runs `asset_tagging_tr` transformation to tag assets
2. Runs `file_tagging_tr` transformation to tag files
3. Runs `fn_dm_context_files_{{location_name}}_{{source_name}}_annotation`
function to perform annotation (depends on both tagging transformations)
3. Runs `fn_dm_context_files_annotation` function to perform annotation
(depends on both tagging transformations)

## Variables

Expand Down Expand Up @@ -359,11 +362,43 @@ config:

#### Running functions locally

You may run locally:
You can run the function on your machine without deploying to CDF Functions —
useful for debugging configuration changes against a real CDF project, and for
the initial bulk pass before switching to incremental mode.

1. From the function directory:

```bash
cd modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation
```

2. Create a local `.env` file with the credentials needed for your CDF project
(see `handler.py::run_locally` for the full list):

```env
CDF_PROJECT=your-project
CDF_CLUSTER=greenfield
IDP_CLIENT_ID=...
IDP_CLIENT_SECRET=...
IDP_TOKEN_URL=...
```

3. Install the function's runtime dependencies into your local environment:

```bash
pip install -r requirements.txt
```

To run `fn_dm_context_files_LOC_SOURCE_annotation`,
simply call the `handler.py` with a local `.env`file that contain env variable
for logging on to your CDF project
4. Run the handler:

```bash
python handler.py
```

This calls `run_locally()`, which authenticates to CDF and invokes the same
`handle()` entry point as the deployed function. The extraction-pipeline
external id used by `run_locally()` is set near the bottom of `handler.py`
— adjust it to match your environment's actual extraction pipeline.

#### Cognite Function runtime

Expand All @@ -376,3 +411,73 @@ The current implementation processes files sequentially, which works well for in

**Future Scalability:**
The codebase is designed to be easily extended with parallel processing and async modules, allowing you to scale from processing dozens to thousands of P&ID files efficiently. This makes it a future-proof solution that grows with your needs.

## Testing

The function ships with a unit-test suite that pins down the bug-fix surface
(per-batch counting, cursor preservation on transient API errors, the
diagram-detect search-property key, RAW state coercion, entity de-duplication,
annotation-id boundary cases, and config validation). Tests do not require a
CDF connection — the `CogniteClient` is fully mocked.

### Prerequisites

- Python 3.11+ (matches the Cognite Functions runtime)
- The minimum runtime libraries needed to import `pipeline.py` and `config.py`:

```bash
pip install pytest cognite-sdk pyyaml pydantic
```

`cognite-extractor-utils` is **not** required to run the suite — it is
lazy-imported inside the runtime function and the test suite never reaches
the construction site.

### Run

From the repo root:

```bash
pytest -q modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/
```

Expected output (47 tests):

```
............................................... [100%]
47 passed in ~1s
```

### What's covered

- **`test_pipeline.py`** (26 tests)
- `_truncate` — short / at-limit / overflow / empty / pathological max-len.
- `create_annotation_id` — naive form, determinism, distinct raw → distinct
id, and the two length-boundary fallbacks (short form vs. truncated prefix).
- `read_state_cursor` / `read_state_batch_num` — typed coercion of the RAW
state values (e.g. numeric cursor coerced to `str`, string `"10"` coerced
to `int`, unparseable values reset to `0` with a warning).
- `get_all_entities` — file entities use the canonical `search_property` key,
cross-view entities are normalised to that same key, and duplicate
`(space, external_id)` entries are dropped.
- `get_new_files` — successful sync persists the new cursor; a transient 400
retries with the **same** cursor (verified by capturing each call's
cursor); persistent 400 raises after `max_retries` without overwriting
state; non-400 errors propagate immediately with no sleep.
- `push_result_to_annotations` — a result item missing `fileInstanceId` no
longer aborts the batch (function returns `int`, not a tuple, and the
surviving items are still applied); cleanup goes through a single batched
`delete_annotations_for_files` call when `cleanOldAnnotations` is enabled.

- **`test_config.py`** (21 tests)
- `Optional` field defaults (`debugFile`, `filterProperty`, `filterValues`).
- Required-field enforcement (parameterized over the mandatory fields).
- Threshold range validation (parameterized over good/bad values).
- `Literal[...]` validation for the `type` field.

### Adding new tests

Tests live alongside the function code following the repo convention. Create or
extend a `test_*.py` file in
`modules/contextualization/cdf_p_and_id_annotation/functions/fn_dm_context_files_annotation/`
and pytest will pick it up automatically.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
externalId: 'ep_ctx_files_{{location_name}}_{{source_name}}_pandid_annotation'
name: 'ctx:files:{{location_name}}:{{source_name}}:pandid_annotation'
externalId: 'ep_ctx_files_pandid_annotation'
name: 'ctx:files:pandid_annotation'
dataSetExternalId: '{{files_dataset}}'
description: 'Annotation of P&ID documents from file source {{location_name}}:{{source_name}}'
description: 'Annotation of P&ID documents from file source {{location_name}}:{{source_name}}'
rawTables:
- dbName: 'ds_files_{{location_name}}_{{source_name}}'
tableName: 'documents_tags'
Expand Down Expand Up @@ -55,7 +55,7 @@ documentation: >
Provide a data block with the parameters for the process to run.

```
data = {"logLevel":"INFO", "ExtractionPipelineExtId": "ep_ctx_files_LOC_SOURCE_pandid_annotation"}
data = {"logLevel":"INFO", "ExtractionPipelineExtId": "ep_ctx_files_pandid_annotation"}

```

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
externalId: 'ep_ctx_files_{{location_name}}_{{source_name}}_pandid_annotation'
externalId: 'ep_ctx_files_pandid_annotation'
config:
parameters:
debug: False
Expand All @@ -22,16 +22,16 @@ config:
instanceSpace: {{ fileInstanceSpace }}
externalId: {{organization}}File
version: {{ viewVersion }}
searchProperty: aliases
searchProperty: name
type: diagrams.FileLink
filterProperty: tags
filterValues: ["PID", "ISO", "PLOT PLANS"]
filterValues: []
entityViews:
- schemaSpace: {{ schemaSpace }}
instanceSpace: {{ assetInstanceSpace }}
externalId: {{organization}}Asset
version: {{ viewVersion }}
searchProperty: aliases
searchProperty: name
type: diagrams.AssetLink
filterProperty: tags
filterValues: ["PID"]
filterValues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# Configuration classes
class Parameters(BaseModel, alias_generator=to_camel):
debug: bool
debug_file: str = None
debug_file: str | None = None
run_all: bool
clean_old_annotations: bool
raw_db: str
Expand All @@ -31,8 +31,8 @@ class ViewPropertyConfig(BaseModel, alias_generator=to_camel):
version: str
search_property: str = "alias"
type: Literal["diagrams.FileLink", "diagrams.AssetLink"]
filter_property: str = None
filter_values: list[str] = None
filter_property: str | None = None
filter_values: list[str] | None = None

def as_view_id(self) -> dm.ViewId:
return dm.ViewId(space=self.schema_space, external_id=self.external_id, version=self.version)
Expand Down Expand Up @@ -60,12 +60,6 @@ class Config(BaseModel, alias_generator=to_camel):
parameters: Parameters
data: ConfigData

@classmethod
def pares_direct_relation(cls, value: Any) -> Any:
if isinstance(value, dict):
return dm.DirectRelationReference.load(value)
return value


def load_config_parameters(client: CogniteClient, function_data: dict[str, Any]) -> Config:
"""Retrieves the configuration parameters from the function data and loads the configuration from CDF."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def _send() -> None:
"cdf_cluster": client.config.cdf_cluster,
"cdf_project": client.config.project,
})
threading.Thread(target=_send, daemon=False).start()
threading.Thread(target=_send, daemon=True).start()
except Exception:
# Usage tracking is best-effort; must not affect the handler.
pass
Expand Down Expand Up @@ -85,7 +85,7 @@ def run_locally():
),
)
)
data = {"logLevel":"INFO", "ExtractionPipelineExtId": "ep_ctx_files_LOC_SOURCE_pandid_annotation"}
data = {"logLevel": "INFO", "ExtractionPipelineExtId": "ep_ctx_files_pandid_annotation"}
handle(data, client)


Expand Down
Loading
Loading