diff --git a/scripts/dvas-json.py b/scripts/dvas-json.py index 5e8ce89b..0cc07cc3 100755 --- a/scripts/dvas-json.py +++ b/scripts/dvas-json.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import argparse +import datetime import json import sys from uuid import UUID @@ -7,7 +8,7 @@ from cloudnet_api_client import APIClient from processing.config import Config -from processing.dvas import DvasMetadata +from processing.dvas import DvasMetadata, NewDvasMetadata from processing.metadata_api import MetadataApi if __name__ == "__main__": @@ -15,11 +16,17 @@ parser.add_argument( "file_uuid", type=UUID, help="Output DVAS metadata for this file UUID" ) + parser.add_argument("--new", action="store_true") args = parser.parse_args() config = Config() md_api = MetadataApi(config) client = APIClient(config.dataportal_url + "/api") file = client.file(args.file_uuid) - dvas_metadata = DvasMetadata(file, md_api, client) - dvas_json = dvas_metadata.create_dvas_json() + dvas_metadata = ( + NewDvasMetadata(file, md_api, client) + if args.new + else DvasMetadata(file, md_api, client) + ) + dvas_timestamp = datetime.datetime.now(datetime.timezone.utc) + dvas_json = dvas_metadata.create_dvas_json(dvas_timestamp) json.dump(dvas_json, sys.stdout, indent=2) diff --git a/src/processing/dvas.py b/src/processing/dvas.py index 54ebd5d7..f83597ba 100644 --- a/src/processing/dvas.py +++ b/src/processing/dvas.py @@ -6,7 +6,11 @@ import requests from cloudnet_api_client import APIClient -from cloudnet_api_client.containers import ProductMetadata, VersionMetadata +from cloudnet_api_client.containers import ( + ExtendedProductMetadata, + ProductMetadata, + VersionMetadata, +) from processing import utils from processing.config import Config @@ -26,7 +30,7 @@ def __init__(self, config: Config, md_api: MetadataApi, client: APIClient) -> No self.session = self._init_session() self.client = client - def upload(self, file: ProductMetadata) -> None: + def upload(self, file: ExtendedProductMetadata) -> None: """Upload Cloudnet file metadata to DVAS API and update Cloudnet data portal""" landing_page_url = utils.build_file_landing_page_url(file.uuid) logging.info(f"Uploading {landing_page_url} metadata to DVAS") @@ -44,15 +48,20 @@ def upload(self, file: ProductMetadata) -> None: return try: dvas_metadata = DvasMetadata(file, self.md_api, self.client) - dvas_json = dvas_metadata.create_dvas_json() - if len(dvas_json["md_content_information"]["attribute_descriptions"]) == 0: + dvas_timestamp = datetime.datetime.now(datetime.timezone.utc) + dvas_json = dvas_metadata.create_dvas_json(dvas_timestamp) + if ( + isinstance(dvas_metadata, DvasMetadata) + and not dvas_json["md_content_information"]["attribute_descriptions"] + ) or ( + isinstance(dvas_metadata, NewDvasMetadata) + and not dvas_json["variables"] + ): logging.error("Skipping - no ACTRIS variables") return self._delete_old_versions(file) dvas_id = self._post(dvas_json) - self.md_api.update_dvas_info( - file.uuid, dvas_json["md_metadata"]["datestamp"], dvas_id - ) + self.md_api.update_dvas_info(file.uuid, dvas_timestamp, dvas_id) except DvasError: logging.exception(f"Failed to upload {file.filename} to DVAS") @@ -80,7 +89,7 @@ def _delete(self, url: str) -> None: raise DvasError(res) logging.debug(f"DELETE successful: {res.status_code} {res.text}") - def _delete_old_versions(self, file: ProductMetadata) -> None: + def _delete_old_versions(self, file: ExtendedProductMetadata) -> None: """Delete all versions of the given file from DVAS API. To be used before posting new version.""" versions = self.client.versions(file.uuid) for version in versions: @@ -124,7 +133,7 @@ def __init__( self.md_api = md_api self.client = client - def create_dvas_json(self, dvas_timestamp: datetime.datetime | None = None) -> dict: + def create_dvas_json(self, dvas_timestamp: datetime.datetime) -> dict: time_begin = self.file.start_time or datetime.datetime.combine( self.file.measurement_date, datetime.time(0, 0, 0), datetime.timezone.utc ) @@ -133,8 +142,6 @@ def create_dvas_json(self, dvas_timestamp: datetime.datetime | None = None) -> d datetime.time(23, 59, 59, 999999), datetime.timezone.utc, ) - if dvas_timestamp is None: - dvas_timestamp = utils.utcnow() return { "md_metadata": { "file_identifier": self.file.filename, @@ -319,3 +326,204 @@ def _fetch_credits(self, type: Literal["citation", "acknowledgements"]) -> str: f"api/reference/{self.file.uuid}/{type}", params, json=False ) return response.text + + +class NewDvasMetadata: + """Create metadata for DVAS API from Cloudnet file metadata""" + + def __init__( + self, file: ExtendedProductMetadata, md_api: MetadataApi, client: APIClient + ) -> None: + self.file = file + self.md_api = md_api + self.client = client + + def create_dvas_json(self, timestamp: datetime.datetime) -> dict: + time_begin = self.file.start_time or datetime.datetime.combine( + self.file.measurement_date, datetime.time(0, 0, 0), datetime.timezone.utc + ) + time_end = self.file.stop_time or datetime.datetime.combine( + self.file.measurement_date, + datetime.time(23, 59, 59, 999999), + datetime.timezone.utc, + ) + timeliness = self._parse_timeliness() + instruments = self.client.source_instruments(self.file.uuid) + compliance = self._parse_compliance() + qc_outcome = self._parse_qc_outcome() + frameworks = self._parse_frameworks() + return { + "dataset_metadata": { + "repository": {"repository_id": "CLU"}, + "time_file_created": self.file.created_at.isoformat(), + "time_metadata_created": timestamp.isoformat(), + "time_content_revised": self.file.updated_at.isoformat(), + }, + "identification": { + "identifier": {"pid": self.file.pid, "pid_type": "ePIC"}, + "title": self._parse_title(), + "abstract": self._parse_title(), + "roles": [ + { + "role_code": ["pointOfContact"], + "person": { + "first_name": "Ewan", + "last_name": "O'Connor", + "affiliation": { + "name": "Finnish Meteorological Institute", + "pid": "https://ror.org/05hppb561", + "pid_type": "other PID", + "country_code": "FI", + }, + "orcid": "https://orcid.org/0000-0001-9834-5100", + }, + }, + { + "role_code": ["processor"], + "person": { + "first_name": "Simo", + "last_name": "Tukiainen", + "affiliation": { + "name": "Finnish Meteorological Institute", + "pid": "https://ror.org/05hppb561", + "pid_type": "other PID", + "country_code": "FI", + }, + "orcid": "https://orcid.org/0000-0002-0651-4622", + }, + }, + ], + }, + "usage_information": { + "data_licence": "CC-BY-4.0", + "metadata_licence": "CC0-1.0", + "citation": self._fetch_credits("citation"), + "acknowledgement": self._fetch_credits("acknowledgements"), + }, + "product_type": "observation", + "facility": { + "identifier": self.file.site.dvas_id, + }, + "spatial_extent": { + "type": "LineString", + "coordinates": [ + [ + self.file.site.longitude, + self.file.site.latitude, + self.file.site.altitude, + ], + [ + self.file.site.longitude, + self.file.site.latitude, + self.file.site.altitude + 12_000, + ], + ], + }, + "temporal_extent": { + "time_period_begin": time_begin.isoformat(), + "time_period_end": time_end.isoformat(), + }, + "variables": [ + { + "variable_name": variable_name, + "variable_matrix": "cloud phase", + "variable_geometry": "atmospheric vertical profile", + "timeliness": timeliness, + "instrument": [ + { + "instrument_pid": instrument.pid, + "instrument_type": instrument.type, + "instrument_name": instrument.name, + } + for instrument in instruments + ], + "data_quality_control": [ + { + "compliance": compliance, + "quality_control_extent": "full quality control applied", + "quality_control_mechanism": "automatic quality control", + "quality_control_outcome": qc_outcome, + } + ], + "framework": [{"framework": framework} for framework in frameworks], + "temporal_resolution": "P30S", + } + for variable_name in self._parse_variable_names() + ], + "distribution_information": [ + { + "data_format": self._parse_netcdf_version(), + "dataset_url": self.file.download_url, + "protocol": "HTTP", + "access_restriction": { + "restricted": False, + }, + "transfersize": {"size": self.file.size, "unit": "B"}, + } + ], + "provenance": [ + { + "title": software.title, + "url": software.url, + } + for software in self.file.software + ], + } + + def _parse_variable_names(self) -> list[str]: + # https://prod-actris-md.nilu.no/Vocabulary/ContentAttribute + file_vars = self.md_api.get(f"api/products/{self.file.product.id}/variables") + return [v["actrisName"] for v in file_vars if v["actrisName"] is not None] + + def _parse_frameworks(self) -> list[str]: + affiliation = ["CLOUDNET"] + if "arm" in self.file.site.type: + affiliation.append("ARM") + if "cloudnet" in self.file.site.type: + affiliation.append("ACTRIS") + return affiliation + + def _parse_timeliness(self) -> str: + # https://prod-actris-md.nilu.no/vocabulary/observationtimeliness + clu_to_dvas_map = { + "nrt": "near real-time", + "rrt": "real real-time", + "scheduled": "scheduled", + } + return clu_to_dvas_map[self.file.timeliness] + + def _parse_compliance(self) -> str: + return ( + "ACTRIS legacy" + if self.file.measurement_date < datetime.date(2023, 4, 25) + else "ACTRIS associated" + ) + + def _parse_qc_outcome(self) -> str: + outcome_map = { + "pass": "1 - Good", + "info": "3 - Questionable/suspect", + "warning": "3 - Questionable/suspect", + "error": "4 - Bad", + } + unknown_outcome = "2 - Not evaluated, not available or unknown" + if self.file.error_level is None: + return unknown_outcome + return outcome_map.get(self.file.error_level, unknown_outcome) + + def _parse_netcdf_version(self) -> str: + return self.file.format + + def _parse_title(self) -> str: + return ( + f"{self.file.product.human_readable_name} data " + f"derived from cloud remote sensing measurements " + f"at {self.file.site.human_readable_name}" + ) + + def _fetch_credits(self, type: Literal["citation", "acknowledgements"]) -> str: + params = {"format": "txt"} + response = self.md_api.get( + f"api/reference/{self.file.uuid}/{type}", params, json=False + ) + return response.text diff --git a/src/processing/jobs.py b/src/processing/jobs.py index 64f691c5..93c15423 100644 --- a/src/processing/jobs.py +++ b/src/processing/jobs.py @@ -89,7 +89,8 @@ def freeze(processor: Processor, params: ProcessParams, directory: Path) -> None processor.storage_api.delete_volatile_product(s3key) metadata = processor.client.file(file_uuid) if processor.md_api.config.is_production: - processor.dvas.upload(metadata) + ext_metadata = processor.client.file(metadata.uuid) + processor.dvas.upload(ext_metadata) def upload_to_dvas(processor: Processor, params: ProcessParams) -> None: @@ -98,7 +99,8 @@ def upload_to_dvas(processor: Processor, params: ProcessParams) -> None: raise utils.SkipTaskError("Product not found") if metadata.dvas_id: raise utils.SkipTaskError("Already uploaded to DVAS") - processor.dvas.upload(metadata) + ext_metadata = processor.client.file(metadata.uuid) + processor.dvas.upload(ext_metadata) logging.info("Uploaded to DVAS") diff --git a/src/processing/metadata_api.py b/src/processing/metadata_api.py index eab80533..0199f5fb 100644 --- a/src/processing/metadata_api.py +++ b/src/processing/metadata_api.py @@ -1,5 +1,6 @@ """Metadata API for Cloudnet files.""" +import datetime import uuid from typing import Any @@ -61,8 +62,14 @@ def put_images(self, img_metadata: list, product_uuid: str | uuid.UUID) -> None: } self.put("visualizations", data["s3key"], payload) - def update_dvas_info(self, uuid: uuid.UUID, timestamp: str, dvas_id: str) -> None: - payload = {"uuid": str(uuid), "dvasUpdatedAt": timestamp, "dvasId": dvas_id} + def update_dvas_info( + self, uuid: uuid.UUID, timestamp: datetime.datetime, dvas_id: str + ) -> None: + payload = { + "uuid": str(uuid), + "dvasUpdatedAt": timestamp.isoformat(), + "dvasId": dvas_id, + } self.post("files", payload) def clean_dvas_info(self, uuid: uuid.UUID) -> None: diff --git a/src/processing/product.py b/src/processing/product.py index b9a7882e..e437bcdb 100644 --- a/src/processing/product.py +++ b/src/processing/product.py @@ -112,7 +112,7 @@ def process_product( ) utils.print_info(uuid, volatile, patch, upload, qc_result) if processor.md_api.config.is_production and isinstance(params, ProductParams): - _update_dvas_metadata(processor, params) + _update_dvas_metadata(processor, UUID(uuid.product)) def _generate_filename(params: ProductParams | ModelParams) -> str: @@ -492,15 +492,9 @@ def _get_input_files_for_voodoo( return [str(path) for path in full_paths], uuids -def _update_dvas_metadata(processor: Processor, params: ProductParams) -> None: - metadata = processor.client.files( - site_id=params.site.id, - date=params.date, - product_id=params.product.id, - instrument_pid=params.instrument.pid if params.instrument else None, - ) - if metadata: - processor.dvas.upload(metadata[0]) +def _update_dvas_metadata(processor: Processor, uuid: UUID) -> None: + meta = processor.client.file(uuid) + processor.dvas.upload(meta) def _check_response(metadata: list[ProductMetadata], product: str) -> None: