diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a53be2d..e71a9060 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,26 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [9.12.0] - 2025-05-24 +### Changed +- [#585](https://github.com/unity-sds/unity-data-services/pull/585) feat: add ram size in lambdas + +## [9.11.9] - 2025-05-23 +### Fixed +- [#582](https://github.com/unity-sds/unity-data-services/pull/582) fix: use correct schema + +## [9.11.8] - 2025-05-21 +### Fixed +- [#580](https://github.com/unity-sds/unity-data-services/pull/580) fix: update-archival-index-mapping + +## [9.11.7] - 2025-05-21 +### Fixed +- [#578](https://github.com/unity-sds/unity-data-services/pull/578) fix: sending sns to daac + +## [9.11.6] - 2025-05-14 +### Fixed +- [#575](https://github.com/unity-sds/unity-data-services/pull/575) fix: lib version bump + ## [9.11.5] - 2025-04-24 ### Fixed - [#568](https://github.com/unity-sds/unity-data-services/pull/568) fix: case insensitive diff --git a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py index 8b372809..c0df7e2a 100644 --- a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py +++ b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py @@ -43,13 +43,29 @@ def get_cnm_response_json_file(self, potential_file, granule_id): return None if len(cnm_response_keys) > 1: LOGGER.warning(f'more than 1 cnm response file: {cnm_response_keys}') - cnm_response_keys = cnm_response_keys[0] + # assuming the names are the same, and it has processing date in the filename, it is easier to reverse it + cnm_response_keys = sorted(cnm_response_keys)[-1] # sort and get the last one which is supposed to be the most recent one. 
@staticmethod
def revert_to_s3_url(input_url):
    """
    Normalize a file URI to the ``s3://<bucket>/<key>`` form.

    An ``s3://`` URL is returned unchanged. An ``http://`` / ``https://`` URL is
    read as ``<scheme>://<host>/<bucket>/<key>``: the host is discarded, the first
    path segment becomes the bucket and the remainder of the path becomes the key.
    Nothing else is stripped or decoded; whatever follows the bucket segment
    (including any query string) ends up in the key verbatim.

    :param input_url: str - URL starting with ``s3://``, ``http://`` or ``https://``
    :return: str - the equivalent ``s3://`` URL
    :raises ValueError: if the schema is not one of the three above, or if an
        HTTP(S) URL does not contain both a bucket and a key segment
    """
    if input_url.startswith('s3://'):
        return input_url
    if not input_url.startswith(('http://', 'https://')):
        raise ValueError(f'unknown schema: {input_url}')
    # 'https://host/bucket/key' -> ['https:', '', 'host', 'bucket/key']
    parts = input_url.split('/', 3)
    if len(parts) < 4:
        # previously the exception was instantiated but never raised,
        # so this case fell through to an IndexError on parts[3]
        raise ValueError(f'invalid url: {input_url}')
    path_parts = parts[3].split('/', 1)
    if len(path_parts) != 2:
        # same missing `raise` here: without it the tuple unpacking below
        # failed with an unrelated "not enough values to unpack" message
        raise ValueError(f'invalid url: {input_url}')
    bucket, key = path_parts
    return f's3://{bucket}/{key}'
temp_filename = each_file['name'].upper().strip() @@ -79,6 +97,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict): return result_files def send_to_daac_internal(self, uds_cnm_json: dict): + LOGGER.debug(f'uds_cnm_json: {uds_cnm_json}') granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier']) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this. self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue) daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier']) @@ -86,21 +105,28 @@ def send_to_daac_internal(self, uds_cnm_json: dict): LOGGER.debug(f'uds_cnm_json is not configured for archival. uds_cnm_json: {uds_cnm_json}') return daac_config = daac_config[0] # TODO This is currently not supporting more than 1 daac. + result = JsonValidator(UdsArchiveConfigIndex.db_record_schema).validate(daac_config) + if result is not None: + raise ValueError(f'daac_config does not have valid schema. Pls re-add the daac config: {result} for {daac_config}') try: self.__sns.set_topic_arn(daac_config['daac_sns_topic_arn']) daac_cnm_message = { - "collection": daac_config['daac_collection_name'], + "collection": { + 'name': daac_config['daac_collection_name'], + 'version': daac_config['daac_data_version'], + }, "identifier": uds_cnm_json['identifier'], "submissionTime": f'{TimeUtils.get_current_time()}Z', "provider": granule_identifier.tenant, "version": "1.6.0", # TODO this is hardcoded? 
"product": { "name": granule_identifier.id, - "dataVersion": daac_config['daac_data_version'], + # "dataVersion": daac_config['daac_data_version'], 'files': self.__extract_files(uds_cnm_json, daac_config), } } - self.__sns.publish_message(json.dumps(daac_cnm_message)) + LOGGER.debug(f'daac_cnm_message: {daac_cnm_message}') + self.__sns.set_external_role(daac_config['daac_role_arn'], daac_config['daac_role_session_name']).publish_message(json.dumps(daac_cnm_message), True) self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, { 'archive_status': 'cnm_s_success', 'archive_error_message': '', diff --git a/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py b/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py index 89b4ae1a..8b5ece1c 100644 --- a/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py +++ b/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py @@ -9,6 +9,12 @@ class GranulesIndexMapping: "daac_data_version": { "type": "keyword" }, + "daac_role_arn": { + "type": "keyword" + }, + "daac_role_session_name": { + "type": "keyword" + }, "archiving_types": { "type": "object", "properties": { diff --git a/cumulus_lambda_functions/granules_to_es/granules_indexer.py b/cumulus_lambda_functions/granules_to_es/granules_indexer.py index be2fe3de..5e98f4c2 100644 --- a/cumulus_lambda_functions/granules_to_es/granules_indexer.py +++ b/cumulus_lambda_functions/granules_to_es/granules_indexer.py @@ -78,8 +78,12 @@ def start(self): return self.__cumulus_record = incoming_msg['record'] if len(self.__cumulus_record['files']) < 1: + LOGGER.debug(f'No files in cumulus record. Not inserting to ES') # TODO ingest updating stage? return + if 'status' not in self.__cumulus_record or self.__cumulus_record['status'].upper() != 'COMPLETED': + LOGGER.debug(f'missing status or it is NOT COMPLETED status. 
Not inserting to ES') + return stac_input_meta = None potential_files = self.__get_potential_files() LOGGER.debug(f'potential_files: {potential_files}') diff --git a/cumulus_lambda_functions/lib/uds_db/archive_index.py b/cumulus_lambda_functions/lib/uds_db/archive_index.py index b42eea72..a25a9d31 100644 --- a/cumulus_lambda_functions/lib/uds_db/archive_index.py +++ b/cumulus_lambda_functions/lib/uds_db/archive_index.py @@ -15,11 +15,31 @@ class UdsArchiveConfigIndex: basic_schema = { 'type': 'object', "additionalProperties": False, - 'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'collection', 'ss_username', 'archiving_types'], + 'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn', 'daac_role_session_name', + 'collection', 'ss_username', 'archiving_types'], 'properties': { 'daac_collection_id': {'type': 'string'}, 'daac_sns_topic_arn': {'type': 'string'}, 'daac_data_version': {'type': 'string'}, + 'daac_role_arn': {'type': 'string'}, + 'daac_role_session_name': {'type': 'string'}, + 'collection': {'type': 'string'}, + 'ss_username': {'type': 'string'}, + 'archiving_types': {'type': 'array', 'items': {'type': 'object'}}, + } + } + + db_record_schema = { + 'type': 'object', + 'required': ['daac_collection_name', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn', + 'daac_role_session_name', + 'collection', 'ss_username', 'archiving_types'], + 'properties': { + 'daac_collection_name': {'type': 'string'}, + 'daac_sns_topic_arn': {'type': 'string'}, + 'daac_data_version': {'type': 'string'}, + 'daac_role_arn': {'type': 'string'}, + 'daac_role_session_name': {'type': 'string'}, 'collection': {'type': 'string'}, 'ss_username': {'type': 'string'}, 'archiving_types': {'type': 'array', 'items': {'type': 'object'}}, diff --git a/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py b/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py index 2998840e..347e2e98 100644 --- 
a/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py +++ b/cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py @@ -19,6 +19,8 @@ class DaacUpdateModel(BaseModel): daac_collection_id: str daac_data_version: Optional[str] = None daac_sns_topic_arn: Optional[str] = None + daac_role_arn: Optional[str] = None + daac_role_session_name: Optional[str] = None archiving_types: Optional[list[ArchivingTypesModel]] = None @@ -26,6 +28,8 @@ class DaacAddModel(BaseModel): daac_collection_id: str daac_data_version: str daac_sns_topic_arn: str + daac_role_arn: str + daac_role_session_name: str archiving_types: Optional[list[ArchivingTypesModel]] = [] diff --git a/requirements.txt b/requirements.txt index 4eb32bfb..16bde7d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ jsonschema==4.23.0 jsonschema-specifications==2023.12.1 lark==0.12.0 mangum==0.18.0 -mdps-ds-lib==1.1.1.dev701 +mdps-ds-lib==1.1.1.dev800 pydantic==2.9.2 pydantic_core==2.23.4 pygeofilter==0.2.4 @@ -37,4 +37,4 @@ typing_extensions==4.12.2 tzlocal==5.2 urllib3==1.26.11 uvicorn==0.30.6 -xmltodict==0.13.0 \ No newline at end of file +xmltodict==0.13.0 diff --git a/setup.py b/setup.py index ed30fe9d..5e37f08d 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ setup( name="cumulus_lambda_functions", - version="9.11.5", + version="9.12.0", packages=find_packages(), install_requires=install_requires, package_data={ diff --git a/tf-module/unity-cumulus/daac_archiver.tf b/tf-module/unity-cumulus/daac_archiver.tf index 3a82f0ab..dc2931e1 100644 --- a/tf-module/unity-cumulus/daac_archiver.tf +++ b/tf-module/unity-cumulus/daac_archiver.tf @@ -38,6 +38,7 @@ resource "aws_lambda_function" "daac_archiver_response" { handler = "cumulus_lambda_functions.daac_archiver.lambda_function.lambda_handler_response" runtime = "python3.9" timeout = 300 + memory_size = 256 environment { variables = { LOG_LEVEL = var.log_level diff --git a/tf-module/unity-cumulus/granules_cnm_ingester.tf 
b/tf-module/unity-cumulus/granules_cnm_ingester.tf index f240261f..c8ebc725 100644 --- a/tf-module/unity-cumulus/granules_cnm_ingester.tf +++ b/tf-module/unity-cumulus/granules_cnm_ingester.tf @@ -6,6 +6,7 @@ resource "aws_lambda_function" "granules_cnm_ingester" { handler = "cumulus_lambda_functions.granules_cnm_ingester.lambda_function.lambda_handler" runtime = "python3.9" timeout = 300 + memory_size = 512 reserved_concurrent_executions = var.granules_cnm_ingester__lambda_concurrency # TODO environment { variables = { @@ -66,6 +67,7 @@ resource "aws_lambda_function" "granules_cnm_response_writer" { handler = "cumulus_lambda_functions.granules_cnm_response_writer.lambda_function.lambda_handler" runtime = "python3.9" timeout = 300 + memory_size = 256 reserved_concurrent_executions = var.granules_cnm_response_writer__lambda_concurrency # TODO environment { variables = { diff --git a/tf-module/unity-cumulus/main.tf b/tf-module/unity-cumulus/main.tf index 4b31672a..a673ece0 100644 --- a/tf-module/unity-cumulus/main.tf +++ b/tf-module/unity-cumulus/main.tf @@ -43,6 +43,7 @@ resource "aws_lambda_function" "metadata_s4pa_generate_cmr" { handler = "cumulus_lambda_functions.metadata_s4pa_generate_cmr.lambda_function.lambda_handler" runtime = "python3.9" timeout = 300 + memory_size = 256 environment { variables = { LOG_LEVEL = var.log_level @@ -65,6 +66,7 @@ resource "aws_lambda_function" "metadata_cas_generate_cmr" { handler = "cumulus_lambda_functions.metadata_cas_generate_cmr.lambda_function.lambda_handler" runtime = "python3.9" timeout = 300 + memory_size = 256 environment { variables = { LOG_LEVEL = var.log_level @@ -86,6 +88,7 @@ resource "aws_lambda_function" "metadata_stac_generate_cmr" { handler = "cumulus_lambda_functions.metadata_stac_generate_cmr.lambda_function.lambda_handler" runtime = "python3.9" timeout = 300 + memory_size = 256 environment { variables = { LOG_LEVEL = var.log_level @@ -112,6 +115,7 @@ resource "aws_lambda_function" "granules_to_es" { handler 
# Looks up the existing Lambda processing role so that an extra policy can be
# attached to it below.
data "aws_iam_role" "lambda_processing" {
  # The data source matches on the role's friendly name. Take the LAST "/"-separated
  # segment of the ARN so that roles created with an IAM path
  # (arn:aws:iam::<account>:role/<path>/<name>) resolve to <name> instead of <path>.
  # For ARNs without a path this yields the same value as index [1] did.
  name = reverse(split("/", var.lambda_processing_role_arn))[0]
}

# Policy that lets the Lambda processing role assume other IAM roles.
# NOTE(review): the description mentions S3 and SNS access, but the only action
# granted here is sts:AssumeRole; the description is left unchanged so the
# deployed policy is not altered by this edit - confirm the intended wording.
# NOTE(review): the resource pattern allows assuming any role in any account;
# consider narrowing it to the known external role ARNs.
resource "aws_iam_policy" "uds_lambda_processing_policy" {
  name        = "${var.prefix}-uds_lambda_processing_policy"
  description = "IAM policy for Lambda to access S3 bucket and publish to SNS topic in another account"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "sts:AssumeRole",
        ],
        Resource = "arn:aws:iam::*:role/*"
      },
    ]
  })
}

# Attaches the assume-role policy above to the Lambda processing role.
resource "aws_iam_role_policy_attachment" "uds_lambda_processing_policy_attachment" {
  role       = data.aws_iam_role.lambda_processing.name
  policy_arn = aws_iam_policy.uds_lambda_processing_policy.arn
}