From 28719f134a107e5e5ea22af1637f1e5e49ae12b2 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Mon, 19 May 2025 11:55:00 -0700 Subject: [PATCH 1/6] fix: require role arn and session name in daac config --- .../daac_archiver/daac_archiver_logic.py | 31 +++++++++++++++++-- .../lib/uds_db/archive_index.py | 5 ++- .../uds_api/dapa/daac_archive_crud.py | 2 ++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py index 8b372809..0419a9b8 100644 --- a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py +++ b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py @@ -50,6 +50,21 @@ def get_cnm_response_json_file(self, potential_file, granule_id): FileUtils.remove_if_exists(local_file) return cnm_response_json + @staticmethod + def revert_to_s3_url(input_url): + if input_url.startswith("s3://"): + return input_url + if input_url.startswith("http://") or input_url.startswith("https://"): + parts = input_url.split('/', 3) + if len(parts) < 4: + raise ValueError(f'invalid url: {input_url}') + path_parts = parts[3].split('/', 1) + if len(path_parts) != 2: + raise ValueError(f'invalid url: {input_url}') + bucket, key = path_parts + return f"s3://{bucket}/{key}" + raise ValueError(f'unknown schema: {input_url}') + def __extract_files(self, uds_cnm_json: dict, daac_config: dict): granule_files = uds_cnm_json['product']['files'] if 'archiving_types' not in daac_config or len(daac_config['archiving_types']) < 1: @@ -57,6 +72,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict): archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']} result_files = [] for each_file in granule_files: + print(f'each_file: {each_file}') """ { "type": "data", @@ -71,6 +87,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict): if each_file['type'] not in 
archiving_types: continue file_extensions = archiving_types[each_file['type']] + each_file['uri'] = self.revert_to_s3_url(each_file['uri']) if len(file_extensions) < 1: result_files.append(each_file) # TODO remove missing md5? temp_filename = each_file['name'].upper().strip() @@ -86,21 +103,29 @@ def send_to_daac_internal(self, uds_cnm_json: dict): LOGGER.debug(f'uds_cnm_json is not configured for archival. uds_cnm_json: {uds_cnm_json}') return daac_config = daac_config[0] # TODO This is currently not supporting more than 1 daac. + result = JsonValidator(UdsArchiveConfigIndex.basic_schema).validate(daac_config) + if result is not None: + raise ValueError(f'daac_config does not have valid schema. Pls re-add the daac config: {result} for {daac_config}') try: self.__sns.set_topic_arn(daac_config['daac_sns_topic_arn']) daac_cnm_message = { - "collection": daac_config['daac_collection_name'], + "collection": { + 'name': daac_config['daac_collection_name'], + 'version': daac_config['daac_data_version'], + }, "identifier": uds_cnm_json['identifier'], "submissionTime": f'{TimeUtils.get_current_time()}Z', "provider": granule_identifier.tenant, "version": "1.6.0", # TODO this is hardcoded? 
"product": { "name": granule_identifier.id, - "dataVersion": daac_config['daac_data_version'], + # "dataVersion": daac_config['daac_data_version'], 'files': self.__extract_files(uds_cnm_json, daac_config), } } - self.__sns.publish_message(json.dumps(daac_cnm_message)) + print(f'uds_cnm_json: {uds_cnm_json}') + print(f'daac_cnm_message: {daac_cnm_message}') + self.__sns.set_external_role(daac_config['daac_role_arn'], daac_config['daac_role_session_name']).publish_message(json.dumps(daac_cnm_message), True) self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, { 'archive_status': 'cnm_s_success', 'archive_error_message': '', diff --git a/cumulus_lambda_functions/lib/uds_db/archive_index.py b/cumulus_lambda_functions/lib/uds_db/archive_index.py index b42eea72..e73c0337 100644 --- a/cumulus_lambda_functions/lib/uds_db/archive_index.py +++ b/cumulus_lambda_functions/lib/uds_db/archive_index.py @@ -15,11 +15,14 @@ class UdsArchiveConfigIndex: basic_schema = { 'type': 'object', "additionalProperties": False, - 'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'collection', 'ss_username', 'archiving_types'], + 'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn', 'daac_role_session_name', + 'collection', 'ss_username', 'archiving_types'], 'properties': { 'daac_collection_id': {'type': 'string'}, 'daac_sns_topic_arn': {'type': 'string'}, 'daac_data_version': {'type': 'string'}, + 'daac_role_arn': {'type': 'string'}, + 'daac_role_session_name': {'type': 'string'}, 'collection': {'type': 'string'}, 'ss_username': {'type': 'string'}, 'archiving_types': {'type': 'array', 'items': {'type': 'object'}}, diff --git a/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py b/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py index 2998840e..62c9f6b2 100644 --- a/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py +++ 
b/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py @@ -26,6 +26,8 @@ class DaacAddModel(BaseModel): daac_collection_id: str daac_data_version: str daac_sns_topic_arn: str + daac_role_arn: str + daac_role_session_name: str archiving_types: Optional[list[ArchivingTypesModel]] = [] From 51223d285a229c5054ffdd7f5965134032c6eba8 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Mon, 19 May 2025 11:57:59 -0700 Subject: [PATCH 2/6] fix: updating "update" schema as well --- cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py b/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py index 62c9f6b2..347e2e98 100644 --- a/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py +++ b/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py @@ -19,6 +19,8 @@ class DaacUpdateModel(BaseModel): daac_collection_id: str daac_data_version: Optional[str] = None daac_sns_topic_arn: Optional[str] = None + daac_role_arn: Optional[str] = None + daac_role_session_name: Optional[str] = None archiving_types: Optional[list[ArchivingTypesModel]] = None From fd33cb7b110d7daa266741c6a05b6d701d89f60a Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 20 May 2025 16:12:37 -0700 Subject: [PATCH 3/6] fix: attempt to retrieve the most recent file --- cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py index 0419a9b8..5d211e09 100644 --- a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py +++ b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py @@ -43,7 +43,8 @@ def get_cnm_response_json_file(self, potential_file, granule_id): return None if len(cnm_response_keys) > 1: LOGGER.warning(f'more than 1 cnm response file: {cnm_response_keys}') - 
cnm_response_keys = cnm_response_keys[0] + # assuming the names are the same, and it has processing date in the filename, it is easier to reverse it + cnm_response_keys = sorted(cnm_response_keys)[-1] # sort and get the last one which is supposed to be the most recent one. LOGGER.debug(f'cnm_response_keys: {cnm_response_keys}') local_file = self.__s3.set_s3_url(f's3://{self.__s3.target_bucket}/{cnm_response_keys}').download('/tmp') cnm_response_json = FileUtils.read_json(local_file) From d1174867eadcad0565076addeaa3743867adad5d Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 20 May 2025 16:18:41 -0700 Subject: [PATCH 4/6] chore: update log statements --- .../daac_archiver/daac_archiver_logic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py index 5d211e09..1623defc 100644 --- a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py +++ b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py @@ -73,7 +73,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict): archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']} result_files = [] for each_file in granule_files: - print(f'each_file: {each_file}') + LOGGER.debug(f'each_file: {each_file}') """ { "type": "data", @@ -97,6 +97,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict): return result_files def send_to_daac_internal(self, uds_cnm_json: dict): + LOGGER.debug(f'uds_cnm_json: {uds_cnm_json}') granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier']) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this. 
self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue) daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier']) @@ -124,8 +125,7 @@ def send_to_daac_internal(self, uds_cnm_json: dict): 'files': self.__extract_files(uds_cnm_json, daac_config), } } - print(f'uds_cnm_json: {uds_cnm_json}') - print(f'daac_cnm_message: {daac_cnm_message}') + LOGGER.debug(f'daac_cnm_message: {daac_cnm_message}') self.__sns.set_external_role(daac_config['daac_role_arn'], daac_config['daac_role_session_name']).publish_message(json.dumps(daac_cnm_message), True) self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, { 'archive_status': 'cnm_s_success', From ce2cb02cb61717bc846f3f8c4613b95e66e45bec Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Wed, 21 May 2025 06:25:04 -0700 Subject: [PATCH 5/6] feat: add additional policy to lambda role --- .../uds_lambda_processing_role.tf | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 tf-module/unity-cumulus/uds_lambda_processing_role.tf diff --git a/tf-module/unity-cumulus/uds_lambda_processing_role.tf b/tf-module/unity-cumulus/uds_lambda_processing_role.tf new file mode 100644 index 00000000..edab31ae --- /dev/null +++ b/tf-module/unity-cumulus/uds_lambda_processing_role.tf @@ -0,0 +1,25 @@ +data "aws_iam_role" "lambda_processing" { + name = split("/", var.lambda_processing_role_arn)[1] +} + +resource "aws_iam_policy" "uds_lambda_processing_policy" { + name = "${var.prefix}-uds_lambda_processing_policy" + description = "IAM policy for Lambda to access S3 bucket and publish to SNS topic in another account" + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow", + Action = [ + "sts:AssumeRole", + ], + "Resource": "arn:aws:iam::*:role/*" + }, + ] + }) +} + +resource "aws_iam_role_policy_attachment" "uds_lambda_processing_policy_attachment" { + role = 
data.aws_iam_role.lambda_processing.name + policy_arn = aws_iam_policy.uds_lambda_processing_policy.arn +} From 6d9dadfb43260bcd417c9a93019431539e263314 Mon Sep 17 00:00:00 2001 From: wphyojpl <38299756+wphyojpl@users.noreply.github.com> Date: Wed, 21 May 2025 08:43:00 -0700 Subject: [PATCH 6/6] Update requirements.txt --- requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 5aee6a7e..16bde7d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ jsonschema==4.23.0 jsonschema-specifications==2023.12.1 lark==0.12.0 mangum==0.18.0 -mdps-ds-lib==1.1.1.dev702 +mdps-ds-lib==1.1.1.dev800 pydantic==2.9.2 pydantic_core==2.23.4 pygeofilter==0.2.4 @@ -37,4 +37,4 @@ typing_extensions==4.12.2 tzlocal==5.2 urllib3==1.26.11 uvicorn==0.30.6 -xmltodict==0.13.0 \ No newline at end of file +xmltodict==0.13.0