Skip to content
Merged
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,26 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [9.12.0] - 2025-05-24
### Changed
- [#585](https://github.com/unity-sds/unity-data-services/pull/585) feat: add ram size in lambdas

## [9.11.9] - 2025-05-23
### Fixed
- [#582](https://github.com/unity-sds/unity-data-services/pull/582) fix: use correct schema

## [9.11.8] - 2025-05-21
### Fixed
- [#580](https://github.com/unity-sds/unity-data-services/pull/580) fix: update-archival-index-mapping

## [9.11.7] - 2025-05-21
### Fixed
- [#578](https://github.com/unity-sds/unity-data-services/pull/578) fix: sending sns to daac

## [9.11.6] - 2025-05-14
### Fixed
- [#575](https://github.com/unity-sds/unity-data-services/pull/575) fix: lib version bump

## [9.11.5] - 2025-04-24
### Fixed
- [#568](https://github.com/unity-sds/unity-data-services/pull/568) fix: case insensitive
Expand Down
34 changes: 30 additions & 4 deletions cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,37 @@ def get_cnm_response_json_file(self, potential_file, granule_id):
return None
if len(cnm_response_keys) > 1:
LOGGER.warning(f'more than 1 cnm response file: {cnm_response_keys}')
cnm_response_keys = cnm_response_keys[0]
# assuming the names are the same, and it has processing date in the filename, it is easier to reverse it
cnm_response_keys = sorted(cnm_response_keys)[-1] # sort and get the last one which is supposed to be the most recent one.
LOGGER.debug(f'cnm_response_keys: {cnm_response_keys}')
local_file = self.__s3.set_s3_url(f's3://{self.__s3.target_bucket}/{cnm_response_keys}').download('/tmp')
cnm_response_json = FileUtils.read_json(local_file)
FileUtils.remove_if_exists(local_file)
return cnm_response_json

@staticmethod
def revert_to_s3_url(input_url):
if input_url.startswith("s3://"):
return input_url
if input_url.startswith("http://") or input_url.startswith("https://"):
parts = input_url.split('/', 3)
if len(parts) < 4:
ValueError(f'invalid url: {input_url}')
path_parts = parts[3].split('/', 1)
if len(path_parts) != 2:
ValueError(f'invalid url: {input_url}')
bucket, key = path_parts
return f"s3://{bucket}/{key}"
raise ValueError(f'unknown schema: {input_url}')

def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
granule_files = uds_cnm_json['product']['files']
if 'archiving_types' not in daac_config or len(daac_config['archiving_types']) < 1:
return granule_files # TODO remove missing md5?
archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']}
result_files = []
for each_file in granule_files:
LOGGER.debug(f'each_file: {each_file}')
"""
{
"type": "data",
Expand All @@ -71,6 +88,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
if each_file['type'] not in archiving_types:
continue
file_extensions = archiving_types[each_file['type']]
each_file['uri'] = self.revert_to_s3_url(each_file['uri'])
if len(file_extensions) < 1:
result_files.append(each_file) # TODO remove missing md5?
temp_filename = each_file['name'].upper().strip()
Expand All @@ -79,28 +97,36 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
return result_files

def send_to_daac_internal(self, uds_cnm_json: dict):
LOGGER.debug(f'uds_cnm_json: {uds_cnm_json}')
granule_identifier = UdsCollections.decode_identifier(uds_cnm_json['identifier']) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
self.__archive_index_logic.set_tenant_venue(granule_identifier.tenant, granule_identifier.venue)
daac_config = self.__archive_index_logic.percolate_document(uds_cnm_json['identifier'])
if daac_config is None or len(daac_config) < 1:
LOGGER.debug(f'uds_cnm_json is not configured for archival. uds_cnm_json: {uds_cnm_json}')
return
daac_config = daac_config[0] # TODO This is currently not supporting more than 1 daac.
result = JsonValidator(UdsArchiveConfigIndex.db_record_schema).validate(daac_config)
if result is not None:
raise ValueError(f'daac_config does not have valid schema. Pls re-add the daac config: {result} for {daac_config}')
try:
self.__sns.set_topic_arn(daac_config['daac_sns_topic_arn'])
daac_cnm_message = {
"collection": daac_config['daac_collection_name'],
"collection": {
'name': daac_config['daac_collection_name'],
'version': daac_config['daac_data_version'],
},
"identifier": uds_cnm_json['identifier'],
"submissionTime": f'{TimeUtils.get_current_time()}Z',
"provider": granule_identifier.tenant,
"version": "1.6.0", # TODO this is hardcoded?
"product": {
"name": granule_identifier.id,
"dataVersion": daac_config['daac_data_version'],
# "dataVersion": daac_config['daac_data_version'],
'files': self.__extract_files(uds_cnm_json, daac_config),
}
}
self.__sns.publish_message(json.dumps(daac_cnm_message))
LOGGER.debug(f'daac_cnm_message: {daac_cnm_message}')
self.__sns.set_external_role(daac_config['daac_role_arn'], daac_config['daac_role_session_name']).publish_message(json.dumps(daac_cnm_message), True)
self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
'archive_status': 'cnm_s_success',
'archive_error_message': '',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ class GranulesIndexMapping:
"daac_data_version": {
"type": "keyword"
},
"daac_role_arn": {
"type": "keyword"
},
"daac_role_session_name": {
"type": "keyword"
},
"archiving_types": {
"type": "object",
"properties": {
Expand Down
4 changes: 4 additions & 0 deletions cumulus_lambda_functions/granules_to_es/granules_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ def start(self):
return
self.__cumulus_record = incoming_msg['record']
if len(self.__cumulus_record['files']) < 1:
LOGGER.debug(f'No files in cumulus record. Not inserting to ES')
# TODO ingest updating stage?
return
if 'status' not in self.__cumulus_record or self.__cumulus_record['status'].upper() != 'COMPLETED':
LOGGER.debug(f'missing status or it is NOT COMPLETED status. Not inserting to ES')
return
stac_input_meta = None
potential_files = self.__get_potential_files()
LOGGER.debug(f'potential_files: {potential_files}')
Expand Down
22 changes: 21 additions & 1 deletion cumulus_lambda_functions/lib/uds_db/archive_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,31 @@ class UdsArchiveConfigIndex:
basic_schema = {
'type': 'object',
"additionalProperties": False,
'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'collection', 'ss_username', 'archiving_types'],
'required': ['daac_collection_id', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn', 'daac_role_session_name',
'collection', 'ss_username', 'archiving_types'],
'properties': {
'daac_collection_id': {'type': 'string'},
'daac_sns_topic_arn': {'type': 'string'},
'daac_data_version': {'type': 'string'},
'daac_role_arn': {'type': 'string'},
'daac_role_session_name': {'type': 'string'},
'collection': {'type': 'string'},
'ss_username': {'type': 'string'},
'archiving_types': {'type': 'array', 'items': {'type': 'object'}},
}
}

db_record_schema = {
'type': 'object',
'required': ['daac_collection_name', 'daac_sns_topic_arn', 'daac_data_version', 'daac_role_arn',
'daac_role_session_name',
'collection', 'ss_username', 'archiving_types'],
'properties': {
'daac_collection_name': {'type': 'string'},
'daac_sns_topic_arn': {'type': 'string'},
'daac_data_version': {'type': 'string'},
'daac_role_arn': {'type': 'string'},
'daac_role_session_name': {'type': 'string'},
'collection': {'type': 'string'},
'ss_username': {'type': 'string'},
'archiving_types': {'type': 'array', 'items': {'type': 'object'}},
Expand Down
4 changes: 4 additions & 0 deletions cumulus_lambda_functions/uds_api/dapa/daac_archive_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ class DaacUpdateModel(BaseModel):
daac_collection_id: str
daac_data_version: Optional[str] = None
daac_sns_topic_arn: Optional[str] = None
daac_role_arn: Optional[str] = None
daac_role_session_name: Optional[str] = None
archiving_types: Optional[list[ArchivingTypesModel]] = None


class DaacAddModel(BaseModel):
daac_collection_id: str
daac_data_version: str
daac_sns_topic_arn: str
daac_role_arn: str
daac_role_session_name: str
archiving_types: Optional[list[ArchivingTypesModel]] = []


Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jsonschema==4.23.0
jsonschema-specifications==2023.12.1
lark==0.12.0
mangum==0.18.0
mdps-ds-lib==1.1.1.dev701
mdps-ds-lib==1.1.1.dev800
pydantic==2.9.2
pydantic_core==2.23.4
pygeofilter==0.2.4
Expand All @@ -37,4 +37,4 @@ typing_extensions==4.12.2
tzlocal==5.2
urllib3==1.26.11
uvicorn==0.30.6
xmltodict==0.13.0
xmltodict==0.13.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

setup(
name="cumulus_lambda_functions",
version="9.11.5",
version="9.12.0",
packages=find_packages(),
install_requires=install_requires,
package_data={
Expand Down
1 change: 1 addition & 0 deletions tf-module/unity-cumulus/daac_archiver.tf
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ resource "aws_lambda_function" "daac_archiver_response" {
handler = "cumulus_lambda_functions.daac_archiver.lambda_function.lambda_handler_response"
runtime = "python3.9"
timeout = 300
memory_size = 256
environment {
variables = {
LOG_LEVEL = var.log_level
Expand Down
2 changes: 2 additions & 0 deletions tf-module/unity-cumulus/granules_cnm_ingester.tf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ resource "aws_lambda_function" "granules_cnm_ingester" {
handler = "cumulus_lambda_functions.granules_cnm_ingester.lambda_function.lambda_handler"
runtime = "python3.9"
timeout = 300
memory_size = 512
reserved_concurrent_executions = var.granules_cnm_ingester__lambda_concurrency # TODO
environment {
variables = {
Expand Down Expand Up @@ -66,6 +67,7 @@ resource "aws_lambda_function" "granules_cnm_response_writer" {
handler = "cumulus_lambda_functions.granules_cnm_response_writer.lambda_function.lambda_handler"
runtime = "python3.9"
timeout = 300
memory_size = 256
reserved_concurrent_executions = var.granules_cnm_response_writer__lambda_concurrency # TODO
environment {
variables = {
Expand Down
6 changes: 5 additions & 1 deletion tf-module/unity-cumulus/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ resource "aws_lambda_function" "metadata_s4pa_generate_cmr" {
handler = "cumulus_lambda_functions.metadata_s4pa_generate_cmr.lambda_function.lambda_handler"
runtime = "python3.9"
timeout = 300
memory_size = 256
environment {
variables = {
LOG_LEVEL = var.log_level
Expand All @@ -65,6 +66,7 @@ resource "aws_lambda_function" "metadata_cas_generate_cmr" {
handler = "cumulus_lambda_functions.metadata_cas_generate_cmr.lambda_function.lambda_handler"
runtime = "python3.9"
timeout = 300
memory_size = 256
environment {
variables = {
LOG_LEVEL = var.log_level
Expand All @@ -86,6 +88,7 @@ resource "aws_lambda_function" "metadata_stac_generate_cmr" {
handler = "cumulus_lambda_functions.metadata_stac_generate_cmr.lambda_function.lambda_handler"
runtime = "python3.9"
timeout = 300
memory_size = 256
environment {
variables = {
LOG_LEVEL = var.log_level
Expand All @@ -112,6 +115,7 @@ resource "aws_lambda_function" "granules_to_es" {
handler = "cumulus_lambda_functions.granules_to_es.lambda_function.lambda_handler"
runtime = "python3.9"
timeout = 300
memory_size = 256
environment {
variables = {
LOG_LEVEL = var.log_level
Expand All @@ -137,7 +141,7 @@ resource "aws_lambda_function" "uds_api_1" {
handler = "cumulus_lambda_functions.uds_api.web_service.handler"
runtime = "python3.9"
timeout = 300

memory_size = 512
environment {
variables = {
CUMULUS_BASE = var.cumulus_base
Expand Down
25 changes: 25 additions & 0 deletions tf-module/unity-cumulus/uds_lambda_processing_role.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
data "aws_iam_role" "lambda_processing" {
name = split("/", var.lambda_processing_role_arn)[1]
}

resource "aws_iam_policy" "uds_lambda_processing_policy" {
name = "${var.prefix}-uds_lambda_processing_policy"
description = "IAM policy for Lambda to access S3 bucket and publish to SNS topic in another account"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
"sts:AssumeRole",
],
"Resource": "arn:aws:iam::*:role/*"
},
]
})
}

resource "aws_iam_role_policy_attachment" "uds_lambda_processing_policy_attachment" {
role = data.aws_iam_role.lambda_processing.name
policy_arn = aws_iam_policy.uds_lambda_processing_policy.arn
}
Loading