From 2fe8a3e478d88ca734e7cf9eb4100fdc0b8736be Mon Sep 17 00:00:00 2001 From: Tuomas Siipola Date: Wed, 11 Jun 2025 15:04:14 +0300 Subject: [PATCH 1/8] Draft for new DVAS metadata schema --- scripts/dvas-json.py | 17 +++ src/processing/dvas.py | 211 ++++++++++++++++----------------- src/processing/metadata_api.py | 11 +- 3 files changed, 129 insertions(+), 110 deletions(-) create mode 100755 scripts/dvas-json.py diff --git a/scripts/dvas-json.py b/scripts/dvas-json.py new file mode 100755 index 00000000..d577ed7e --- /dev/null +++ b/scripts/dvas-json.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +import datetime +import json +import sys + +from processing.config import Config +from processing.dvas import DvasMetadata +from processing.metadata_api import MetadataApi + +file_uuid = sys.argv[1] +config = Config() +md_api = MetadataApi(config) +file = md_api.get(f"api/files/{file_uuid}") +dvas_metadata = DvasMetadata(file, md_api) +dvas_timestamp = datetime.datetime.now(datetime.timezone.utc) +dvas_json = dvas_metadata.create_dvas_json(dvas_timestamp) +json.dump(dvas_json, sys.stdout, indent=2) diff --git a/src/processing/dvas.py b/src/processing/dvas.py index 89095acb..a2131f8a 100644 --- a/src/processing/dvas.py +++ b/src/processing/dvas.py @@ -1,6 +1,6 @@ import base64 +import datetime import logging -from datetime import datetime, timezone from typing import Literal import requests @@ -40,15 +40,14 @@ def upload(self, file: dict): return try: dvas_metadata = DvasMetadata(file, self.md_api) - dvas_json = dvas_metadata.create_dvas_json() - if len(dvas_json["md_content_information"]["attribute_descriptions"]) == 0: + dvas_timestamp = datetime.datetime.now(datetime.timezone.utc) + dvas_json = dvas_metadata.create_dvas_json(dvas_timestamp) + if not dvas_json["variables"]: logging.error("Skipping - no ACTRIS variables") return self._delete_old_versions(file) dvas_id = self._post(dvas_json) - self.md_api.update_dvas_info( - file["uuid"], dvas_json["md_metadata"]["datestamp"], dvas_id - ) + self.md_api.update_dvas_info(file["uuid"], dvas_timestamp, dvas_id) except DvasError: logging.exception(f"Failed to upload {file['filename']} to DVAS") @@ -121,7 +120,7 @@ def __init__(self, file: dict, md_api: MetadataApi): self._product = file["product"] self._site = file["site"] - def create_dvas_json(self) -> dict: + def create_dvas_json(self, timestamp: datetime.datetime) -> dict: time_begin = ( self.file["startTime"] or f"{self.file['measurementDate']}T00:00:00.0000000Z" @@ -129,112 +128,118 @@ def create_dvas_json(self) -> dict: time_end = ( self.file["stopTime"] or f"{self.file['measurementDate']}T23:59:59.9999999Z" ) + timeliness = self._parse_timeliness() + instruments = list(self._find_instruments(self.file["uuid"]).values()) return { - "md_metadata": { - "file_identifier": self.file["filename"], - "language": "en", - "hierarchy_level": "dataset", - "online_resource": {"linkage": "https://cloudnet.fmi.fi/"}, - "datestamp": datetime.now(timezone.utc).isoformat(), - "contact": [ - { - "first_name": "Ewan", - "last_name": "O'Connor", - "organisation_name": "Finnish Meteorological Institute (FMI)", - "role_code": ["pointOfContact"], - "country_code": "FI", - } - ], + "dataset_metadata": { + "repository": {"repository_id": "CLU"}, + "time_file_created": self.file["createdAt"], + "time_metadata_created": timestamp.isoformat(), + "time_content_revised": self.file["updatedAt"], }, - "md_identification": { - "abstract": self._parse_title(), + "identification": { + "identifier": {"pid": self.file["pid"], "pid_type": "ePIC"}, "title": self._parse_title(), - "date_type": "creation", - "contact": [ + "abstract": self._parse_title(), + "roles": [ + { + "role_code": ["pointOfContact"], + "person": { + "first_name": "Ewan", + "last_name": "O'Connor", + "affiliation": { + "name": "Finnish Meteorological Institute", + "pid": "https://ror.org/05hppb561", + "pid_type": "other PID", + "country_code": "FI", + }, + "orcid": "https://orcid.org/0000-0001-9834-5100", + }, + }, { - "first_name": "Simo", - "last_name": "Tukiainen", - "organisation_name": "Finnish Meteorological Institute (FMI)", "role_code": ["processor"], - "country_code": "FI", - } + "person": { + "first_name": "Simo", + "last_name": "Tukiainen", + "affiliation": { + "name": "Finnish Meteorological Institute", + "pid": "https://ror.org/05hppb561", + "pid_type": "other PID", + "country_code": "FI", + }, + "orcid": "https://orcid.org/0000-0002-0651-4622", + }, + }, ], - "online_resource": { - "linkage": f"https://cloudnet.fmi.fi/file/{self.file['uuid']}" - }, - "identifier": { - "pid": self.file["pid"], - "type": "handle", - }, - "date": time_begin, }, - "md_constraints": { - "access_constraints": "license", - "use_constraints": "license", - "other_constraints": "N/A", - "data_licence": "CC-BY-4.0", - "metadata_licence": "CC-BY-4.0", + "usage_information": { + "data_license": "CC-BY-4.0", + "metadata_license": "CC0-1.0", "citation": self._fetch_credits("citation"), "acknowledgement": self._fetch_credits("acknowledgements"), }, - "md_keywords": { - "keywords": [ - "FMI", - "ACTRIS", - self._product["humanReadableName"], - ] - }, - "md_data_identification": { - "language": "en", - "topic_category": "climatologyMeteorologyAtmosphere", - "description": "time series of profile measurements", - "facility_identifier": self._site["dvasId"], + "product_type": "observation", + "facility": { + "identifier": self._site["dvasId"], }, - "ex_geographic_bounding_box": { + "spatial_extent": { "west_bound_longitude": self._site["longitude"], "east_bound_longitude": self._site["longitude"], "south_bound_latitude": self._site["latitude"], "north_bound_latitude": self._site["latitude"], + "lower_altitude": self._site["altitude"], + "upper_altitude": self._site["altitude"], }, - "ex_temporal_extent": { + "temporal_extent": { "time_period_begin": time_begin, "time_period_end": time_end, }, - "md_content_information": { - "attribute_descriptions": self._parse_variable_names(), - "content_type": "physicalMeasurement", - }, - "md_distribution_information": [ + "variables": [ + { + "variable_name": variable_name, + "timeliness": timeliness, + "instruments": instruments, + "data_quality_control": [ + { + "validtime_start": time_begin, + "validtime_end": time_end, + "compliance": self._parse_compliance(), + "quality_control_extent": "full quality control applied", + "quality_control_mechanism": "automatic quality control", + "quality_control_outcome": self._parse_qc_outcome(), + } + ], + "frameworks": [ + { + "validtime_start": time_begin, + "validtime_end": time_end, + "framework": framework, + } + for framework in self._parse_frameworks() + ], + "temporal_resolution": "P30S", + } + for variable_name in self._parse_variable_names() + ], + "distribution_information": [ { - "data_format": "netcdf", - "version_data_format": self._parse_netcdf_version(), + "data_format": self._parse_netcdf_version(), "dataset_url": self.file["downloadUrl"], "protocol": "HTTP", - "transfersize": self._calc_file_size(), - "description": "Direct download of data file", - "function": "download", - "restriction": { - "set": False, + "access_restriction": { + "restricted": False, }, + "transfersize": {"size": int(self.file["size"]), "unit": "B"}, } ], - "md_actris_specific": { - "facility_type": "observation platform, fixed", - "product_type": "observation", - "matrix": "cloud phase", - "sub_matrix": None, - "instrument_type": list(self._find_instrument_types(self.file["uuid"])), - "program_affiliation": self._parse_affiliation(), - "variable_statistical_property": None, - "legacy_data": self.file["legacy"], - "observation_timeliness": self._parse_timeliness(), - "data_product": self._parse_data_product(), - }, - "dq_data_quality_information": { - "level": "dataset", - "compliance": self._parse_compliance(), - "quality_control_extent": "full quality control applied", - "quality_control_outcome": self._parse_qc_outcome(), + "provenance": { + "software": [ + { + "title": software["title"], + "url": software["url"], + } + for software in self.file["software"] + ] }, } @@ -243,8 +248,7 @@ def _parse_variable_names(self) -> list[str]: file_vars = self.md_api.get(f"api/products/{self._product['id']}/variables") return [v["actrisName"] for v in file_vars if v["actrisName"] is not None] - def _parse_affiliation(self) -> list[str]: - # https://prod-actris-md.nilu.no/vocabulary/networkprogram + def _parse_frameworks(self) -> list[str]: affiliation = ["CLOUDNET"] if "arm" in self._site["type"]: affiliation.append("ARM") @@ -252,22 +256,21 @@ def _parse_affiliation(self) -> list[str]: affiliation.append("ACTRIS") return affiliation - def _find_instrument_types(self, uuid: str) -> set[str]: - """Recursively find instrument types from source files. - - Links: - https://vocabulary.actris.nilu.no/actris_vocab/instrumenttype - https://prod-actris-md.nilu.no/vocabulary/instrumenttype - """ - instruments = set() + def _find_instruments(self, uuid: str) -> dict[str, dict]: + """Recursively find instruments from source files.""" + instruments = {} json_data = utils.get_from_data_portal_api(f"api/files/{uuid}") assert isinstance(json_data, dict) if "instrument" in json_data and json_data["instrument"] is not None: - instruments.add(json_data["instrument"]["type"]) + instruments[json_data["instrument"]["pid"]] = { + "instrument_pid": json_data["instrument"]["pid"], + "instrument_type": json_data["instrument"]["type"], + "instrument_name": json_data["instrument"]["name"], + } source_ids = json_data.get("sourceFileIds", []) if source_ids: for source_uuid in source_ids: - instruments.update(self._find_instrument_types(source_uuid)) + instruments.update(self._find_instruments(source_uuid)) return instruments def _parse_timeliness(self) -> str: @@ -279,10 +282,6 @@ def _parse_timeliness(self) -> str: } return clu_to_dvas_map[self.file["timeliness"]] - def _parse_data_product(self) -> str: - """Description of the data product""" - return f"{self._parse_timeliness()} data" - def _parse_compliance(self) -> str: return ( "ACTRIS legacy" @@ -310,10 +309,6 @@ def _parse_title(self) -> str: f"at {self._site['humanReadableName']}" ) - def _calc_file_size(self) -> float: - file_size = int(self.file["size"]) / 1000 / 1000 # MB - return round(file_size, 3) - def _fetch_credits(self, type: Literal["citation", "acknowledgements"]) -> str: params = {"format": "txt"} response = self.md_api.get( diff --git a/src/processing/metadata_api.py b/src/processing/metadata_api.py index b8e269d3..8a328535 100644 --- a/src/processing/metadata_api.py +++ b/src/processing/metadata_api.py @@ -1,5 +1,6 @@ """Metadata API for Cloudnet files.""" +import datetime import logging import uuid @@ -89,8 +90,14 @@ def upload_instrument_file( return self.put_file("upload/data", checksum, base.daily_file.name, self._auth) - def update_dvas_info(self, uuid: uuid.UUID, timestamp: str, dvas_id: str): - payload = {"uuid": uuid, "dvasUpdatedAt": timestamp, "dvasId": dvas_id} + def update_dvas_info( + self, uuid: uuid.UUID, timestamp: datetime.datetime, dvas_id: str + ): + payload = { + "uuid": uuid, + "dvasUpdatedAt": timestamp.isoformat(), + "dvasId": dvas_id, + } self.post("files", payload) def clean_dvas_info(self, uuid: uuid.UUID): From 098d64d85a588fe39e09f685df62b3dc1d427458 Mon Sep 17 00:00:00 2001 From: Tuomas Siipola Date: Mon, 21 Jul 2025 16:06:00 +0300 Subject: [PATCH 2/8] Adjust based on feedback --- src/processing/dvas.py | 53 +++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/src/processing/dvas.py b/src/processing/dvas.py index a2131f8a..dd90585e 100644 --- a/src/processing/dvas.py +++ b/src/processing/dvas.py @@ -173,8 +173,8 @@ def create_dvas_json(self, timestamp: datetime.datetime) -> dict: ], }, "usage_information": { - "data_license": "CC-BY-4.0", - "metadata_license": "CC0-1.0", + "data_licence": "CC-BY-4.0", + "metadata_licence": "CC0-1.0", "citation": self._fetch_credits("citation"), "acknowledgement": self._fetch_credits("acknowledgements"), }, @@ -183,12 +183,19 @@ def create_dvas_json(self, timestamp: datetime.datetime) -> dict: "identifier": self._site["dvasId"], }, "spatial_extent": { - "west_bound_longitude": self._site["longitude"], - "east_bound_longitude": self._site["longitude"], - "south_bound_latitude": self._site["latitude"], - "north_bound_latitude": self._site["latitude"], - "lower_altitude": self._site["altitude"], - "upper_altitude": self._site["altitude"], + "type": "LineString", + "coordinates": [ + [ + self._site["longitude"], + self._site["latitude"], + self._site["altitude"], + ], + [ + self._site["longitude"], + self._site["latitude"], + self._site["altitude"] + 12_000, + ], + ], }, "temporal_extent": { "time_period_begin": time_begin, @@ -197,24 +204,20 @@ def create_dvas_json(self, timestamp: datetime.datetime) -> dict: "variables": [ { "variable_name": variable_name, + "variable_matrix": "cloud phase", + "variable_geometry": "atmospheric vertical profile", "timeliness": timeliness, - "instruments": instruments, + "instrument": instruments, "data_quality_control": [ { - "validtime_start": time_begin, - "validtime_end": time_end, "compliance": self._parse_compliance(), "quality_control_extent": "full quality control applied", "quality_control_mechanism": "automatic quality control", "quality_control_outcome": self._parse_qc_outcome(), } ], - "frameworks": [ - { - "validtime_start": time_begin, - "validtime_end": time_end, - "framework": framework, - } + "framework": [ + {"framework": framework} for framework in self._parse_frameworks() ], "temporal_resolution": "P30S", @@ -232,15 +235,13 @@ def create_dvas_json(self, timestamp: datetime.datetime) -> dict: "transfersize": {"size": int(self.file["size"]), "unit": "B"}, } ], - "provenance": { - "software": [ - { - "title": software["title"], - "url": software["url"], - } - for software in self.file["software"] - ] - }, + "provenance": [ + { + "title": software["title"], + "url": software["url"], + } + for software in self.file["software"] + ], } def _parse_variable_names(self) -> list[str]: From 3f66b64d1befb32d014f5e30562dcfb0c66aa67e Mon Sep 17 00:00:00 2001 From: Tuomas Siipola Date: Tue, 26 Aug 2025 15:15:42 +0300 Subject: [PATCH 3/8] ready to merge --- scripts/dvas-json.py | 17 ------------ src/processing/dvas.py | 61 +++++++++++++++++++++--------------------- 2 files changed, 30 insertions(+), 48 deletions(-) delete mode 100755 scripts/dvas-json.py diff --git a/scripts/dvas-json.py b/scripts/dvas-json.py deleted file mode 100755 index d577ed7e..00000000 --- a/scripts/dvas-json.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python3 -import datetime -import json -import sys - -from processing.config import Config -from processing.dvas import DvasMetadata -from processing.metadata_api import MetadataApi - -file_uuid = sys.argv[1] -config = Config() -md_api = MetadataApi(config) -file = md_api.get(f"api/files/{file_uuid}") -dvas_metadata = DvasMetadata(file, md_api) -dvas_timestamp = datetime.datetime.now(datetime.timezone.utc) -dvas_json = dvas_metadata.create_dvas_json(dvas_timestamp) -json.dump(dvas_json, sys.stdout, indent=2) diff --git a/src/processing/dvas.py b/src/processing/dvas.py index dd90585e..08a2fd2f 100644 --- a/src/processing/dvas.py +++ b/src/processing/dvas.py @@ -117,28 +117,30 @@ class DvasMetadata: def __init__(self, file: dict, md_api: MetadataApi): self.file = file self.md_api = md_api - self._product = file["product"] - self._site = file["site"] def create_dvas_json(self, timestamp: datetime.datetime) -> dict: - time_begin = ( - self.file["startTime"] - or f"{self.file['measurementDate']}T00:00:00.0000000Z" + time_begin = self.file.start_time or datetime.datetime.combine( + self.file.measurement_date, datetime.time(0, 0, 0), datetime.timezone.utc ) - time_end = ( - self.file["stopTime"] or f"{self.file['measurementDate']}T23:59:59.9999999Z" + time_end = self.file.stop_time or datetime.datetime.combine( + self.file.measurement_date, + datetime.time(23, 59, 59, 999999), + datetime.timezone.utc, ) timeliness = self._parse_timeliness() - instruments = list(self._find_instruments(self.file["uuid"]).values()) + instruments = self.client.source_instruments(self.file.uuid) + compliance = self._parse_compliance() + qc_outcome = self._parse_qc_outcome() + frameworks = self._parse_frameworks() return { "dataset_metadata": { "repository": {"repository_id": "CLU"}, - "time_file_created": self.file["createdAt"], + "time_file_created": self.file.created_at.isoformat(), "time_metadata_created": timestamp.isoformat(), - "time_content_revised": self.file["updatedAt"], + "time_content_revised": self.file.updated_at.isoformat(), }, "identification": { - "identifier": {"pid": self.file["pid"], "pid_type": "ePIC"}, + "identifier": {"pid": self.file.pid, "pid_type": "ePIC"}, "title": self._parse_title(), "abstract": self._parse_title(), "roles": [ @@ -180,26 +182,26 @@ def create_dvas_json(self, timestamp: datetime.datetime) -> dict: }, "product_type": "observation", "facility": { - "identifier": self._site["dvasId"], + "identifier": self.file.site.dvas_id, }, "spatial_extent": { "type": "LineString", "coordinates": [ [ - self._site["longitude"], - self._site["latitude"], - self._site["altitude"], + self.file.site.longitude, + self.file.site.latitude, + self.file.site.altitude, ], [ - self._site["longitude"], - self._site["latitude"], - self._site["altitude"] + 12_000, + self.file.sitelongitude, + self.file.sitelatitude, + self.file.sitealtitude + 12_000, ], ], }, "temporal_extent": { - "time_period_begin": time_begin, - "time_period_end": time_end, + "time_period_begin": time_begin.isoformat(), + "time_period_end": time_end.isoformat(), }, "variables": [ { @@ -210,16 +212,13 @@ def create_dvas_json(self, timestamp: datetime.datetime) -> dict: "instrument": instruments, "data_quality_control": [ { - "compliance": self._parse_compliance(), + "compliance": compliance, "quality_control_extent": "full quality control applied", "quality_control_mechanism": "automatic quality control", - "quality_control_outcome": self._parse_qc_outcome(), + "quality_control_outcome": qc_outcome, } ], - "framework": [ - {"framework": framework} - for framework in self._parse_frameworks() - ], + "framework": [{"framework": framework} for framework in frameworks], "temporal_resolution": "P30S", } for variable_name in self._parse_variable_names() @@ -227,20 +226,20 @@ def create_dvas_json(self, timestamp: datetime.datetime) -> dict: "distribution_information": [ { "data_format": self._parse_netcdf_version(), - "dataset_url": self.file["downloadUrl"], + "dataset_url": self.file.download_url, "protocol": "HTTP", "access_restriction": { "restricted": False, }, - "transfersize": {"size": int(self.file["size"]), "unit": "B"}, + "transfersize": {"size": self.file.size, "unit": "B"}, } ], "provenance": [ { - "title": software["title"], - "url": software["url"], + "title": software.title, + "url": software.url, } - for software in self.file["software"] + for software in self.file.software ], } From f95ecdf23f43e8b81888cb696b3f1b47ab625ea0 Mon Sep 17 00:00:00 2001 From: Tuomas Siipola Date: Tue, 26 Aug 2025 15:25:56 +0300 Subject: [PATCH 4/8] Fix instruments --- src/processing/dvas.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/processing/dvas.py b/src/processing/dvas.py index e3d94bd2..28dcba4e 100644 --- a/src/processing/dvas.py +++ b/src/processing/dvas.py @@ -212,7 +212,14 @@ def create_dvas_json(self, timestamp: datetime.datetime) -> dict: "variable_matrix": "cloud phase", "variable_geometry": "atmospheric vertical profile", "timeliness": timeliness, - "instrument": instruments, + "instrument": [ + { + "instrument_pid": instrument.pid, + "instrument_type": instrument.type, + "instrument_name": instrument.name, + } + for instrument in instruments + ], "data_quality_control": [ { "compliance": compliance, From f2478b4b154d2350535e14e2f2601ade10a77955 Mon Sep 17 00:00:00 2001 From: Tuomas Siipola Date: Tue, 26 Aug 2025 17:05:23 +0300 Subject: [PATCH 5/8] Add software back --- src/processing/dvas.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/processing/dvas.py b/src/processing/dvas.py index 28dcba4e..a78a3cef 100644 --- a/src/processing/dvas.py +++ b/src/processing/dvas.py @@ -6,7 +6,7 @@ import requests from cloudnet_api_client import APIClient -from cloudnet_api_client.containers import ProductMetadata, VersionMetadata +from cloudnet_api_client.containers import ExtendedProductMetadata, VersionMetadata from processing import utils from processing.config import Config @@ -26,7 +26,7 @@ def __init__(self, config: Config, md_api: MetadataApi, client: APIClient): self.session = self._init_session() self.client = client - def upload(self, file: ProductMetadata): + def upload(self, file: ExtendedProductMetadata): """Upload Cloudnet file metadata to DVAS API and update Cloudnet data portal""" landing_page_url = utils.build_file_landing_page_url(file.uuid) logging.info(f"Uploading {landing_page_url} metadata to DVAS") @@ -79,7 +79,7 @@ def _delete(self, url: str): raise DvasError(res) logging.debug(f"DELETE successful: {res.status_code} {res.text}") - def _delete_old_versions(self, file: ProductMetadata): + def _delete_old_versions(self, file: ExtendedProductMetadata): """Delete all versions of the given file from DVAS API. To be used before posting new version.""" versions = self.client.versions(file.uuid) for version in versions: @@ -116,7 +116,9 @@ def _init_session(self) -> requests.Session: class DvasMetadata: """Create metadata for DVAS API from Cloudnet file metadata""" - def __init__(self, file: ProductMetadata, md_api: MetadataApi, client: APIClient): + def __init__( + self, file: ExtendedProductMetadata, md_api: MetadataApi, client: APIClient + ): self.file = file self.md_api = md_api self.client = client @@ -244,14 +246,13 @@ def create_dvas_json(self, timestamp: datetime.datetime) -> dict: "transfersize": {"size": self.file.size, "unit": "B"}, } ], - # TODO: - # "provenance": [ - # { - # "title": software.title, - # "url": software.url, - # } - # for software in self.file.software - # ], + "provenance": [ + { + "title": software.title, + "url": software.url, + } + for software in self.file.software + ], } def _parse_variable_names(self) -> list[str]: From bf7d296653d69fe4f14b57d1b38949dd8cae48ae Mon Sep 17 00:00:00 2001 From: Tuomas Siipola Date: Wed, 27 Aug 2025 09:22:54 +0300 Subject: [PATCH 6/8] Add type annotations everywhere --- .pre-commit-config.yaml | 8 +- pyproject.toml | 8 +- scripts/cloudnet.py | 9 +- scripts/cronjobs/freeze.py | 9 +- scripts/cronjobs/yesterdays-qc.py | 7 +- scripts/dvas-json.py | 1 + scripts/submit-data-to-dev.py | 2 +- scripts/worker.py | 8 +- src/housekeeping/hatpro.py | 10 +- src/housekeeping/housekeeping.py | 12 +-- src/processing/fetch.py | 16 +-- src/processing/harmonizer/core.py | 52 ++++----- src/processing/harmonizer/doppler_lidar.py | 24 ++--- src/processing/harmonizer/halo_calibrated.py | 12 +-- src/processing/harmonizer/hatpro.py | 14 +-- src/processing/harmonizer/parsivel.py | 26 ++--- src/processing/harmonizer/rain_gauge.py | 14 +-- src/processing/instrument.py | 4 +- src/processing/instrument_process.py | 108 +++++++++---------- src/processing/metadata_api.py | 11 +- src/processing/netcdf_comparer.py | 9 +- src/processing/pid_utils.py | 2 +- src/processing/processor.py | 10 +- src/processing/product.py | 8 +- src/processing/storage_api.py | 4 +- src/processing/utils.py | 24 +++-- tests/integration/test.py | 7 +- tests/unit/test_housekeeping.py | 5 +- tests/unit/test_utils_module.py | 49 ++++++--- 29 files changed, 261 insertions(+), 212 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d31712b8..d47a9f94 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 + rev: v6.0.0 hooks: - id: check-added-large-files args: ["--maxkb=6000"] @@ -18,7 +18,7 @@ repos: args: ["--fix", "lf"] - id: trailing-whitespace - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.5 + rev: v0.12.10 hooks: - id: ruff args: ["--fix"] @@ -28,7 +28,7 @@ repos: hooks: - id: prettier - repo: https://github.com/pappasam/toml-sort - rev: v0.23.1 + rev: v0.24.2 hooks: - id: toml-sort-fix - repo: local @@ -39,7 +39,7 @@ repos: language: system types: [python] - repo: https://github.com/crate-ci/typos - rev: v1.23.5 + rev: v1.35.5 hooks: - id: typos args: ["--force-exclude"] diff --git a/pyproject.toml b/pyproject.toml index d9734a32..f25ba369 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,12 +65,10 @@ follow_imports = "skip" filename = "src/processing/version.py" pattern = ["MAJOR = (?P\\d+)", "MINOR = (?P\\d+)", "PATCH = (?P\\d+)"] -[tool.ruff] -select = ["ANN"] -ignore = ["ANN001", "ANN002", "ANN101", "ANN201", "ANN202", "ANN204"] -extend-select = ["I"] +[tool.ruff.lint] +select = ["ANN", "I"] -[tool.ruff.lint.extend-per-file-ignores] +[tool.ruff.lint.per-file-ignores] "__init__.py" = ["F401"] [tool.setuptools.dynamic] diff --git a/scripts/cloudnet.py b/scripts/cloudnet.py index 3f784622..854c6eba 100755 --- a/scripts/cloudnet.py +++ b/scripts/cloudnet.py @@ -24,6 +24,8 @@ RawMetadata, Site, ) +from requests import Session + from processing import utils from processing.config import Config from processing.dvas import Dvas @@ -43,7 +45,6 @@ from processing.product import process_product from processing.storage_api import StorageApi from processing.utils import SkipTaskError -from requests import Session logging.basicConfig(level=logging.INFO) @@ -71,7 +72,7 @@ RESET = "" -def main(): +def main() -> None: config = Config() session = utils.make_session() client = APIClient(f"{config.dataportal_url}/api/", session) @@ -187,7 +188,9 @@ def _parse_args(client: APIClient) -> Namespace: return args -def process_main(args: Namespace, config: Config, session: Session, client: APIClient): +def process_main( + args: Namespace, config: Config, session: Session, client: APIClient +) -> None: md_api = MetadataApi(config, session) storage_api = StorageApi(config, session) pid_utils = PidUtils(config, session) diff --git a/scripts/cronjobs/freeze.py b/scripts/cronjobs/freeze.py index 9092c70a..05729beb 100755 --- a/scripts/cronjobs/freeze.py +++ b/scripts/cronjobs/freeze.py @@ -4,15 +4,16 @@ import traceback from datetime import timedelta +from requests import Session + from processing import utils from processing.config import Config from processing.metadata_api import MetadataApi -from requests import Session logging.basicConfig(level=logging.INFO) -def main(): +def main() -> None: config = Config() session = utils.make_session() md_api = MetadataApi(config, session) @@ -54,7 +55,7 @@ def _get_freeze_payload(freeze_after_days: int) -> dict: return {"volatile": True, "releasedBefore": updated_before.isoformat()} -def _is_freezable(md_api: MetadataApi, file_uuid: str, depth: int = 0): +def _is_freezable(md_api: MetadataApi, file_uuid: str, depth: int = 0) -> bool: file = md_api.get(f"api/files/{file_uuid}") return ( (depth == 0 or not file["volatile"]) @@ -66,7 +67,7 @@ def _is_freezable(md_api: MetadataApi, file_uuid: str, depth: int = 0): ) -def _publish_freeze_task(config: Config, session: Session, file: dict): +def _publish_freeze_task(config: Config, session: Session, file: dict) -> None: task = { "type": "freeze", "siteId": file["site"]["id"], diff --git a/scripts/cronjobs/yesterdays-qc.py b/scripts/cronjobs/yesterdays-qc.py index ae38c7a3..4a6f1940 100755 --- a/scripts/cronjobs/yesterdays-qc.py +++ b/scripts/cronjobs/yesterdays-qc.py @@ -4,15 +4,16 @@ import traceback from datetime import timedelta +from requests import Session + from processing import utils from processing.config import Config from processing.metadata_api import MetadataApi -from requests import Session logging.basicConfig(level=logging.INFO) -def main(): +def main() -> None: config = Config() session = utils.make_session() md_api = MetadataApi(config, session) @@ -43,7 +44,7 @@ def _find_yesterdays_files(md_api: MetadataApi) -> list: return regular_files + model_files -def _publish_qc_task(config: Config, session: Session, file: dict): +def _publish_qc_task(config: Config, session: Session, file: dict) -> None: task = { "type": "qc", "siteId": file["site"]["id"], diff --git a/scripts/dvas-json.py b/scripts/dvas-json.py index b3c69807..5e8ce89b 100755 --- a/scripts/dvas-json.py +++ b/scripts/dvas-json.py @@ -5,6 +5,7 @@ from uuid import UUID from cloudnet_api_client import APIClient + from processing.config import Config from processing.dvas import DvasMetadata from processing.metadata_api import MetadataApi diff --git a/scripts/submit-data-to-dev.py b/scripts/submit-data-to-dev.py index 6da0869d..35cbe6fa 100755 --- a/scripts/submit-data-to-dev.py +++ b/scripts/submit-data-to-dev.py @@ -13,7 +13,7 @@ PASSWORD = "admin" -def main(): +def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("filename", type=Path) parser.add_argument("-s", "--site", required=True) diff --git a/scripts/worker.py b/scripts/worker.py index 0241a306..cb42002d 100755 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -8,9 +8,11 @@ from pathlib import Path from tempfile import TemporaryDirectory from threading import Event +from types import FrameType from cloudnet_api_client import APIClient from cloudnet_api_client.containers import ExtendedProduct, Site + from processing import utils from processing.config import Config from processing.dvas import Dvas @@ -48,7 +50,7 @@ def __init__(self) -> None: stderr_handler.setFormatter(formatter) logger.addHandler(stderr_handler) - def clear_memory(self): + def clear_memory(self) -> None: """Clear log memory.""" self._memory.truncate(0) self._memory.seek(0) @@ -223,7 +225,7 @@ def publish_followup_tasks( for product_id in product.derived_product_ids: self.publish_followup_task(product_id, params) - def publish_followup_task(self, product_id: str, params: ProcessParams): + def publish_followup_task(self, product_id: str, params: ProcessParams) -> None: product = self.client.product(product_id) if product.experimental and product.id not in ( "cpr-simulation", @@ -281,7 +283,7 @@ def main() -> None: worker = Worker(config) exit = Event() - def signal_handler(sig, frame): + def signal_handler(sig: int, frame: FrameType | None) -> None: logging.info("Received termination signal") exit.set() diff --git a/src/housekeeping/hatpro.py b/src/housekeeping/hatpro.py index 786f3f83..ca107d11 100644 --- a/src/housekeeping/hatpro.py +++ b/src/housekeeping/hatpro.py @@ -49,13 +49,13 @@ class HatproHkd: header: dict data: dict - def __init__(self, filename: bytes | Path): + def __init__(self, filename: bytes | Path) -> None: with open(filename, "rb") as file: self._read_header(file) self._read_data(file) _check_eof(file) - def _read_header(self, file: BinaryIO): + def _read_header(self, file: BinaryIO) -> None: self.header = _read_from_file( file, [ @@ -66,11 +66,11 @@ def _read_header(self, file: BinaryIO): ], ) if self.header["HKDCode"] != 837854832: - raise HatproHkdError(f'Unknown file signature: {self.header["HKDCode"]}') + raise HatproHkdError(f"Unknown file signature: {self.header['HKDCode']}") if self.header["HKDTimeRef"] != TIME_REF_UTC: raise HatproHkdError("Only UTC time reference is supported") - def _read_data(self, file: BinaryIO): + def _read_data(self, file: BinaryIO) -> None: fields = [("T", " None: with netCDF4.Dataset(filename) as nc: time_ref = nc.variables.get("time_reference") if time_ref and time_ref[0] != TIME_REF_UTC: diff --git a/src/housekeeping/housekeeping.py b/src/housekeeping/housekeeping.py index c7ee91df..4f6203b1 100644 --- a/src/housekeeping/housekeeping.py +++ b/src/housekeeping/housekeeping.py @@ -16,11 +16,11 @@ from influxdb_client.client.write_api import SYNCHRONOUS from netCDF4 import Dataset from numpy import ma -from processing.utils import unzip_gz_file from rpgpy import read_rpg from rpgpy.utils import decode_rpg_status_flags, rpg_seconds2datetime64 from housekeeping.cl61 import read_cl61 +from processing.utils import unzip_gz_file from .basta import read_basta from .ceilo import read_cl31_cl51, read_cs135, read_ct25k @@ -35,7 +35,7 @@ class ValidDateRange(Enum): MONTH = "month" -def process_record(record: RawMetadata, client: APIClient, db: Database): +def process_record(record: RawMetadata, client: APIClient, db: Database) -> None: try: reader = get_reader(record) if reader is None: @@ -164,7 +164,7 @@ def _handle_halo_doppler_lidar(filepath: Path, metadata: RawMetadata) -> list[Po def get_reader( - metadata: RawMetadata + metadata: RawMetadata, ) -> Callable[[Path, RawMetadata], list[Point]] | None: instrument_id = metadata.instrument.instrument_id filename = metadata.filename.lower() @@ -217,13 +217,13 @@ def __init__(self) -> None: self.bucket = os.environ["INFLUXDB_BUCKET"] self.write_api = self.client.write_api(write_options=SYNCHRONOUS) - def write(self, points: Iterable[Point]): + def write(self, points: Iterable[Point]) -> None: self.write_api.write(bucket=self.bucket, record=points) - def __enter__(self): + def __enter__(self) -> Database: return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type, exc_val, exc_tb) -> None: # noqa: ANN001 self.write_api.close() self.client.close() diff --git a/src/processing/fetch.py b/src/processing/fetch.py index bbe92a0b..37cbd12e 100644 --- a/src/processing/fetch.py +++ b/src/processing/fetch.py @@ -45,7 +45,7 @@ def __init__( site: Site, date: datetime.date, args: Namespace, - ): + ) -> None: self.product = product self.args = args self.payload = {"site": site.id, "date": date.isoformat()} @@ -114,7 +114,11 @@ def get_model_metadata(self) -> list: def fetch( - product: Product, site: Site, date: datetime.date, args: Namespace, client=APIClient + product: Product, + site: Site, + date: datetime.date, + args: Namespace, + client: APIClient, ) -> None: is_production = os.environ.get("PID_SERVICE_TEST_ENV", "false").lower() != "true" if is_production: @@ -166,8 +170,8 @@ def _process_metadata( submitter: Callable[[Path, dict], str], upload_metadata: list, show_progress: bool = True, -): - def process_row(row: dict): +) -> None: + def process_row(row: dict) -> str: filename = _download_file(row) return submitter(filename, row) @@ -245,7 +249,7 @@ def _submit_upload(filename: Path, row: dict) -> str: if res.status_code == 200: with filename.open("rb") as f: res = requests.put( - f'{url}data/{metadata["checksum"]}', data=f, auth=DATAPORTAL_AUTH + f"{url}data/{metadata['checksum']}", data=f, auth=DATAPORTAL_AUTH ) res.raise_for_status() elif res.status_code != 409: @@ -329,7 +333,7 @@ def _submit_file(filename: Path, row: dict) -> str: return "OK" -def _fetch_calibration(upload_metadata: list): +def _fetch_calibration(upload_metadata: list) -> None: first = True processed_pid_dates: set[tuple] = set() for upload in upload_metadata: diff --git a/src/processing/harmonizer/core.py b/src/processing/harmonizer/core.py index 97bff8ea..38821645 100644 --- a/src/processing/harmonizer/core.py +++ b/src/processing/harmonizer/core.py @@ -14,12 +14,14 @@ class Level1Nc: - def __init__(self, nc_raw: netCDF4.Dataset, nc: netCDF4.Dataset, data: dict): + def __init__( + self, nc_raw: netCDF4.Dataset, nc: netCDF4.Dataset, data: dict + ) -> None: self.nc_raw = nc_raw self.nc = nc self.data = data - def convert_time(self): + def convert_time(self) -> None: """Converts time to decimal hours.""" time = self.nc.variables["time"] calendar = getattr(time, "calendar", "standard") @@ -58,7 +60,7 @@ def copy_file_contents( keys: tuple | None = None, time_ind: list | None = None, skip: tuple | None = None, - ): + ) -> None: """Copies all variables and global attributes from one file to another. Optionally copies only certain keys and / or uses certain time indices only. """ @@ -75,7 +77,7 @@ def copy_file_contents( self.copy_variable(key, time_ind) self._copy_global_attributes() - def copy_variable(self, key: str, time_ind: list | None = None): + def copy_variable(self, key: str, time_ind: list | None = None) -> None: """Copies one variable from source file to target. Optionally uses certain time indices only. """ @@ -97,7 +99,7 @@ def copy_variable(self, key: str, time_ind: list | None = None): screened_data = self._screen_data(variable, time_ind) var_out[:] = screened_data - def add_geolocation(self): + def add_geolocation(self) -> None: """Adds standard geolocation information.""" for key in ("altitude", "latitude", "longitude"): if key not in self.nc.variables.keys(): @@ -111,7 +113,7 @@ def add_global_attributes( self, cloudnet_file_type: str, instrument: cloudnetpy.instruments.instruments.Instrument, - ): + ) -> None: """Adds standard global attributes.""" location = self.data["site_meta"]["name"] self.nc.Conventions = "CF-1.8" @@ -127,7 +129,7 @@ def add_uuid(self) -> str: self.nc.file_uuid = uuid return uuid - def add_history(self, product: str, source: str = "history"): + def add_history(self, product: str, source: str = "history") -> None: """Adds history attribute.""" old_history = getattr(self.nc_raw, source, "") history = ( @@ -138,11 +140,11 @@ def add_history(self, product: str, source: str = "history"): history = f"{history}\n{old_history}" self.nc.history = history - def add_date(self): + def add_date(self) -> None: """Adds date attributes.""" self.nc.year, self.nc.month, self.nc.day = self.data["date"].split("-") - def harmonize_attribute(self, attribute: str, keys: tuple | None = None): + def harmonize_attribute(self, attribute: str, keys: tuple | None = None) -> None: """Harmonizes variable attributes.""" keys_to_process = keys if keys is not None else self.nc.variables.keys() for key in keys_to_process: @@ -154,12 +156,12 @@ def harmonize_attribute(self, attribute: str, keys: tuple | None = None): else: logging.debug(f"Can't find {attribute} for {key}") - def harmonize_standard_attributes(self, key: str): + def harmonize_standard_attributes(self, key: str) -> None: """Harmonizes standard attributes of one variable.""" for attribute in ("units", "long_name", "standard_name"): self.harmonize_attribute(attribute, (key,)) - def clean_variable_attributes(self, accepted_extra: tuple | None = None): + def clean_variable_attributes(self, accepted_extra: tuple | None = None) -> None: """Removes obsolete variable attributes.""" accepted = ("_FillValue", "units") + (accepted_extra or ()) for _, item in self.nc.variables.items(): @@ -167,17 +169,17 @@ def clean_variable_attributes(self, accepted_extra: tuple | None = None): if attr not in accepted: delattr(item, attr) - def clean_global_attributes(self): + def clean_global_attributes(self) -> None: """Removes all global attributes.""" for attr in self.nc.ncattrs(): delattr(self.nc, attr) - def fix_name(self, keymap: dict): + def fix_name(self, keymap: dict) -> None: for old_name, new_name in keymap.items(): if old_name in self.nc.variables: self.nc.renameVariable(old_name, new_name) - def fix_attribute(self, keymap: dict, attribute: str): + def fix_attribute(self, keymap: dict, attribute: str) -> None: """Fixes one attribute.""" if attribute not in ("units", "long_name", "standard_name"): raise ValueError @@ -228,7 +230,7 @@ def get_valid_time_indices(self) -> list: raise cloudnetpy.exceptions.ValidTimeStampError return valid_ind - def to_ms1(self, variable: str): + def to_ms1(self, variable: str) -> None: """Converts velocity to m s-1.""" target_unit = "m s-1" if not hasattr(self.nc.variables[variable], "units"): @@ -253,7 +255,7 @@ def to_ms1(self, variable: str): logging.info(f"Converting {variable} from {units} to {target_unit}.") self.nc.variables[variable].units = target_unit - def to_m(self, variable: str): + def to_m(self, variable: str) -> None: """Converts length to m.""" target_unit = "m" if not hasattr(self.nc.variables[variable], "units"): @@ -274,7 +276,7 @@ def to_m(self, variable: str): logging.info(f"Converting {variable} from {units} to {target_unit}.") self.nc.variables[variable].units = target_unit - def to_ratio(self, variable: str): + def to_ratio(self, variable: str) -> None: """Converts percent to ratio.""" target_unit = "1" if not hasattr(self.nc.variables[variable], "units"): @@ -295,7 +297,7 @@ def to_ratio(self, variable: str): logging.info(f"Converting {variable} from {units} to {target_unit}.") self.nc.variables[variable].units = target_unit - def to_pa(self, variable: str): + def to_pa(self, variable: str) -> None: """Converts pressure to Pa.""" target_unit = "Pa" if not hasattr(self.nc.variables[variable], "units"): @@ -316,7 +318,7 @@ def to_pa(self, variable: str): logging.info(f"Converting {variable} from {units} to {target_unit}.") self.nc.variables[variable].units = target_unit - def to_degree(self, variable: str): + def to_degree(self, variable: str) -> None: """Converts direction to degree.""" target_unit = "degree" if not hasattr(self.nc.variables[variable], "units"): @@ -335,7 +337,7 @@ def to_degree(self, variable: str): logging.info(f"Converting {variable} from {units} to {target_unit}.") self.nc.variables[variable].units = target_unit - def to_k(self, variable: str): + def to_k(self, variable: str) -> None: target_unit = "K" if not hasattr(self.nc.variables[variable], "units"): self._set_fallback_unit(variable, target_unit) @@ -361,16 +363,16 @@ def to_k(self, variable: str): ) self.nc.variables[variable].units = target_unit - def _set_fallback_unit(self, variable: str, fallback: str): + def _set_fallback_unit(self, variable: str, fallback: str) -> None: logging.warning(f"No units attribute in '{variable}'! Assuming '{fallback}'.") self.nc.variables[variable].units = fallback - def _copy_global_attributes(self): + def _copy_global_attributes(self) -> None: for name in self.nc_raw.ncattrs(): setattr(self.nc, name, self.nc_raw.getncattr(name)) def _get_time_units(self) -> str: - return f'hours since {self.data["date"]} 00:00:00 +00:00' + return f"hours since {self.data['date']} 00:00:00 +00:00" @staticmethod def _screen_data( @@ -390,6 +392,8 @@ def _screen_data( return variable[:] @staticmethod - def _copy_variable_attributes(source, target) -> None: + def _copy_variable_attributes( + source: netCDF4.Variable, target: netCDF4.Variable + ) -> None: attr = {k: source.getncattr(k) for k in source.ncattrs() if k != "_FillValue"} target.setncatts(attr) diff --git a/src/processing/harmonizer/doppler_lidar.py b/src/processing/harmonizer/doppler_lidar.py index 243b8ae6..fc1ce26b 100644 --- a/src/processing/harmonizer/doppler_lidar.py +++ b/src/processing/harmonizer/doppler_lidar.py @@ -80,7 +80,7 @@ def harmonize_doppler_lidar_stare_file( class DopplerLidarWindNc(core.Level1Nc): - def copy_file(self, valid_ind: list): + def copy_file(self, valid_ind: list) -> None: keys = ( "time", "height", @@ -92,12 +92,12 @@ def copy_file(self, valid_ind: list): ) self.copy_file_contents(keys, valid_ind) - def height_to_asl_height(self): + def height_to_asl_height(self) -> None: self.nc.variables["height"][:] += self.nc.variables["altitude"][:] self.nc.variables["height"].standard_name = "height_above_mean_sea_level" self.nc.variables["height"].long_name = "Height above mean sea level" - def harmonise_serial_number(self): + def harmonise_serial_number(self) -> None: if "serial_number" in self.nc.ncattrs(): self.nc.serial_number = _harmonise_doppler_lidar_serial_number( self.nc.serial_number @@ -105,7 +105,7 @@ def harmonise_serial_number(self): class DopplerLidarStareNc(core.Level1Nc): - def clean_global_attributes(self): + def clean_global_attributes(self) -> None: for attr in self.nc.ncattrs(): if attr == "filename": delattr(self.nc, attr) @@ -113,7 +113,7 @@ def clean_global_attributes(self): self.nc.serial_number = getattr(self.nc, attr) delattr(self.nc, attr) - def copy_file(self, valid_ind: list): + def copy_file(self, valid_ind: list) -> None: """Copies useful variables only.""" keys = ( "beta", @@ -133,12 +133,12 @@ def copy_file(self, valid_ind: list): ) self.copy_file_contents(keys, valid_ind) - def add_zenith_angle(self): + def add_zenith_angle(self) -> None: """Converts elevation to zenith angle.""" self.nc.renameVariable("elevation", "zenith_angle") self.nc.variables["zenith_angle"][:] = 90 - self.nc.variables["zenith_angle"][:] - def check_zenith_angle(self): + def check_zenith_angle(self) -> None: """Checks zenith angle value.""" threshold = 15 if ( @@ -146,7 +146,7 @@ def check_zenith_angle(self): ) > threshold: raise MiscError(f"Invalid zenith angle {zenith_angle}") - def add_range(self): + def add_range(self) -> None: """Converts halo 'range', which is actually height, to true range (towards LOS).""" self.nc.renameVariable("range", "height") @@ -155,16 +155,16 @@ def add_range(self): self.nc.variables["range"][:] /= np.cos(np.radians(zenith_angle)) self.nc.variables["height"][:] += self.nc.variables["altitude"][:] - def add_wavelength(self): + def add_wavelength(self) -> None: """Converts wavelength m to nm.""" self.nc.variables["wavelength"][:] *= 1e9 - def fix_time_units(self): + def fix_time_units(self) -> None: """Fixes time units.""" self.nc.variables["time"].units = self._get_time_units() self.nc.variables["time"].calendar = "standard" - def fix_long_names(self): + def fix_long_names(self) -> None: if "depolarisation_raw" in self.nc.variables: self.nc.variables[ "depolarisation_raw" @@ -196,7 +196,7 @@ def fix_long_names(self): "Attenuated backscatter coefficient for the cross-polarised signal" ) - def harmonise_serial_number(self): + def harmonise_serial_number(self) -> None: if "serial_number" in self.nc.ncattrs(): self.nc.serial_number = _harmonise_doppler_lidar_serial_number( self.nc.serial_number diff --git a/src/processing/harmonizer/halo_calibrated.py b/src/processing/harmonizer/halo_calibrated.py index f1a825a9..84411630 100644 --- a/src/processing/harmonizer/halo_calibrated.py +++ b/src/processing/harmonizer/halo_calibrated.py @@ -44,17 +44,17 @@ def harmonize_halo_calibrated_file(data: dict) -> str: class HaloNcCalibrated(core.Level1Nc): - def copy_file(self, valid_ind: list): + def copy_file(self, valid_ind: list) -> None: """Copies useful variables only.""" keys = ("beta", "beta_raw", "time", "wavelength", "elevation", "range") self.copy_file_contents(keys, valid_ind) - def add_zenith_angle(self): + def add_zenith_angle(self) -> None: """Converts elevation to zenith angle.""" self.nc.renameVariable("elevation", "zenith_angle") self.nc.variables["zenith_angle"][:] = 90 - self.nc.variables["zenith_angle"][:] - def check_zenith_angle(self): + def check_zenith_angle(self) -> None: """Checks zenith angle value.""" threshold = 15 if ( @@ -62,18 +62,18 @@ def check_zenith_angle(self): ) > threshold: raise MiscError(f"Invalid zenith angle {zenith_angle}") - def add_range(self): + def add_range(self) -> None: """Converts halo 'range', which is actually height, to true range (towards LOS).""" self.nc.renameVariable("range", "height") self.copy_variable("range") zenith_angle = np.median(self.nc.variables["zenith_angle"][:]) self.nc.variables["range"][:] /= np.cos(np.radians(zenith_angle)) - def add_wavelength(self): + def add_wavelength(self) -> None: """Converts wavelength m to nm.""" self.nc.variables["wavelength"][:] *= 1e9 - def fix_time_units(self): + def fix_time_units(self) -> None: """Fixes time units.""" self.nc.variables["time"].units = self._get_time_units() self.nc.variables["time"].calendar = "standard" diff --git a/src/processing/harmonizer/hatpro.py b/src/processing/harmonizer/hatpro.py index 40c4fca6..6c07250a 100644 --- a/src/processing/harmonizer/hatpro.py +++ b/src/processing/harmonizer/hatpro.py @@ -45,7 +45,7 @@ def harmonize_hatpro_file(data: dict) -> str: class HatproNc(core.Level1Nc): bad_lwp_keys = ("LWP", "LWP_data", "clwvi", "atmosphere_liquid_water_content") - def copy_file(self, all_keys: bool = False): + def copy_file(self, all_keys: bool = False) -> None: """Copies essential fields only.""" valid_ind = self._get_valid_timestamps() if all_keys is True: @@ -54,7 +54,7 @@ def copy_file(self, all_keys: bool = False): possible_keys = ("lwp", "time") + self.bad_lwp_keys self._copy_hatpro_file_contents(valid_ind, possible_keys) - def add_lwp(self): + def add_lwp(self) -> None: """Converts lwp and fixes its attributes.""" key = "lwp" for invalid_name in self.bad_lwp_keys: @@ -72,7 +72,7 @@ def add_lwp(self): if hasattr(lwp, attr): delattr(lwp, attr) - def check_lwp_data(self): + def check_lwp_data(self) -> None: """Sanity checks LWP data.""" threshold_kg = 10 lwp = self.nc.variables["lwp"][:] @@ -82,7 +82,7 @@ def check_lwp_data(self): f"Invalid LWP data, median value: {np.round(median_value, 2)} kg" ) - def sort_time(self): + def sort_time(self) -> None: """Sorts time array.""" time = self.nc.variables["time"][:] array = self.nc.variables["lwp"][:] @@ -90,7 +90,7 @@ def sort_time(self): self.nc.variables["time"][:] = time[ind] self.nc.variables["lwp"][:] = array[ind] - def check_time_reference(self): + def check_time_reference(self) -> None: """Checks the reference time zone.""" key = "time_reference" if key in self.nc_raw.variables: @@ -112,7 +112,9 @@ def _get_valid_timestamps(self) -> list: _, ind = np.unique(time_stamps[valid_ind], return_index=True) return list(np.array(valid_ind)[ind]) - def _copy_hatpro_file_contents(self, time_ind: list, keys: tuple | None = None): + def _copy_hatpro_file_contents( + self, time_ind: list, keys: tuple | None = None + ) -> None: self.nc.createDimension("time", len(time_ind)) for name, variable in self.nc_raw.variables.items(): if keys is not None and name not in keys: diff --git a/src/processing/harmonizer/parsivel.py b/src/processing/harmonizer/parsivel.py index 316f8cb8..de97a2e4 100644 --- a/src/processing/harmonizer/parsivel.py +++ b/src/processing/harmonizer/parsivel.py @@ -191,7 +191,7 @@ def harmonize_parsivel_file(data: dict) -> str: class ParsivelNc(core.Level1Nc): - def create_dimensions(self, time_ind: list): + def create_dimensions(self, time_ind: list) -> None: for name, dimension in self.nc_raw.dimensions.items(): name = DIMENSION_MAP.get(name, name) n = len(time_ind) if name == "time" else dimension.size @@ -201,13 +201,13 @@ def copy_file( self, time_ind: list, skip: tuple, - ): + ) -> None: for key in self.nc_raw.variables.keys(): if key not in skip: self.copy_var(key, time_ind) self._copy_global_attributes() - def copy_var(self, key: str, time_ind: list): + def copy_var(self, key: str, time_ind: list) -> None: variable = self.nc_raw.variables[key] if key in ("time", "Meas_Time"): @@ -244,7 +244,7 @@ def _fetch_fill_value( return netCDF4.default_fillvals[dtype] return None - def _get_new_name(self, key) -> str: + def _get_new_name(self, key: str) -> str: keymap = { "V_sensor": "V_power_supply", "E_kin": "kinetic_energy", @@ -289,14 +289,14 @@ def _get_new_name(self, key) -> str: } return keymap.get(key, key) - def add_serial_number(self): + def add_serial_number(self) -> None: if "serial_no" in self.nc_raw.variables: self.nc.serial_number = str(self.nc_raw["serial_no"][0]) for attr in ("Sensor_ID", "sensor_serial_number"): if hasattr(self.nc_raw, attr): self.nc.serial_number = getattr(self.nc_raw, attr) - def fix_long_names(self): + def fix_long_names(self) -> None: keymap = { "diameter_bnds": "Diameter bounds", "velocity_bnds": "Velocity bounds", @@ -329,7 +329,7 @@ def fix_long_names(self): if key not in skip and hasattr(var, "long_name"): var.long_name = var.long_name.lower().capitalize() - def fix_units(self): + def fix_units(self) -> None: keymap = { "velocity_spread": "m s-1", "velocity_bnds": "m s-1", @@ -346,7 +346,7 @@ def fix_units(self): } self.fix_attribute(keymap, "units") - def fix_standard_names(self): + def fix_standard_names(self) -> None: for key in self.nc.variables: if key not in ( "time", @@ -364,24 +364,24 @@ def fix_standard_names(self): } self.fix_attribute(keymap, "standard_name") - def fix_comments(self): + def fix_comments(self) -> None: for key in self.nc.variables: if hasattr(self.nc.variables[key], "comment"): delattr(self.nc.variables[key], "comment") if (attr := ATTRIBUTES.get(key)) and attr.comment: self.nc.variables[key].comment = attr.comment - def convert_precipitations(self): + def convert_precipitations(self) -> None: for key in self.nc.variables: if key in ("rainfall_rate", "snowfall_rate", "fall_velocity"): self.to_ms1(key) - def convert_diameters(self): + def convert_diameters(self) -> None: for key in self.nc.variables: if key in ("diameter", "diameter_spread"): self.to_m(key) - def convert_temperatures(self): + def convert_temperatures(self) -> None: for key in self.nc.variables: if key in ("T_sensor"): self.to_k(key) @@ -392,7 +392,7 @@ def _find_bad_values(self, variable: netCDF4.Variable) -> np.ndarray: data = variable[:] return np.isclose(data, bad_value, atol=threshold) - def _mask_bad_values(self): + def _mask_bad_values(self) -> None: for variable in self.nc.variables.values(): mask = self._find_bad_values(variable) variable[:] = ma.masked_array(variable[:], mask=mask) diff --git a/src/processing/harmonizer/rain_gauge.py b/src/processing/harmonizer/rain_gauge.py index 7c7d628d..5143adf9 100644 --- a/src/processing/harmonizer/rain_gauge.py +++ b/src/processing/harmonizer/rain_gauge.py @@ -38,7 +38,7 @@ def harmonize_pluvio_nc(data: dict) -> str: return _harmonize(data, instruments.PLUVIO2) -def _harmonize(data: dict, instrument: Instrument): +def _harmonize(data: dict, instrument: Instrument) -> str: if "output_path" not in data: temp_file = NamedTemporaryFile() with ( @@ -73,11 +73,11 @@ def _harmonize(data: dict, instrument: Instrument): class RainGaugeNc(core.Level1Nc): - def mask_bad_data_values(self): + def mask_bad_data_values(self) -> None: for _, variable in self.nc.variables.items(): variable[:] = ma.masked_invalid(variable[:]) - def fix_variable_attributes(self): + def fix_variable_attributes(self) -> None: for key in (RATE, AMOUNT): for attr in ("long_name", "standard_name", "comment"): self.harmonize_attribute(attr, (key,)) @@ -85,11 +85,11 @@ def fix_variable_attributes(self): def copy_data( self, time_ind: list, - ): + ) -> None: for key in VALID_KEYS: self._copy_variable(key, time_ind) - def _copy_variable(self, key: str, time_ind: list): + def _copy_variable(self, key: str, time_ind: list) -> None: if key not in self.nc_raw.variables.keys(): logging.debug(f"Key {key} not found from the source file.") return @@ -111,7 +111,7 @@ def _copy_variable(self, key: str, time_ind: list): screened_data = self._screen_data(variable, time_ind) var_out[:] = screened_data - def fix_variable_names(self): + def fix_variable_names(self) -> None: keymap = { "rain_rate": RATE, "int_h": RATE, @@ -122,7 +122,7 @@ def fix_variable_names(self): } self.fix_name(keymap) - def fix_pt_jumps(self): + def fix_pt_jumps(self) -> None: """Fixes suspicious jumps from a valid value to single 0-value and back in Thies PT data.""" data = self.nc.variables[AMOUNT][:] for i in range(1, len(data) - 1): diff --git a/src/processing/instrument.py b/src/processing/instrument.py index 93ec7f29..2e837d50 100644 --- a/src/processing/instrument.py +++ b/src/processing/instrument.py @@ -14,7 +14,9 @@ ProcessClass = Type[instrument_process.ProcessInstrument] -def process_instrument(processor: Processor, params: InstrumentParams, directory: Path): +def process_instrument( + processor: Processor, params: InstrumentParams, directory: Path +) -> None: uuid = Uuid() pid_to_new_file = None if existing_product := processor.get_product(params): diff --git a/src/processing/instrument_process.py b/src/processing/instrument_process.py index a09cc09d..6679ca71 100644 --- a/src/processing/instrument_process.py +++ b/src/processing/instrument_process.py @@ -7,7 +7,7 @@ import shutil from collections import defaultdict from pathlib import Path -from typing import Literal +from typing import Literal, NoReturn from uuid import UUID import doppy @@ -49,7 +49,7 @@ def __init__( params: InstrumentParams, uuid: Uuid, processor: Processor, - ): + ) -> None: self.output_path = directory / "output.nc" self.daily_path = directory / "daily.nc" self.raw_dir = directory / "raw" @@ -94,7 +94,7 @@ def download_instrument( include_tag_subset: set[str] | None = None, exclude_tag_subset: set[str] | None = None, date: datetime.date | tuple[datetime.date, datetime.date] | None = None, - allow_empty=False, + allow_empty: bool = False, filename_prefix: str | None = None, filename_suffix: str | None = None, subdir: str | None = None, @@ -123,7 +123,7 @@ def download_instrument( class ProcessRadar(ProcessInstrument): - def process_rpg_fmcw_94(self): + def process_rpg_fmcw_94(self) -> None: if self.params.site.id == "rv-meteor": full_paths, self.uuid.raw = self.download_instrument() concat_wrapper.concat_netcdf_files( @@ -142,10 +142,10 @@ def process_rpg_fmcw_94(self): ) self.uuid.raw = _get_valid_uuids(raw_uuids, full_paths, valid_full_paths) - def process_rpg_fmcw_35(self): + def process_rpg_fmcw_35(self) -> None: self.process_rpg_fmcw_94() - def process_mira_10(self): + def process_mira_10(self) -> None: full_paths, self.uuid.raw = self.download_instrument() full_paths = _unzip_gz_files(full_paths) full_paths = self._fix_suffices(full_paths, ".znc") @@ -160,7 +160,7 @@ def process_mira_10(self): **self._kwargs, ) - def process_mira_35(self): + def process_mira_35(self) -> None: full_paths, self.uuid.raw = self.download_instrument() full_paths = _unzip_gz_files(full_paths) full_paths = self._fix_suffices(full_paths, ".mmclx") @@ -175,21 +175,21 @@ def process_mira_35(self): **self._kwargs, ) - def process_basta(self): + def process_basta(self) -> None: full_paths, self.uuid.raw = self.download_instrument() concat_wrapper.concat_netcdf_files( full_paths, self.params.date.isoformat(), self.daily_path ) self.uuid.product = basta2nc(str(self.daily_path), *self._args, **self._kwargs) - def process_copernicus(self): + def process_copernicus(self) -> None: full_paths, self.uuid.raw = self.download_instrument() self._add_calibration("range_offset", 0) self.uuid.product = copernicus2nc( str(self.raw_dir), *self._args, **self._kwargs ) - def process_galileo(self): + def process_galileo(self) -> None: full_paths, self.uuid.raw = self.download_instrument() self.uuid.product = galileo2nc(str(self.raw_dir), *self._args, **self._kwargs) @@ -222,7 +222,7 @@ def _add_calibration( class ProcessDopplerLidarWind(ProcessInstrument): - def process_halo_doppler_lidar(self): + def process_halo_doppler_lidar(self) -> None: try: calibration = self.processor.client.calibration( self.params.instrument.pid, self.params.date @@ -256,16 +256,16 @@ def process_halo_doppler_lidar(self): data, instruments.HALO ) - def process_wls100s(self): + def process_wls100s(self) -> NoReturn: raise NotImplementedError() - def process_wls200s(self): + def process_wls200s(self) -> None: self._process_windcube(instruments.WINDCUBE_WLS200S) - def process_wls400s(self): + def process_wls400s(self) -> None: self._process_windcube(instruments.WINDCUBE_WLS400S) - def process_wls70(self): + def process_wls70(self) -> None: full_paths, self.uuid.raw = self.download_instrument(include_pattern=r".*\.rtd") try: options = self._calibration_options() @@ -280,7 +280,7 @@ def process_wls70(self): data, instruments.WINDCUBE_WLS70 ) - def _process_windcube(self, instrument: instruments.Instrument): + def _process_windcube(self, instrument: instruments.Instrument) -> None: file_groups = defaultdict(list) full_paths, self.uuid.raw = self.download_instrument( include_pattern=r".*(vad)|(dbs).*\.nc.*", @@ -344,7 +344,7 @@ def _calibration_options(self) -> doppy.product.WindOptions | None: class ProcessDopplerLidar(ProcessInstrument): - def process_halo_doppler_lidar(self): + def process_halo_doppler_lidar(self) -> None: try: calibration = self.processor.client.calibration( self.params.instrument.pid, self.params.date @@ -410,19 +410,19 @@ def process_halo_doppler_lidar(self): data, instruments.HALO ) - def process_wls100s(self): + def process_wls100s(self) -> NoReturn: raise NotImplementedError() - def process_wls200s(self): + def process_wls200s(self) -> None: self._process_windcube(instruments.WINDCUBE_WLS200S) - def process_wls400s(self): + def process_wls400s(self) -> None: self._process_windcube(instruments.WINDCUBE_WLS400S) - def process_wls70(self): + def process_wls70(self) -> NoReturn: raise NotImplementedError() - def _process_windcube(self, instrument: instruments.Instrument): + def _process_windcube(self, instrument: instruments.Instrument) -> None: file_groups = defaultdict(list) full_paths, self.uuid.raw = self.download_instrument( include_pattern=r".*fixed.*\.nc(\..+)?" @@ -469,22 +469,22 @@ def _process_windcube(self, instrument: instruments.Instrument): class ProcessLidar(ProcessInstrument): - def process_cs135(self): + def process_cs135(self) -> None: full_paths, self.uuid.raw = self.download_instrument() full_paths.sort() _concatenate_text_files(full_paths, self.daily_path) self._call_ceilo2nc("cs135") - def process_chm15k(self): + def process_chm15k(self) -> None: self._process_chm_lidar("chm15k") - def process_chm15x(self): + def process_chm15x(self) -> None: self._process_chm_lidar("chm15x") - def process_chm15kx(self): + def process_chm15kx(self) -> None: self._process_chm_lidar("chm15kx") - def _process_chm_lidar(self, model: str): + def _process_chm_lidar(self, model: str) -> None: full_paths, raw_uuids = self.download_instrument() valid_full_paths = concat_wrapper.concat_chm15k_files( full_paths, self.params.date.isoformat(), str(self.daily_path) @@ -500,12 +500,12 @@ def process_ct25k(self) -> None: _concatenate_text_files(full_paths, self.daily_path) self._call_ceilo2nc("ct25k") - def process_halo_doppler_lidar_calibrated(self): + def process_halo_doppler_lidar_calibrated(self) -> None: full_path, self.uuid.raw = self.download_instrument(largest_only=True) data = self._get_payload_for_nc_file_augmenter(full_path) self.uuid.product = harmonizer.harmonize_halo_calibrated_file(data) - def process_pollyxt(self): + def process_pollyxt(self) -> None: full_paths, self.uuid.raw = self.download_instrument() calibration = self._fetch_pollyxt_calibration() site_meta = self.site_meta | calibration @@ -517,19 +517,19 @@ def process_pollyxt(self): date=self.params.date.isoformat(), ) - def process_minimpl(self): + def process_minimpl(self) -> NoReturn: raise NotImplementedError() - def process_ld40(self): + def process_ld40(self) -> NoReturn: raise NotImplementedError() - def process_cl31(self): + def process_cl31(self) -> None: full_paths, self.uuid.raw = self.download_instrument() full_paths.sort() _concatenate_text_files(full_paths, self.daily_path) self._call_ceilo2nc("cl31") - def process_cl51(self): + def process_cl51(self) -> None: try: calibration = self.processor.client.calibration( self.params.instrument.pid, self.params.date @@ -552,7 +552,7 @@ def process_cl51(self): _fix_cl51_timestamps(str(self.daily_path), time_offset) self._call_ceilo2nc("cl51") - def process_cl61d(self): + def process_cl61d(self) -> None: full_paths, raw_uuids = self.download_instrument( exclude_pattern="clu-generated" ) @@ -577,7 +577,7 @@ def process_cl61d(self): self._call_ceilo2nc("cl61d") self.uuid.raw = _get_valid_uuids(raw_uuids, full_paths, valid_full_paths) - def _call_ceilo2nc(self, model: str): + def _call_ceilo2nc(self, model: str) -> None: calibration = self._fetch_ceilo_calibration() site_meta = self.site_meta | calibration site_meta["model"] = model @@ -619,18 +619,18 @@ def _fetch_pollyxt_calibration(self) -> dict: class ProcessMwrL1c(ProcessInstrument): - def process_hatpro(self): + def process_hatpro(self) -> None: self._process_rpg("hatpro") - def process_lhatpro(self): + def process_lhatpro(self) -> None: self._process_rpg("lhatpro") - def process_lhumpro(self): + def process_lhumpro(self) -> None: self._process_rpg("lhumpro_u90") def _process_rpg( self, instrument_type: Literal["hatpro", "lhatpro", "lhumpro_u90"] - ): + ) -> None: calibration = self._get_calibration_data() full_paths, self.uuid.raw = self.download_instrument( include_pattern=r"\.(brt|hkd|met|irt|blb|bls)$", @@ -678,16 +678,16 @@ def _get_calibration_data(self) -> dict: class ProcessMwr(ProcessInstrument): - def process_hatpro(self): + def process_hatpro(self) -> None: self._process_rpg("hatpro") - def process_lhatpro(self): + def process_lhatpro(self) -> None: self._process_rpg("lhatpro") - def process_lhumpro(self): + def process_lhumpro(self) -> None: self._process_rpg("lhumpro_u90") - def process_radiometrics(self): + def process_radiometrics(self) -> None: full_paths, self.uuid.raw = self.download_instrument() _unzip_gz_files(full_paths) self.uuid.product = radiometrics2nc( @@ -696,7 +696,7 @@ def process_radiometrics(self): def _process_rpg( self, instrument_type: Literal["hatpro", "lhatpro", "lhumpro_u90"] - ): + ) -> None: try: full_paths, raw_uuids = self.download_instrument( include_pattern=r"\.(lwp|iwv)$", @@ -722,7 +722,7 @@ def _process_rpg( class ProcessDisdrometer(ProcessInstrument): - def process_parsivel(self): + def process_parsivel(self) -> None: try: full_paths, self.uuid.raw = self.download_instrument(largest_only=True) data = self._get_payload_for_nc_file_augmenter(full_paths) @@ -769,7 +769,7 @@ def process_parsivel(self): f.writelines(lines) self.uuid.product = parsivel2nc(full_paths, *self._args, **kwargs) - def process_thies_lnm(self): + def process_thies_lnm(self) -> None: full_paths, self.uuid.raw = self.download_instrument() full_paths.sort() full_paths = _unzip_gz_files(full_paths) @@ -796,23 +796,23 @@ def _fetch_parsivel_calibration(self) -> dict: class ProcessRainGauge(ProcessInstrument): - def process_pluvio(self): + def process_pluvio(self) -> None: full_path, self.uuid.raw = self.download_instrument(largest_only=True) data = self._get_payload_for_nc_file_augmenter(full_path) self.uuid.product = harmonizer.harmonize_pluvio_nc(data) - def process_thies_precipitation_transmitter(self): + def process_thies_precipitation_transmitter(self) -> None: full_path, self.uuid.raw = self.download_instrument(largest_only=True) data = self._get_payload_for_nc_file_augmenter(full_path) self.uuid.product = harmonizer.harmonize_thies_pt_nc(data) - def process_rain_e_h3(self): + def process_rain_e_h3(self) -> None: full_path, self.uuid.raw = self.download_instrument(largest_only=True) self.uuid.product = rain_e_h32nc(full_path[0], *self._args, **self._kwargs) class ProcessWeatherStation(ProcessInstrument): - def process_weather_station(self): + def process_weather_station(self) -> None: supported_sites = ( "palaiseau", "lindenberg", @@ -855,13 +855,13 @@ def process_weather_station(self): [str(path) for path in full_paths], *self._args, **self._kwargs ) - def process_fd12(self): + def process_fd12(self) -> None: full_path, self.uuid.raw = self.download_instrument(largest_only=True) self.uuid.product = fd12p2nc(full_path[0], *self._args, **self._kwargs) class ProcessRainRadar(ProcessInstrument): - def process_mrr_pro(self): + def process_mrr_pro(self) -> None: full_paths, self.uuid.raw = self.download_instrument() full_paths = _unzip_gz_files(full_paths) self.uuid.product = mrr2nc(full_paths, *self._args, **self._kwargs) @@ -1221,8 +1221,8 @@ def _count_lines(filename: Path) -> int: return n_lines -def _check_chm_version(filename: str, identifier: str): - def print_warning(expected: str): +def _check_chm_version(filename: str, identifier: str) -> None: + def print_warning(expected: str) -> None: logging.warning( f"{expected} data submitted with incorrect identifier {identifier}" ) diff --git a/src/processing/metadata_api.py b/src/processing/metadata_api.py index 14a404d1..eab80533 100644 --- a/src/processing/metadata_api.py +++ b/src/processing/metadata_api.py @@ -1,6 +1,7 @@ """Metadata API for Cloudnet files.""" import uuid +from typing import Any import requests @@ -13,13 +14,13 @@ class MetadataApi: def __init__( self, config: Config, session: requests.Session = utils.make_session() - ): + ) -> None: self.config = config self.session = session self._url = config.dataportal_url self._auth = config.data_submission_auth - def get(self, end_point: str, payload: dict | None = None, json: bool = True): + def get(self, end_point: str, payload: dict | None = None, json: bool = True): # noqa: ANN201 """Get Cloudnet metadata.""" url = f"{self._url}/{end_point}" res = self.session.get(url, params=payload) @@ -51,7 +52,7 @@ def delete(self, end_point: str, params: dict | None = None) -> requests.Respons res.raise_for_status() return res - def put_images(self, img_metadata: list, product_uuid: str | uuid.UUID): + def put_images(self, img_metadata: list, product_uuid: str | uuid.UUID) -> None: for data in img_metadata: payload = { "sourceFileId": str(product_uuid), @@ -60,10 +61,10 @@ def put_images(self, img_metadata: list, product_uuid: str | uuid.UUID): } self.put("visualizations", data["s3key"], payload) - def update_dvas_info(self, uuid: uuid.UUID, timestamp: str, dvas_id: str): + def update_dvas_info(self, uuid: uuid.UUID, timestamp: str, dvas_id: str) -> None: payload = {"uuid": str(uuid), "dvasUpdatedAt": timestamp, "dvasId": dvas_id} self.post("files", payload) - def clean_dvas_info(self, uuid: uuid.UUID): + def clean_dvas_info(self, uuid: uuid.UUID) -> None: payload = {"uuid": str(uuid), "dvasUpdatedAt": None, "dvasId": None} self.post("files", payload) diff --git a/src/processing/netcdf_comparer.py b/src/processing/netcdf_comparer.py index 5c02a0a2..3978bcdc 100644 --- a/src/processing/netcdf_comparer.py +++ b/src/processing/netcdf_comparer.py @@ -27,15 +27,16 @@ def __init__( "folding_flag", "nyquist_velocity", ), - ): + ) -> None: self.old_file = old_file self.new_file = new_file self.ignore_vars = ignore_vars def compare(self) -> NCDiff: - with netCDF4.Dataset(self.old_file, "r") as old, netCDF4.Dataset( - self.new_file, "r" - ) as new: + with ( + netCDF4.Dataset(self.old_file, "r") as old, + netCDF4.Dataset(self.new_file, "r") as new, + ): self.old = old self.new = new diff --git a/src/processing/pid_utils.py b/src/processing/pid_utils.py index 932a56c0..497b5386 100644 --- a/src/processing/pid_utils.py +++ b/src/processing/pid_utils.py @@ -13,7 +13,7 @@ class PidUtils: - def __init__(self, config: Config, session: requests.Session | None = None): + def __init__(self, config: Config, session: requests.Session | None = None) -> None: self._pid_service_url = f"{config.pid_service_url}/pid/" self._is_production = config.is_production if session is None: diff --git a/src/processing/processor.py b/src/processing/processor.py index fe09c033..913f7f74 100644 --- a/src/processing/processor.py +++ b/src/processing/processor.py @@ -4,7 +4,6 @@ from pathlib import Path from uuid import UUID -import housekeeping import numpy as np import numpy.typing as npt from cloudnet_api_client import APIClient @@ -19,6 +18,7 @@ from cloudnetpy_qc import quality from cloudnetpy_qc.quality import ErrorLevel +import housekeeping from processing import utils from processing.dvas import Dvas from processing.metadata_api import MetadataApi @@ -74,7 +74,7 @@ def __init__( pid_utils: PidUtils, dvas: Dvas, client: APIClient, - ): + ) -> None: self.md_api = md_api self.storage_api = storage_api self.pid_utils = pid_utils @@ -155,7 +155,7 @@ def download_instrument( exclude_pattern: str | None = None, include_tag_subset: set[str] | None = None, exclude_tag_subset: set[str] | None = None, - allow_empty=False, + allow_empty: bool = False, filename_prefix: str | None = None, filename_suffix: str | None = None, time_offset: datetime.timedelta | None = None, @@ -230,7 +230,7 @@ def upload_file( s3key: str, volatile: bool, patch: bool, - ): + ) -> None: file_info = self.storage_api.upload_product(full_path, s3key, volatile) payload = utils.create_product_put_payload( full_path, @@ -481,7 +481,7 @@ def _get_housekeeping_records(self, params: InstrumentParams) -> list[RawMetadat def _select_halo_doppler_lidar_hkd_records( - records: list[RawMetadata] + records: list[RawMetadata], ) -> list[RawMetadata]: if not records: return [] diff --git a/src/processing/product.py b/src/processing/product.py index c3907db7..b9a7882e 100644 --- a/src/processing/product.py +++ b/src/processing/product.py @@ -26,7 +26,7 @@ def process_product( processor: Processor, params: ProductParams | ModelParams, directory: Path -): +) -> None: uuid = Uuid() pid_to_new_file = None if existing_product := processor.get_product(params): @@ -169,7 +169,7 @@ def _process_mwrpy( def _process_cpr_simulation( processor: Processor, params: ProductParams, uuid: Uuid, directory: Path -): +) -> Path: earthcare_launch_date = datetime.date(2024, 5, 28) if params.date < earthcare_launch_date: raise SkipTaskError( @@ -420,7 +420,7 @@ def _find_instrument_product( if require: fallback = require - def file_key(file: ProductMetadata): + def file_key(file: ProductMetadata) -> int: if ( nominal_instrument_pid and file.instrument is not None @@ -492,7 +492,7 @@ def _get_input_files_for_voodoo( return [str(path) for path in full_paths], uuids -def _update_dvas_metadata(processor: Processor, params: ProductParams): +def _update_dvas_metadata(processor: Processor, params: ProductParams) -> None: metadata = processor.client.files( site_id=params.site.id, date=params.date, diff --git a/src/processing/storage_api.py b/src/processing/storage_api.py index 3356e31a..9a3d4244 100644 --- a/src/processing/storage_api.py +++ b/src/processing/storage_api.py @@ -28,7 +28,7 @@ class StorageApiError(Exception): class StorageApi: """Class for uploading and downloading files from the Cloudnet S3 data archive.""" - def __init__(self, config: Config, session: requests.Session): + def __init__(self, config: Config, session: requests.Session) -> None: self.session = session self.config = config self._url = config.storage_service_url @@ -161,7 +161,7 @@ def _download_url( checksum_algorithm: str, output_path: Path, auth: tuple[str, str], -): +) -> None: if not hasattr(thread_local, "session"): thread_local.session = requests.Session() res_size = 0 diff --git a/src/processing/utils.py b/src/processing/utils.py index 6bda97ce..b1936d99 100644 --- a/src/processing/utils.py +++ b/src/processing/utils.py @@ -23,7 +23,7 @@ class Uuid: __slots__ = ["raw", "product", "volatile"] - def __init__(self): + def __init__(self) -> None: self.raw: list[str] = [] self.product: str = "" self.volatile: str | None = None @@ -32,7 +32,7 @@ def __init__(self): class MiscError(Exception): """Internal exception class.""" - def __init__(self, msg: str): + def __init__(self, msg: str) -> None: self.message = msg super().__init__(self.message) @@ -40,7 +40,7 @@ def __init__(self, msg: str): class RawDataMissingError(Exception): """Internal exception class.""" - def __init__(self, msg: str = "Missing raw data"): + def __init__(self, msg: str = "Missing raw data") -> None: self.message = msg super().__init__(self.message) @@ -48,20 +48,26 @@ def __init__(self, msg: str = "Missing raw data"): class SkipTaskError(Exception): """Unable to complete task for an expected reason.""" - def __init__(self, msg: str): + def __init__(self, msg: str) -> None: self.message = msg super().__init__(self.message) class MyAdapter(HTTPAdapter): - def __init__(self): + def __init__(self) -> None: retry_strategy = Retry( total=10, backoff_factor=0.1, status_forcelist=[504, 524] ) super().__init__(max_retries=retry_strategy) - def send( - self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None + def send( # noqa: ANN201 + self, + request, # noqa: ANN001 + stream=False, # noqa: ANN001 + timeout=None, # noqa: ANN001 + verify=True, # noqa: ANN001 + cert=None, # noqa: ANN001 + proxies=None, # noqa: ANN001 ): if timeout is None: timeout = 120 @@ -143,7 +149,7 @@ def send_slack_alert( logging.fatal(f"Failed to send Slack notification: {body.text}") -def add_global_attributes(full_path: Path, instrument_pid: str | None = None): +def add_global_attributes(full_path: Path, instrument_pid: str | None = None) -> None: """Add cloudnet-processing package version to file attributes.""" with netCDF4.Dataset(full_path, "r+") as nc: nc.cloudnet_processing_version = cloudnet_processing_version @@ -237,7 +243,7 @@ def create_product_put_payload( return payload -def _get_file_format(nc: netCDF4.Dataset): +def _get_file_format(nc: netCDF4.Dataset) -> str: """Returns netCDF file format.""" file_format = nc.file_format.lower() if "netcdf4" in file_format: diff --git a/tests/integration/test.py b/tests/integration/test.py index c945bd27..8ad44d70 100644 --- a/tests/integration/test.py +++ b/tests/integration/test.py @@ -7,6 +7,7 @@ import requests from cloudnet_api_client import APIClient from cloudnet_api_client.containers import Instrument + from processing import utils from processing.config import Config from processing.dvas import Dvas @@ -21,7 +22,7 @@ @pytest.fixture(scope="session") -def processor(): +def processor() -> Processor: session = utils.make_session() md_api = MetadataApi(CONFIG, session) storage_api = StorageApi(CONFIG, session) @@ -59,7 +60,9 @@ class Meta: @pytest.mark.parametrize("meta", meta_list) -def test_instrument_processing(processor: Processor, meta: Meta, tmp_path): +def test_instrument_processing( + processor: Processor, meta: Meta, tmp_path: Path +) -> None: instrument = processor.client.instrument(meta.uuid) _submit_file(meta, instrument) date = datetime.date.fromisoformat(meta.date) diff --git a/tests/unit/test_housekeeping.py b/tests/unit/test_housekeeping.py index 71051758..1e6f2a41 100644 --- a/tests/unit/test_housekeeping.py +++ b/tests/unit/test_housekeeping.py @@ -5,6 +5,7 @@ import numpy as np import pytest from cloudnet_api_client.containers import Instrument, RawMetadata, Site + from housekeeping.housekeeping import get_reader from housekeeping.utils import decode_bits @@ -49,7 +50,7 @@ ), ], ) -def test_something(filename: str, instrument_id: str, measurement_date: str): +def test_something(filename: str, instrument_id: str, measurement_date: str) -> None: path = Path(filename) instrument = Instrument( uuid=uuid.uuid4(), @@ -97,7 +98,7 @@ def test_something(filename: str, instrument_id: str, measurement_date: str): assert len(points) > 0 -def test_decode_bits(): +def test_decode_bits() -> None: values = decode_bits(np.array([0b101010]), [("A", 4), ("_B", 1), ("C", 1)]) assert values == { "A": np.array([0b1010]), diff --git a/tests/unit/test_utils_module.py b/tests/unit/test_utils_module.py index ce8db150..eb87f6fe 100644 --- a/tests/unit/test_utils_module.py +++ b/tests/unit/test_utils_module.py @@ -2,9 +2,11 @@ import netCDF4 import numpy as np +import numpy.typing as npt import pytest from cloudnet_api_client.utils import md5sum, sha256sum from numpy import ma + from processing import netcdf_comparer from processing.netcdf_comparer import NCDiff from processing.storage_api import _get_product_bucket @@ -12,7 +14,7 @@ test_file_path = Path(__file__).parent.absolute() -def test_get_product_bucket(): +def test_get_product_bucket() -> None: assert _get_product_bucket(True) == "cloudnet-product-volatile" assert _get_product_bucket(False) == "cloudnet-product" @@ -20,11 +22,11 @@ def test_get_product_bucket(): class TestHash: file = "tests/data/20201121_bucharest_classification.nc" - def test_md5sum(self): + def test_md5sum(self) -> None: hash_sum = md5sum(self.file) assert hash_sum == "c81d7834d7189facbc5f63416fe5b3da" - def test_sha256sum2(self): + def test_sha256sum2(self) -> None: hash_sum = sha256sum(self.file) assert ( hash_sum @@ -32,7 +34,7 @@ def test_sha256sum2(self): ) -def test_are_identical_nc_files_real_data(): +def test_are_identical_nc_files_real_data() -> None: fname1 = "tests/data/20180703_granada_classification_old.nc" fname2 = "tests/data/20180703_granada_classification.nc" fname3 = "tests/data/20180703_granada_classification_reprocessed.nc" @@ -229,8 +231,15 @@ def test_are_identical_nc_files_real_data(): ], ) def test_are_identical_nc_files_generated_data( - data1, kwargs1, ncattrs1, data2, kwargs2, ncattrs2, expected, tmp_path -): + data1: npt.NDArray, + kwargs1: dict, + ncattrs1: dict, + data2: npt.NDArray, + kwargs2: dict, + ncattrs2: dict, + expected: NCDiff, + tmp_path: Path, +) -> None: fname1 = tmp_path / "old.nc" fname2 = tmp_path / "new.nc" for fname, data, kwargs, ncattrs in zip( @@ -316,7 +325,9 @@ def test_are_identical_nc_files_generated_data( ), ], ) -def test_are_identical_nc_files_global_attributes(tmp_path, attrs1, attrs2, expected): +def test_are_identical_nc_files_global_attributes( + tmp_path: Path, attrs1: dict, attrs2: dict, expected: NCDiff +) -> None: fname1 = tmp_path / "old.nc" fname2 = tmp_path / "new.nc" for fname, attrs in zip((fname1, fname2), (attrs1, attrs2)): @@ -347,7 +358,9 @@ def test_are_identical_nc_files_global_attributes(tmp_path, attrs1, attrs2, expe ), ], ) -def test_compare_variables(array1, array2, expected: NCDiff, tmp_path): +def test_compare_variables( + array1: npt.ArrayLike, array2: npt.ArrayLike, expected: NCDiff, tmp_path: Path +) -> None: temp1 = tmp_path / "file1.nc" temp2 = tmp_path / "file2.nc" with ( @@ -363,7 +376,7 @@ def test_compare_variables(array1, array2, expected: NCDiff, tmp_path): assert netcdf_comparer.nc_difference(temp1, temp2) == expected -def test_missing_variable_in_new_file(tmp_path): +def test_missing_variable_in_new_file(tmp_path: Path) -> None: old_file = tmp_path / "file1.nc" new_file = tmp_path / "file2.nc" with ( @@ -378,7 +391,7 @@ def test_missing_variable_in_new_file(tmp_path): assert netcdf_comparer.nc_difference(old_file, new_file) == NCDiff.MAJOR -def test_missing_variable_in_old_file(tmp_path): +def test_missing_variable_in_old_file(tmp_path: Path) -> None: old_file = tmp_path / "file1.nc" new_file = tmp_path / "file2.nc" with ( @@ -402,7 +415,9 @@ def test_missing_variable_in_old_file(tmp_path): ("f4", "i4", NCDiff.MINOR), ], ) -def test_compare_variable_dtypes(dtype_old, dtype_new, expected, tmp_path): +def test_compare_variable_dtypes( + dtype_old: str, dtype_new: str, expected: NCDiff, tmp_path: Path +) -> None: old_file = tmp_path / "file1.nc" new_file = tmp_path / "file2.nc" with ( @@ -427,7 +442,9 @@ def test_compare_variable_dtypes(dtype_old, dtype_new, expected, tmp_path): (10000, NCDiff.MAJOR), ], ) -def test_compare_variable_values_added(change, expected, tmp_path): +def test_compare_variable_values_added( + change: float, expected: NCDiff, tmp_path: Path +) -> None: old_file = tmp_path / "file1.nc" new_file = tmp_path / "file2.nc" len_data = 100_000 @@ -464,7 +481,9 @@ def test_compare_variable_values_added(change, expected, tmp_path): (2, NCDiff.MAJOR), ], ) -def test_compare_variable_values_multiplied(change, expected, tmp_path): +def test_compare_variable_values_multiplied( + change: float, expected: NCDiff, tmp_path: Path +) -> None: old_file = tmp_path / "file1.nc" new_file = tmp_path / "file2.nc" len_data = 10_000 @@ -485,7 +504,7 @@ def test_compare_variable_values_multiplied(change, expected, tmp_path): assert netcdf_comparer.nc_difference(old_file, new_file) == expected -def test_missing_units_in_new_variable(tmp_path): +def test_missing_units_in_new_variable(tmp_path: Path) -> None: old_file = tmp_path / "file1.nc" new_file = tmp_path / "file2.nc" with ( @@ -512,7 +531,7 @@ def test_missing_units_in_new_variable(tmp_path): (1000, NCDiff.MAJOR), ], ) -def test_compare_masks(n_masked: int, expected: NCDiff, tmp_path): +def test_compare_masks(n_masked: int, expected: NCDiff, tmp_path: Path) -> None: temp1 = tmp_path / "file1.nc" temp2 = tmp_path / "file2.nc" with ( From 3c6be19ae6b455a44d229a3e7aa1b3cd8f7c0d9d Mon Sep 17 00:00:00 2001 From: Tuomas Siipola Date: Wed, 27 Aug 2025 09:59:21 +0300 Subject: [PATCH 7/8] old and new --- scripts/dvas-json.py | 9 +- src/processing/dvas.py | 211 +++++++++++++++++++++++++++++++++++++- src/processing/jobs.py | 6 +- src/processing/product.py | 14 +-- 4 files changed, 225 insertions(+), 15 deletions(-) diff --git a/scripts/dvas-json.py b/scripts/dvas-json.py index 220f85a8..0cc07cc3 100755 --- a/scripts/dvas-json.py +++ b/scripts/dvas-json.py @@ -8,7 +8,7 @@ from cloudnet_api_client import APIClient from processing.config import Config -from processing.dvas import DvasMetadata +from processing.dvas import DvasMetadata, NewDvasMetadata from processing.metadata_api import MetadataApi if __name__ == "__main__": @@ -16,12 +16,17 @@ parser.add_argument( "file_uuid", type=UUID, help="Output DVAS metadata for this file UUID" ) + parser.add_argument("--new", action="store_true") args = parser.parse_args() config = Config() md_api = MetadataApi(config) client = APIClient(config.dataportal_url + "/api") file = client.file(args.file_uuid) - dvas_metadata = DvasMetadata(file, md_api, client) + dvas_metadata = ( + NewDvasMetadata(file, md_api, client) + if args.new + else DvasMetadata(file, md_api, client) + ) dvas_timestamp = datetime.datetime.now(datetime.timezone.utc) dvas_json = dvas_metadata.create_dvas_json(dvas_timestamp) json.dump(dvas_json, sys.stdout, indent=2) diff --git a/src/processing/dvas.py b/src/processing/dvas.py index 7e3cdf58..d8bd7182 100644 --- a/src/processing/dvas.py +++ b/src/processing/dvas.py @@ -6,7 +6,11 @@ import requests from cloudnet_api_client import APIClient -from cloudnet_api_client.containers import ExtendedProductMetadata, VersionMetadata +from cloudnet_api_client.containers import ( + ExtendedProductMetadata, + ProductMetadata, + VersionMetadata, +) from processing import utils from processing.config import Config @@ -116,6 +120,211 @@ def _init_session(self) -> requests.Session: class DvasMetadata: """Create metadata for DVAS API from Cloudnet file metadata""" + def __init__( + self, file: ProductMetadata, md_api: MetadataApi, client: APIClient + ) -> None: + self.file = file + self.md_api = md_api + self.client = client + + def create_dvas_json(self, dvas_timestamp: datetime.datetime) -> dict: + time_begin = self.file.start_time or datetime.datetime.combine( + self.file.measurement_date, datetime.time(0, 0, 0), datetime.timezone.utc + ) + time_end = self.file.stop_time or datetime.datetime.combine( + self.file.measurement_date, + datetime.time(23, 59, 59, 999999), + datetime.timezone.utc, + ) + return { + "md_metadata": { + "file_identifier": self.file.filename, + "language": "en", + "hierarchy_level": "dataset", + "online_resource": {"linkage": "https://cloudnet.fmi.fi/"}, + "datestamp": dvas_timestamp.isoformat(), + "contact": [ + { + "first_name": "Ewan", + "last_name": "O'Connor", + "organisation_name": "Finnish Meteorological Institute (FMI)", + "role_code": ["pointOfContact"], + "country_code": "FI", + } + ], + }, + "md_identification": { + "abstract": self._parse_title(), + "title": self._parse_title(), + "date_type": "creation", + "contact": [ + { + "first_name": "Simo", + "last_name": "Tukiainen", + "organisation_name": "Finnish Meteorological Institute (FMI)", + "role_code": ["processor"], + "country_code": "FI", + } + ], + "online_resource": { + "linkage": f"https://cloudnet.fmi.fi/file/{self.file.uuid}" + }, + "identifier": { + "pid": self.file.pid, + "type": "handle", + }, + "date": time_begin.isoformat(), + }, + "md_constraints": { + "access_constraints": "license", + "use_constraints": "license", + "other_constraints": "N/A", + "data_licence": "CC-BY-4.0", + "metadata_licence": "CC-BY-4.0", + "citation": self._fetch_credits("citation"), + "acknowledgement": self._fetch_credits("acknowledgements"), + }, + "md_keywords": { + "keywords": [ + "FMI", + "ACTRIS", + self.file.product.human_readable_name, + ] + }, + "md_data_identification": { + "language": "en", + "topic_category": "climatologyMeteorologyAtmosphere", + "description": "time series of profile measurements", + "facility_identifier": self.file.site.dvas_id, + }, + "ex_geographic_bounding_box": { + "west_bound_longitude": self.file.site.longitude, + "east_bound_longitude": self.file.site.longitude, + "south_bound_latitude": self.file.site.latitude, + "north_bound_latitude": self.file.site.latitude, + }, + "ex_temporal_extent": { + "time_period_begin": time_begin.isoformat(), + "time_period_end": time_end.isoformat(), + }, + "md_content_information": { + "attribute_descriptions": self._parse_variable_names(), + "content_type": "physicalMeasurement", + }, + "md_distribution_information": [ + { + "data_format": "netcdf", + "version_data_format": self._parse_netcdf_version(), + "dataset_url": self.file.download_url, + "protocol": "HTTP", + "transfersize": self._calc_file_size(), + "description": "Direct download of data file", + "function": "download", + "restriction": { + "set": False, + }, + } + ], + "md_actris_specific": { + "facility_type": "observation platform, fixed", + "product_type": "observation", + "matrix": "cloud phase", + "sub_matrix": None, + "instrument_type": self._find_instrument_types(self.file.uuid), + "program_affiliation": self._parse_affiliation(), + "variable_statistical_property": None, + "legacy_data": self.file.legacy, + "observation_timeliness": self._parse_timeliness(), + "data_product": self._parse_data_product(), + }, + "dq_data_quality_information": { + "level": "dataset", + "compliance": self._parse_compliance(), + "quality_control_extent": "full quality control applied", + "quality_control_outcome": self._parse_qc_outcome(), + }, + } + + def _parse_variable_names(self) -> list[str]: + # https://prod-actris-md.nilu.no/Vocabulary/ContentAttribute + file_vars = self.md_api.get(f"api/products/{self.file.product.id}/variables") + return [v["actrisName"] for v in file_vars if v["actrisName"] is not None] + + def _parse_affiliation(self) -> list[str]: + # https://prod-actris-md.nilu.no/vocabulary/networkprogram + affiliation = ["CLOUDNET"] + if "arm" in self.file.site.type: + affiliation.append("ARM") + if "cloudnet" in self.file.site.type: + affiliation.append("ACTRIS") + return affiliation + + def _find_instrument_types(self, uuid: UUID) -> list[str]: + """Return all source instruments used to create a product. + + Links: + https://vocabulary.actris.nilu.no/actris_vocab/instrumenttype + https://prod-actris-md.nilu.no/vocabulary/instrumenttype + """ + return [i.type for i in self.client.source_instruments(uuid)] + + def _parse_timeliness(self) -> str: + # https://prod-actris-md.nilu.no/vocabulary/observationtimeliness + clu_to_dvas_map = { + "nrt": "near real-time", + "rrt": "real real-time", + "scheduled": "scheduled", + } + return clu_to_dvas_map[self.file.timeliness] + + def _parse_data_product(self) -> str: + """Description of the data product""" + return f"{self._parse_timeliness()} data" + + def _parse_compliance(self) -> str: + return ( + "ACTRIS legacy" + if self.file.measurement_date < datetime.date(2023, 4, 25) + else "ACTRIS associated" + ) + + def _parse_qc_outcome(self) -> str: + outcome_map = { + "pass": "1 - Good", + "info": "3 - Questionable/suspect", + "warning": "3 - Questionable/suspect", + "error": "4 - Bad", + } + unknown_outcome = "2 - Not evaluated, not available or unknown" + if self.file.error_level is None: + return unknown_outcome + return outcome_map.get(self.file.error_level, unknown_outcome) + + def _parse_netcdf_version(self) -> str: + return self.file.format + + def _parse_title(self) -> str: + return ( + f"{self.file.product.human_readable_name} data " + f"derived from cloud remote sensing measurements " + f"at {self.file.site.human_readable_name}" + ) + + def _calc_file_size(self) -> float: + file_size = int(self.file.size) / 1000 / 1000 # MB + return round(file_size, 3) + + def _fetch_credits(self, type: Literal["citation", "acknowledgements"]) -> str: + params = {"format": "txt"} + response = self.md_api.get( + f"api/reference/{self.file.uuid}/{type}", params, json=False + ) + return response.text + + +class NewDvasMetadata: + """Create metadata for DVAS API from Cloudnet file metadata""" + def __init__( self, file: ExtendedProductMetadata, md_api: MetadataApi, client: APIClient ) -> None: diff --git a/src/processing/jobs.py b/src/processing/jobs.py index 64f691c5..93c15423 100644 --- a/src/processing/jobs.py +++ b/src/processing/jobs.py @@ -89,7 +89,8 @@ def freeze(processor: Processor, params: ProcessParams, directory: Path) -> None processor.storage_api.delete_volatile_product(s3key) metadata = processor.client.file(file_uuid) if processor.md_api.config.is_production: - processor.dvas.upload(metadata) + ext_metadata = processor.client.file(metadata.uuid) + processor.dvas.upload(ext_metadata) def upload_to_dvas(processor: Processor, params: ProcessParams) -> None: @@ -98,7 +99,8 @@ def upload_to_dvas(processor: Processor, params: ProcessParams) -> None: raise utils.SkipTaskError("Product not found") if metadata.dvas_id: raise utils.SkipTaskError("Already uploaded to DVAS") - processor.dvas.upload(metadata) + ext_metadata = processor.client.file(metadata.uuid) + processor.dvas.upload(ext_metadata) logging.info("Uploaded to DVAS") diff --git a/src/processing/product.py b/src/processing/product.py index b9a7882e..e437bcdb 100644 --- a/src/processing/product.py +++ b/src/processing/product.py @@ -112,7 +112,7 @@ def process_product( ) utils.print_info(uuid, volatile, patch, upload, qc_result) if processor.md_api.config.is_production and isinstance(params, ProductParams): - _update_dvas_metadata(processor, params) + _update_dvas_metadata(processor, UUID(uuid.product)) def _generate_filename(params: ProductParams | ModelParams) -> str: @@ -492,15 +492,9 @@ def _get_input_files_for_voodoo( return [str(path) for path in full_paths], uuids -def _update_dvas_metadata(processor: Processor, params: ProductParams) -> None: - metadata = processor.client.files( - site_id=params.site.id, - date=params.date, - product_id=params.product.id, - instrument_pid=params.instrument.pid if params.instrument else None, - ) - if metadata: - processor.dvas.upload(metadata[0]) +def _update_dvas_metadata(processor: Processor, uuid: UUID) -> None: + meta = processor.client.file(uuid) + processor.dvas.upload(meta) def _check_response(metadata: list[ProductMetadata], product: str) -> None: From 7258607d1ca871527a5fd3f07a8845a204efadde Mon Sep 17 00:00:00 2001 From: Tuomas Siipola Date: Wed, 27 Aug 2025 10:10:50 +0300 Subject: [PATCH 8/8] Fix check --- src/processing/dvas.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/processing/dvas.py b/src/processing/dvas.py index d8bd7182..f83597ba 100644 --- a/src/processing/dvas.py +++ b/src/processing/dvas.py @@ -50,7 +50,13 @@ def upload(self, file: ExtendedProductMetadata) -> None: dvas_metadata = DvasMetadata(file, self.md_api, self.client) dvas_timestamp = datetime.datetime.now(datetime.timezone.utc) dvas_json = dvas_metadata.create_dvas_json(dvas_timestamp) - if not dvas_json["variables"]: + if ( + isinstance(dvas_metadata, DvasMetadata) + and not dvas_json["md_content_information"]["attribute_descriptions"] + ) or ( + isinstance(dvas_metadata, NewDvasMetadata) + and not dvas_json["variables"] + ): logging.error("Skipping - no ACTRIS variables") return self._delete_old_versions(file)