Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions scripts/dvas-json.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
#!/usr/bin/env python3
import argparse
import datetime
import json
import sys
from uuid import UUID

from cloudnet_api_client import APIClient

from processing.config import Config
from processing.dvas import DvasMetadata
from processing.dvas import DvasMetadata, NewDvasMetadata
from processing.metadata_api import MetadataApi

if __name__ == "__main__":
    # Command-line entry point: print the DVAS metadata JSON of one Cloudnet
    # file (selected by UUID) to stdout.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "file_uuid", type=UUID, help="Output DVAS metadata for this file UUID"
    )
    parser.add_argument(
        "--new",
        action="store_true",
        help="Build the JSON with NewDvasMetadata instead of DvasMetadata",
    )
    args = parser.parse_args()

    config = Config()
    md_api = MetadataApi(config)
    client = APIClient(config.dataportal_url + "/api")
    file = client.file(args.file_uuid)

    # Both builders take the same constructor arguments and expose
    # create_dvas_json(timestamp); only the class differs.
    dvas_metadata = (
        NewDvasMetadata(file, md_api, client)
        if args.new
        else DvasMetadata(file, md_api, client)
    )
    # create_dvas_json() requires an explicit timestamp, so it is generated
    # here (timezone-aware, UTC) and passed in.
    dvas_timestamp = datetime.datetime.now(datetime.timezone.utc)
    dvas_json = dvas_metadata.create_dvas_json(dvas_timestamp)
    json.dump(dvas_json, sys.stdout, indent=2)
230 changes: 219 additions & 11 deletions src/processing/dvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

import requests
from cloudnet_api_client import APIClient
from cloudnet_api_client.containers import ProductMetadata, VersionMetadata
from cloudnet_api_client.containers import (
ExtendedProductMetadata,
ProductMetadata,
VersionMetadata,
)

from processing import utils
from processing.config import Config
Expand All @@ -26,7 +30,7 @@ def __init__(self, config: Config, md_api: MetadataApi, client: APIClient) -> No
self.session = self._init_session()
self.client = client

def upload(self, file: ProductMetadata) -> None:
def upload(self, file: ExtendedProductMetadata) -> None:
"""Upload Cloudnet file metadata to DVAS API and update Cloudnet data portal"""
landing_page_url = utils.build_file_landing_page_url(file.uuid)
logging.info(f"Uploading {landing_page_url} metadata to DVAS")
Expand All @@ -44,15 +48,20 @@ def upload(self, file: ProductMetadata) -> None:
return
try:
dvas_metadata = DvasMetadata(file, self.md_api, self.client)
dvas_json = dvas_metadata.create_dvas_json()
if len(dvas_json["md_content_information"]["attribute_descriptions"]) == 0:
dvas_timestamp = datetime.datetime.now(datetime.timezone.utc)
dvas_json = dvas_metadata.create_dvas_json(dvas_timestamp)
if (
isinstance(dvas_metadata, DvasMetadata)
and not dvas_json["md_content_information"]["attribute_descriptions"]
) or (
isinstance(dvas_metadata, NewDvasMetadata)
and not dvas_json["variables"]
):
logging.error("Skipping - no ACTRIS variables")
return
self._delete_old_versions(file)
dvas_id = self._post(dvas_json)
self.md_api.update_dvas_info(
file.uuid, dvas_json["md_metadata"]["datestamp"], dvas_id
)
self.md_api.update_dvas_info(file.uuid, dvas_timestamp, dvas_id)
except DvasError:
logging.exception(f"Failed to upload {file.filename} to DVAS")

Expand Down Expand Up @@ -80,7 +89,7 @@ def _delete(self, url: str) -> None:
raise DvasError(res)
logging.debug(f"DELETE successful: {res.status_code} {res.text}")

def _delete_old_versions(self, file: ProductMetadata) -> None:
def _delete_old_versions(self, file: ExtendedProductMetadata) -> None:
"""Delete all versions of the given file from DVAS API. To be used before posting new version."""
versions = self.client.versions(file.uuid)
for version in versions:
Expand Down Expand Up @@ -124,7 +133,7 @@ def __init__(
self.md_api = md_api
self.client = client

def create_dvas_json(self, dvas_timestamp: datetime.datetime | None = None) -> dict:
def create_dvas_json(self, dvas_timestamp: datetime.datetime) -> dict:
time_begin = self.file.start_time or datetime.datetime.combine(
self.file.measurement_date, datetime.time(0, 0, 0), datetime.timezone.utc
)
Expand All @@ -133,8 +142,6 @@ def create_dvas_json(self, dvas_timestamp: datetime.datetime | None = None) -> d
datetime.time(23, 59, 59, 999999),
datetime.timezone.utc,
)
if dvas_timestamp is None:
dvas_timestamp = utils.utcnow()
return {
"md_metadata": {
"file_identifier": self.file.filename,
Expand Down Expand Up @@ -319,3 +326,204 @@ def _fetch_credits(self, type: Literal["citation", "acknowledgements"]) -> str:
f"api/reference/{self.file.uuid}/{type}", params, json=False
)
return response.text


class NewDvasMetadata:
    """Create metadata for DVAS API from Cloudnet file metadata.

    ``create_dvas_json`` returns a dict with the top-level keys
    ``dataset_metadata``, ``identification``, ``usage_information``,
    ``product_type``, ``facility``, ``spatial_extent``, ``temporal_extent``,
    ``variables``, ``distribution_information`` and ``provenance``.
    """

    # Added to the site altitude to form the upper end point of the
    # ``spatial_extent`` line.
    # NOTE(review): presumably metres, like the site altitude - confirm.
    _PROFILE_TOP_OFFSET = 12_000

    # Reported as ``temporal_resolution`` of every variable.
    # NOTE(review): ISO 8601 writes time components after a "T" designator
    # ("PT30S"); confirm which spelling the DVAS API expects before changing.
    _TEMPORAL_RESOLUTION = "P30S"

    # https://prod-actris-md.nilu.no/vocabulary/observationtimeliness
    _TIMELINESS_MAP = {
        "nrt": "near real-time",
        "rrt": "real real-time",
        "scheduled": "scheduled",
    }

    # Cloudnet error level -> DVAS quality control outcome.
    _QC_OUTCOME_MAP = {
        "pass": "1 - Good",
        "info": "3 - Questionable/suspect",
        "warning": "3 - Questionable/suspect",
        "error": "4 - Bad",
    }
    _QC_OUTCOME_UNKNOWN = "2 - Not evaluated, not available or unknown"

    # Files measured before this date are reported as "ACTRIS legacy".
    _ACTRIS_ASSOCIATED_FROM = datetime.date(2023, 4, 25)

    def __init__(
        self, file: ExtendedProductMetadata, md_api: MetadataApi, client: APIClient
    ) -> None:
        self.file = file
        self.md_api = md_api
        self.client = client

    def create_dvas_json(self, timestamp: datetime.datetime) -> dict:
        """Build the DVAS metadata document for ``self.file``.

        Args:
            timestamp: Written (ISO formatted) as ``time_metadata_created``.

        Returns:
            The metadata as a JSON-serialisable dict.

        Raises:
            KeyError: If ``self.file.timeliness`` is not one of the keys of
                the timeliness lookup table.
        """
        time_begin, time_end = self._time_period()
        # Values shared by every entry of "variables"; resolved once up front.
        timeliness = self._parse_timeliness()
        instruments = self.client.source_instruments(self.file.uuid)
        compliance = self._parse_compliance()
        qc_outcome = self._parse_qc_outcome()
        frameworks = self._parse_frameworks()
        site = self.file.site
        return {
            "dataset_metadata": {
                "repository": {"repository_id": "CLU"},
                "time_file_created": self.file.created_at.isoformat(),
                "time_metadata_created": timestamp.isoformat(),
                "time_content_revised": self.file.updated_at.isoformat(),
            },
            "identification": self._build_identification(),
            "usage_information": {
                "data_licence": "CC-BY-4.0",
                "metadata_licence": "CC0-1.0",
                "citation": self._fetch_credits("citation"),
                "acknowledgement": self._fetch_credits("acknowledgements"),
            },
            "product_type": "observation",
            "facility": {
                "identifier": site.dvas_id,
            },
            "spatial_extent": {
                # Two points sharing longitude/latitude: the site itself and
                # a point _PROFILE_TOP_OFFSET above it.
                "type": "LineString",
                "coordinates": [
                    [site.longitude, site.latitude, site.altitude],
                    [
                        site.longitude,
                        site.latitude,
                        site.altitude + self._PROFILE_TOP_OFFSET,
                    ],
                ],
            },
            "temporal_extent": {
                "time_period_begin": time_begin.isoformat(),
                "time_period_end": time_end.isoformat(),
            },
            "variables": [
                self._build_variable(
                    variable_name,
                    timeliness,
                    instruments,
                    compliance,
                    qc_outcome,
                    frameworks,
                )
                for variable_name in self._parse_variable_names()
            ],
            "distribution_information": [
                {
                    "data_format": self._parse_netcdf_version(),
                    "dataset_url": self.file.download_url,
                    "protocol": "HTTP",
                    "access_restriction": {
                        "restricted": False,
                    },
                    "transfersize": {"size": self.file.size, "unit": "B"},
                }
            ],
            "provenance": [
                {
                    "title": software.title,
                    "url": software.url,
                }
                for software in self.file.software
            ],
        }

    def _time_period(self) -> tuple[datetime.datetime, datetime.datetime]:
        """Return (begin, end) of the file, defaulting to the whole UTC day."""
        utc = datetime.timezone.utc
        day = self.file.measurement_date
        begin = self.file.start_time or datetime.datetime.combine(
            day, datetime.time(0, 0, 0), utc
        )
        end = self.file.stop_time or datetime.datetime.combine(
            day, datetime.time(23, 59, 59, 999999), utc
        )
        return begin, end

    def _build_identification(self) -> dict:
        """Return the ``identification`` section (PID, title and contacts)."""
        # The same generated sentence serves as both title and abstract.
        title = self._parse_title()
        return {
            "identifier": {"pid": self.file.pid, "pid_type": "ePIC"},
            "title": title,
            "abstract": title,
            "roles": [
                self._build_role(
                    "pointOfContact",
                    "Ewan",
                    "O'Connor",
                    "https://orcid.org/0000-0001-9834-5100",
                ),
                self._build_role(
                    "processor",
                    "Simo",
                    "Tukiainen",
                    "https://orcid.org/0000-0002-0651-4622",
                ),
            ],
        }

    @staticmethod
    def _build_role(
        role_code: str, first_name: str, last_name: str, orcid: str
    ) -> dict:
        """Return one ``roles`` entry for a person affiliated with FMI."""
        return {
            "role_code": [role_code],
            "person": {
                "first_name": first_name,
                "last_name": last_name,
                "affiliation": {
                    "name": "Finnish Meteorological Institute",
                    "pid": "https://ror.org/05hppb561",
                    "pid_type": "other PID",
                    "country_code": "FI",
                },
                "orcid": orcid,
            },
        }

    def _build_variable(
        self,
        variable_name: str,
        timeliness: str,
        instruments,
        compliance: str,
        qc_outcome: str,
        frameworks: list[str],
    ) -> dict:
        """Return one ``variables`` entry; nested lists are built afresh."""
        return {
            "variable_name": variable_name,
            "variable_matrix": "cloud phase",
            "variable_geometry": "atmospheric vertical profile",
            "timeliness": timeliness,
            "instrument": [
                {
                    "instrument_pid": instrument.pid,
                    "instrument_type": instrument.type,
                    "instrument_name": instrument.name,
                }
                for instrument in instruments
            ],
            "data_quality_control": [
                {
                    "compliance": compliance,
                    "quality_control_extent": "full quality control applied",
                    "quality_control_mechanism": "automatic quality control",
                    "quality_control_outcome": qc_outcome,
                }
            ],
            "framework": [{"framework": framework} for framework in frameworks],
            "temporal_resolution": self._TEMPORAL_RESOLUTION,
        }

    def _parse_variable_names(self) -> list[str]:
        """Return the non-null ``actrisName`` of each variable of the product."""
        # https://prod-actris-md.nilu.no/Vocabulary/ContentAttribute
        file_vars = self.md_api.get(f"api/products/{self.file.product.id}/variables")
        return [v["actrisName"] for v in file_vars if v["actrisName"] is not None]

    def _parse_frameworks(self) -> list[str]:
        """Return "CLOUDNET" plus "ARM"/"ACTRIS" depending on the site type."""
        frameworks = ["CLOUDNET"]
        site_type = self.file.site.type
        if "arm" in site_type:
            frameworks.append("ARM")
        if "cloudnet" in site_type:
            frameworks.append("ACTRIS")
        return frameworks

    def _parse_timeliness(self) -> str:
        """Translate the file's timeliness to the DVAS vocabulary term.

        Raises:
            KeyError: If the file's timeliness has no entry in the table.
        """
        return self._TIMELINESS_MAP[self.file.timeliness]

    def _parse_compliance(self) -> str:
        """Return the compliance label selected by the measurement date."""
        if self.file.measurement_date < self._ACTRIS_ASSOCIATED_FROM:
            return "ACTRIS legacy"
        return "ACTRIS associated"

    def _parse_qc_outcome(self) -> str:
        """Map the file's error level to a DVAS quality control outcome.

        A missing (``None``) or unrecognised error level yields the
        "not evaluated / unknown" outcome.
        """
        return self._QC_OUTCOME_MAP.get(
            self.file.error_level, self._QC_OUTCOME_UNKNOWN
        )

    def _parse_netcdf_version(self) -> str:
        """Return the file format string reported as ``data_format``."""
        return self.file.format

    def _parse_title(self) -> str:
        """Return the human-readable title built from product and site names."""
        return (
            f"{self.file.product.human_readable_name} data "
            f"derived from cloud remote sensing measurements "
            f"at {self.file.site.human_readable_name}"
        )

    def _fetch_credits(self, type: Literal["citation", "acknowledgements"]) -> str:
        """Fetch the citation or acknowledgements text of the file as plain text."""
        params = {"format": "txt"}
        response = self.md_api.get(
            f"api/reference/{self.file.uuid}/{type}", params, json=False
        )
        return response.text
6 changes: 4 additions & 2 deletions src/processing/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def freeze(processor: Processor, params: ProcessParams, directory: Path) -> None
processor.storage_api.delete_volatile_product(s3key)
metadata = processor.client.file(file_uuid)
if processor.md_api.config.is_production:
processor.dvas.upload(metadata)
ext_metadata = processor.client.file(metadata.uuid)
processor.dvas.upload(ext_metadata)


def upload_to_dvas(processor: Processor, params: ProcessParams) -> None:
Expand All @@ -98,7 +99,8 @@ def upload_to_dvas(processor: Processor, params: ProcessParams) -> None:
raise utils.SkipTaskError("Product not found")
if metadata.dvas_id:
raise utils.SkipTaskError("Already uploaded to DVAS")
processor.dvas.upload(metadata)
ext_metadata = processor.client.file(metadata.uuid)
processor.dvas.upload(ext_metadata)
logging.info("Uploaded to DVAS")


Expand Down
11 changes: 9 additions & 2 deletions src/processing/metadata_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Metadata API for Cloudnet files."""

import datetime
import uuid
from typing import Any

Expand Down Expand Up @@ -61,8 +62,14 @@ def put_images(self, img_metadata: list, product_uuid: str | uuid.UUID) -> None:
}
self.put("visualizations", data["s3key"], payload)

def update_dvas_info(self, uuid: uuid.UUID, timestamp: str, dvas_id: str) -> None:
payload = {"uuid": str(uuid), "dvasUpdatedAt": timestamp, "dvasId": dvas_id}
def update_dvas_info(
self, uuid: uuid.UUID, timestamp: datetime.datetime, dvas_id: str
) -> None:
payload = {
"uuid": str(uuid),
"dvasUpdatedAt": timestamp.isoformat(),
"dvasId": dvas_id,
}
self.post("files", payload)

def clean_dvas_info(self, uuid: uuid.UUID) -> None:
Expand Down
14 changes: 4 additions & 10 deletions src/processing/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def process_product(
)
utils.print_info(uuid, volatile, patch, upload, qc_result)
if processor.md_api.config.is_production and isinstance(params, ProductParams):
_update_dvas_metadata(processor, params)
_update_dvas_metadata(processor, UUID(uuid.product))


def _generate_filename(params: ProductParams | ModelParams) -> str:
Expand Down Expand Up @@ -492,15 +492,9 @@ def _get_input_files_for_voodoo(
return [str(path) for path in full_paths], uuids


def _update_dvas_metadata(processor: Processor, params: ProductParams) -> None:
metadata = processor.client.files(
site_id=params.site.id,
date=params.date,
product_id=params.product.id,
instrument_pid=params.instrument.pid if params.instrument else None,
)
if metadata:
processor.dvas.upload(metadata[0])
def _update_dvas_metadata(processor: Processor, uuid: UUID) -> None:
meta = processor.client.file(uuid)
processor.dvas.upload(meta)


def _check_response(metadata: list[ProductMetadata], product: str) -> None:
Expand Down