From 20014998e413be076918dd2840ecb3ef2694a49d Mon Sep 17 00:00:00 2001 From: Steve Turoscy Date: Wed, 15 Apr 2026 11:36:49 -0400 Subject: [PATCH 1/3] Adding globus --- .gitignore | 2 + bin/router_resource.py | 111 ++++++++++++++++++++++- utils/__init__.py | 0 utils/search.py | 188 +++++++++++++++++++++++++++++++++++++++ utils/utility.py | 195 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 493 insertions(+), 3 deletions(-) create mode 100644 utils/__init__.py create mode 100644 utils/search.py create mode 100644 utils/utility.py diff --git a/.gitignore b/.gitignore index b3b0c4b..17b0846 100644 --- a/.gitignore +++ b/.gitignore @@ -135,3 +135,5 @@ dmypy.json # Mac .DS_Store + +.vscode/ \ No newline at end of file diff --git a/bin/router_resource.py b/bin/router_resource.py index f834c66..fe6d9c0 100755 --- a/bin/router_resource.py +++ b/bin/router_resource.py @@ -45,6 +45,7 @@ import django django.setup() +import requests from django.conf import settings as django_settings from django.db import DataError, IntegrityError from django.forms.models import model_to_dict @@ -574,8 +575,6 @@ def Write_RSP_Support_Providers(self, content, contype, config): self.PROCESSING_SECONDS[me] += (datetime.now(timezone.utc) - start_utc).total_seconds() self.Log_STEP(me) return(0, '') - - #################################### def Write_Glue2_Network_Service(self, content, contype, config): @@ -696,6 +695,113 @@ def Write_Glue2_Network_Service(self, content, contype, config): self.Log_STEP(me) return(0, '') + #################################### + ### Globus handlers ### + def Write_Glue2_Executable_Software_Globus(self, content, contype, config): + from utils.search import GlobusProcess + from utils.utility import generate_payloads + + # Incoming Glue2 models from Glue2 Router API + application_handles = ApplicationHandle.objects.order_by( + '-CreationTime').selected_related() + + # Build resourceV4 payload from remote GLUE2 resources (simulate incoming GLUE2 models in router) + payload = generate_payloads(application_handles) + + # Initiate a Globus Process + # Handles ingest, delete_by_query, and update. + globus_process = GlobusProcess() + + # Process added/removed/updated items in ResourceV4Local table + if len(payload["added"]): + # Ingest new items into Globus Search Index + gmeta_list = { + "ingest_type": "GMetaList", + "ingest_data": { + "gmeta": [] + } + } + for item in payload["added"]: + gmeta_entry = { + "subject": item["ID"], + "visible_to": ["public"], + "content": item["EntityJSON"] + } + gmeta_list["ingest_data"]["gmeta"].append(gmeta_entry) + globus_process.ingest(gmeta_list=gmeta_list) + + # Bulk create new ResourceV4Local entries for added items in PostgreSQL + resource_v4_local_added = [ + ResourceV4Local(**item) for item in payload["added"] + ] + try: + ResourceV4Local.objects.bulk_create(resource_v4_local_added) + except Exception as err: + self.logger.warning(err) + return Response( + [{"error": str(err)}], + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + + if len(payload["removed"]): + # Delete items from Globus Search Index by querying on the ID field + globus_process.delete_by_query(payload["removed"]) + + # Delete items from ResourceV4Local table in PostgreSQL + resource_v4_local_removed = ResourceV4Local.objects.filter( + LocalID__in=payload["removed"] + ) + try: + resource_v4_local_removed.delete() + except Exception as err: + self.logger.warning(err) + return Response( + [{"error": str(err)}], + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + + if len(payload["updated"]): + # Ingest updated items into Globus Search Index + gmeta_list = { + "ingest_type": "GMetaList", + "ingest_data": { + "gmeta": [] + } + } + + for item in payload["updated"]: + try: + resource = ResourceV4Local.objects.get(LocalID=item["LocalID"]) + for key, value in item["changes"].items(): + if key == "EntityJSON": + for entity_json_key, entity_json_value in value.items(): + resource.EntityJSON[entity_json_key] = entity_json_value + else: + setattr(resource, key, value) + resource.save() + + gmeta_entry = { + "subject": resource.ID, + "visible_to": ["public"], + "content": resource.EntityJSON + } + gmeta_list["ingest_data"]["gmeta"].append(gmeta_entry) + except Exception as err: + self.logger.warning(err) + return Response( + [{"error": str(err)}], + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + globus_process.ingest(gmeta_list=gmeta_list) + + self.logger.info(payload) + return Response([{ + "added": len(payload["added"]), + "removed": len(payload["removed"]), + "updated": len(payload["updated"]), + }]) + ### End Globus handlers ### + #################################### #################################### def Write_Glue2_Executable_Software(self, content, contype, config): @@ -842,7 +948,6 @@ def Write_Glue2_Executable_Software(self, content, contype, config): self.Log_STEP(me) return(0, '') - ##################################################################### # Function for loading CIDER (CyberInfrastructure Description Repository) data # Load CIDER's organization data to ResourceV4 tables (local, standard) diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/search.py b/utils/search.py new file mode 100644 index 0000000..db093a3 --- /dev/null +++ b/utils/search.py @@ -0,0 +1,188 @@ +import json +from typing import Iterable, List, Tuple, Any + +import globus_sdk +import logging +import requests +from requests.exceptions import HTTPError + +from django.conf import settings +from globus_sdk.scopes import SearchScopes + +logg2 = logging.getLogger("warehouse.logger") + + +class GlobusProcess(): + def __init__(self, *args, **kwargs): + self.app = globus_sdk.ClientApp( + "ACCESS-CI Operations Warehouse Globus Service Client", + client_id=settings.GLOBUS_CLIENT_ID, + client_secret=settings.GLOBUS_CLIENT_SECRET + ) + + # Add scope requirements for the Authorizer + self.app.add_scope_requirements({ + "search.api.globus.org": [SearchScopes.all] + }) + + # Get the authorization header for the Search PUT requests + # This is for the update_by_subject method which uses the + # Search API directly via requests. + + # The update_by_subject method is not currently used, but may + # be useful for future reference. + authorizer = self.app.get_authorizer("search.api.globus.org") + self.authorization_header = authorizer.get_authorization_header() + + # Initialize the SearchClient with the app and required scopes + # For ingest and delete_by_query operations we can use the SearchClient + # directly. + self.search_client = globus_sdk.SearchClient( + app=self.app, app_scopes=[SearchScopes.all] + ) + self.search_endpoint = settings.GLOBUS_SEARCH_INDEX_ID + + def ingest(self, gmeta_list): + for i, (batch, size_bytes) in enumerate( + self.chunk_by_size( + gmeta_list["ingest_data"]["gmeta"], + max_chunk_kb=8000, + safety_ratio=0.9 + ) + ): + logg2.info( + f"Batch {i}: " + f"{len(batch)} items | " + f"{size_bytes} bytes | " + ) + try: + self.search_client.ingest( + self.search_endpoint, + { + "ingest_type": "GMetaList", + "ingest_data": { + "gmeta": batch + } + } + ) + except globus_sdk.SearchAPIError as err: + logg2.warning({"error": str(err), "status": "400"}) + return {"error": str(err), "status": "400"} + logg2.info( + f""" + Successfully ingested {len(gmeta_list['ingest_data']['gmeta'])} + items in {i+1} batches. + """ + ) + + def delete_by_query(self, local_ids): + try: + self.search_client.delete_by_query( + self.search_endpoint, + { + "@version": "delete_by_query#1.0.0", + "filters": [ + { + "type": "match_any", + "field_name": "LocalID", + "values": local_ids, + } + ], + } + ) + except globus_sdk.SearchAPIError as err: + logg2.warning({"error": str(err), "status": "400"}) + return {"error": str(err), "status": "400"} + logg2.info(f"Successfully deleted {len(local_ids)} items") + return {} + + # Function is not currently used, but may be useful for + # future reference for future reference. + def update_by_subject(self, gmeta_list): + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + url = f"https://search.api.globus.org/v1/index/{self.search_endpoint}/entry" + + for gmeta_entry in gmeta_list["ingest_data"]["gmeta"]: + payload = { + "subject": gmeta_entry["subject"], + "visible_to": gmeta_entry["visible_to"], + "content": gmeta_entry["content"] + } + + # There is no built-in method in the SearchClient for + # updating by subject, so we need to handle the auth + # header ourselves. + try: + response = requests.put(url, json=payload, headers=headers) + response.raise_for_status() + except HTTPError as http_err: + logg2.warning(f'HTTP error occurred: {http_err}') + except Exception as err: + logg2.warning(response.json()) + logg2.warning(f'Other error occurred: {err}') + + logg2.info( + f""" + Successfully updated + {len(gmeta_list['ingest_data']['gmeta'])} items + """ + ) + + # Utility functions + def chunk_by_size( + self, + items: Iterable[Any], + max_chunk_kb: int = 10000, + safety_ratio: float = 1.0, + ) -> Iterable[Tuple[List[Any], int]]: + """ + Yield (chunk, size_bytes) where each chunk's serialized JSON size + does not exceed max_chunk_kb * safety_ratio. + + Args: + items: Iterable of JSON-serializable objects + max_chunk_kb: Maximum chunk size in kilobytes (default 10,000 KB) + safety_ratio: Optional safety factor (e.g. 0.8 for 80%) + + Yields: + Tuple of: + - List of items + - Serialized size in bytes + """ + max_bytes = int(max_chunk_kb * 1024 * safety_ratio) + + chunk: List[Any] = [] + current_size = 2 # accounts for opening + closing brackets [] + + for item in items: + item_bytes = json.dumps( + item, + separators=(",", ":"), + ensure_ascii=False, + ).encode("utf-8") + + item_size = len(item_bytes) + + if item_size > max_bytes: + raise ValueError( + f"Single item size ({item_size} bytes) exceeds max chunk size ({max_bytes} bytes)" + ) + + additional_size = item_size + if chunk: + additional_size += 1 # comma separator + + if chunk and current_size + additional_size > max_bytes: + yield chunk, current_size + chunk = [] + current_size = 2 + additional_size = item_size + + chunk.append(item) + current_size += additional_size + + if chunk: + yield chunk, current_size diff --git a/utils/utility.py b/utils/utility.py new file mode 100644 index 0000000..0a8b896 --- /dev/null +++ b/utils/utility.py @@ -0,0 +1,195 @@ +def generate_payloads(application_handles): + import hashlib + + def chunk_dict(data_dict, chunk_size): + for i in range(0, len(data_dict), chunk_size): + yield data_dict[i:i + chunk_size] + + # Current list of resource_v4_local entries in DB + resource_v4_local_list_new = [] + resource_v4_local_list_old = ResourceV4Local.objects.filter( + Affiliation__exact="access-ci.org" + ).order_by("-CreationTime") + + for chunk in chunk_dict(application_handles, 250): + for application in chunk: + cider = CiderInfrastructure.objects.filter( + info_resourceid=application.ResourceID + ).first() + environment_id = application.ID + environment_id_utf8 = environment_id.encode('utf-8') + environment_id_hash = f"urn:ogf.org:glue2:access-ci.org:executable.software:{hashlib.md5(environment_id_utf8).hexdigest()}" + + validity = application.ApplicationEnvironment.Validity + if validity: + validity = str(validity.total_seconds()) + + # Build EntityJSON for the software entity + entity_json = { + "ID": environment_id_hash, + "Category": application.ApplicationEnvironment.EntityJSON.get("Category", None), + "CreationTime": application.ApplicationEnvironment.CreationTime.isoformat(), + "Default": application.ApplicationEnvironment.EntityJSON.get("Default", True), + "Description": application.ApplicationEnvironment.Description, + "HandleKey": application.Value, + "HandleType": application.Type, + "Keywords": application.ApplicationEnvironment.EntityJSON.get("Keywords", None), + "LocalID": environment_id, + "Name": application.ApplicationEnvironment.AppName, + "SupportContact": application.ApplicationEnvironment.EntityJSON.get("SupportContact", "https://support.access-ci.org/help-ticket"), + "SupportStatus": application.ApplicationEnvironment.EntityJSON.get("SupportStatus", "production"), + "URL": application.ApplicationEnvironment.EntityJSON.get("URL", None), + "Validity": validity, + "Version": application.ApplicationEnvironment.AppVersion, + + # Cider fields + "Info_GroupID": cider.other_attributes.get("groups", [])[0]["info_groupid"] if len(cider.other_attributes.get("groups", [])) else None, + "Info_GroupName": cider.other_attributes.get("groups", [])[0]["name"] if len(cider.other_attributes.get("groups", [])) else None, + "Info_ResourceID": cider.info_resourceid, + "Info_ResourceName": cider.resource_descriptive_name, + "Organization_ID": cider.info_siteid, + "Organization_Name": cider.other_attributes.get("organizations", [])[0]["organization_name"], + } + + # Build ResourceV4Local entry + resource_v4_local_entry = { + "ID": environment_id_hash, + "Affiliation": "access-ci.org", + "CatalogMetaURL": "urn:ogf.org:glue2:access-ci.org:catalog:glue2:executable.software", + "CreationTime": application.ApplicationEnvironment.CreationTime.isoformat(), + "LocalID": environment_id, + "LocalType": "GLUE2 Executable Software", + "LocalURL": f"https://operations-api.access-ci.org/wh2/glue2/v1/software_full/ID/{environment_id}/", + "Validity": validity, + "EntityJSON": entity_json, + } + resource_v4_local_list_new.append(resource_v4_local_entry) + + payload = { + # List of items that were added/removed/updated since last run + # Returns a payload with the above keys + **compare_dict_lists( + resource_v4_local_list_old.values(), + resource_v4_local_list_new, + ), + } + return payload + + +def compare_dict_lists( + old_list, + new_list, + id_key="LocalID", + ignore_fields=None, +): + """ + Compare two lists of dicts with: + - nested field diff output + - dot-notation ignore_fields support + - optimized for large datasets + - returns only the new value in 'changes' + """ + + if ignore_fields is None: + ignore_fields = [ + "CreationTime", + "ID", + "LocalID", + "Validity", + "EntityJSON.CreationTime", + "EntityJSON.ID", + "EntityJSON.LocalID", + "EntityJSON.Validity", + ] + + ignore_fields = set(ignore_fields) + + # ----------------------------- + # Fast ignore matcher + # ----------------------------- + def should_ignore(path): + return any(path == f or path.startswith(f + ".") for f in ignore_fields) + + # ----------------------------- + # Merge a change into a nested dictionary + # ----------------------------- + def merge_change(d, path_list, value): + key = path_list[0] + if len(path_list) == 1: + d[key] = value # only new value + else: + if key not in d: + d[key] = {} + merge_change(d[key], path_list[1:], value) + + # ----------------------------- + # Recursive diff engine + # ----------------------------- + def deep_diff_nested(old, new, path="", changes=None): + if changes is None: + changes = {} + + if old is new: + return changes + + if type(old) != type(new): + merge_change(changes, path.split(".") if path else [], new) + return changes + + if isinstance(old, dict): + all_keys = old.keys() | new.keys() + for key in all_keys: + current_path = f"{path}.{key}" if path else key + + if should_ignore(current_path): + continue + + if key not in old: + merge_change(changes, current_path.split("."), new[key]) + elif key not in new: + merge_change(changes, current_path.split("."), None) + else: + deep_diff_nested(old[key], new[key], current_path, changes) + + return changes + + if isinstance(old, list): + if len(old) != len(new): + merge_change(changes, path.split(".") if path else [], new) + return changes + for idx, (o, n) in enumerate(zip(old, new)): + deep_diff_nested(o, n, f"{path}[{idx}]", changes) + return changes + + # Primitive values + if old != new: + merge_change(changes, path.split(".") if path else [], new) + + return changes + + # ----------------------------- + # Lookup dicts for fast O(n) matching + # ----------------------------- + old_dict = {item[id_key]: item for item in old_list} + new_dict = {item[id_key]: item for item in new_list} + + old_ids = set(old_dict) + new_ids = set(new_dict) + + added = [new_dict[_id] for _id in new_ids - old_ids] + removed = list(old_ids - new_ids) + + updated = [] + for _id in old_ids & new_ids: + changes = deep_diff_nested(old_dict[_id], new_dict[_id]) + if changes: + updated.append({ + "LocalID": _id, + "changes": changes + }) + + return { + "added": added, + "removed": removed, + "updated": updated + } From 4849188dda9e88b967c3e44deabf2d8e2cb830f9 Mon Sep 17 00:00:00 2001 From: Steve Turoscy Date: Thu, 16 Apr 2026 10:32:34 -0400 Subject: [PATCH 2/3] Fixing logs --- bin/router_resource.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/bin/router_resource.py b/bin/router_resource.py index fe6d9c0..8a336b4 100755 --- a/bin/router_resource.py +++ b/bin/router_resource.py @@ -738,10 +738,7 @@ def Write_Glue2_Executable_Software_Globus(self, content, contype, config): ResourceV4Local.objects.bulk_create(resource_v4_local_added) except Exception as err: self.logger.warning(err) - return Response( - [{"error": str(err)}], - status=status.HTTP_500_INTERNAL_SERVER_ERROR - ) + return {"error": str(err)} if len(payload["removed"]): # Delete items from Globus Search Index by querying on the ID field @@ -755,10 +752,7 @@ def Write_Glue2_Executable_Software_Globus(self, content, contype, config): resource_v4_local_removed.delete() except Exception as err: self.logger.warning(err) - return Response( - [{"error": str(err)}], - status=status.HTTP_500_INTERNAL_SERVER_ERROR - ) + return {"error": str(err)} if len(payload["updated"]): # Ingest updated items into Globus Search Index @@ -788,18 +782,17 @@ def Write_Glue2_Executable_Software_Globus(self, content, contype, config): gmeta_list["ingest_data"]["gmeta"].append(gmeta_entry) except Exception as err: self.logger.warning(err) - return Response( - [{"error": str(err)}], - status=status.HTTP_500_INTERNAL_SERVER_ERROR - ) + return {"error": str(err)} globus_process.ingest(gmeta_list=gmeta_list) - self.logger.info(payload) - return Response([{ + payload_summary = { + "run_date": datetime.now(timezone.utc).isoformat(), "added": len(payload["added"]), "removed": len(payload["removed"]), "updated": len(payload["updated"]), - }]) + } + self.logger.info(payload_summary) + return [payload_summary] ### End Globus handlers ### #################################### From 0a1233a0a29d2b4801da20519e8ed5cda6900c93 Mon Sep 17 00:00:00 2001 From: Steve Turoscy Date: Wed, 20 May 2026 09:22:06 -0400 Subject: [PATCH 3/3] Bug fixes - https://access-ci.atlassian.net/browse/CTT-712?focusedCommentId=155093 --- bin/router_resource.py | 9 +++++---- utils/search.py | 9 ++++----- utils/utility.py | 9 ++++++++- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/bin/router_resource.py b/bin/router_resource.py index 8a336b4..4d70933 100755 --- a/bin/router_resource.py +++ b/bin/router_resource.py @@ -50,6 +50,7 @@ from django.db import DataError, IntegrityError from django.forms.models import model_to_dict from django_markup.markup import formatter +from glue2.models import * from resource_v4.models import * from resource_v4.documents import * from resource_v4.process import * @@ -288,7 +289,7 @@ def SaveDaemonStdOut(self, path): file = open(path, 'r') lines = file.read() file.close() - if not re.match("^started with pid \d+$", lines) and not re.match("^$", lines): + if not re.match(r"^started with pid \d+$", lines) and not re.match(r"^$", lines): ts = datetime.strftime(datetime.now(), '%Y-%m-%d_%H:%M:%S') newpath = '{}.{}'.format(path, ts) self.logger.debug('Saving previous daemon stdout to {}'.format(newpath)) @@ -703,14 +704,14 @@ def Write_Glue2_Executable_Software_Globus(self, content, contype, config): # Incoming Glue2 models from Glue2 Router API application_handles = ApplicationHandle.objects.order_by( - '-CreationTime').selected_related() + '-CreationTime').select_related() # Build resourceV4 payload from remote GLUE2 resources (simulate incoming GLUE2 models in router) - payload = generate_payloads(application_handles) + payload = generate_payloads(application_handles, logger=self.logger) # Initiate a Globus Process # Handles ingest, delete_by_query, and update. - globus_process = GlobusProcess() + globus_process = GlobusProcess(config=config) # Process added/removed/updated items in ResourceV4Local table if len(payload["added"]): diff --git a/utils/search.py b/utils/search.py index db093a3..518a61f 100644 --- a/utils/search.py +++ b/utils/search.py @@ -6,18 +6,17 @@ import requests from requests.exceptions import HTTPError -from django.conf import settings from globus_sdk.scopes import SearchScopes logg2 = logging.getLogger("warehouse.logger") class GlobusProcess(): - def __init__(self, *args, **kwargs): + def __init__(self, config, *args, **kwargs): self.app = globus_sdk.ClientApp( "ACCESS-CI Operations Warehouse Globus Service Client", - client_id=settings.GLOBUS_CLIENT_ID, - client_secret=settings.GLOBUS_CLIENT_SECRET + client_id=config.GLOBUS_CLIENT_ID, + client_secret=config.GLOBUS_CLIENT_SECRET ) # Add scope requirements for the Authorizer @@ -40,7 +39,7 @@ def __init__(self, *args, **kwargs): self.search_client = globus_sdk.SearchClient( app=self.app, app_scopes=[SearchScopes.all] ) - self.search_endpoint = settings.GLOBUS_SEARCH_INDEX_ID + self.search_endpoint = config.GLOBUS_SEARCH_INDEX_ID def ingest(self, gmeta_list): for i, (batch, size_bytes) in enumerate( diff --git a/utils/utility.py b/utils/utility.py index 0a8b896..83015a6 100644 --- a/utils/utility.py +++ b/utils/utility.py @@ -1,5 +1,9 @@ -def generate_payloads(application_handles): +def generate_payloads(application_handles, logger): import hashlib + + from cider_models import * + from resource_v4.models import * + def chunk_dict(data_dict, chunk_size): for i in range(0, len(data_dict), chunk_size): @@ -16,6 +20,9 @@ def chunk_dict(data_dict, chunk_size): cider = CiderInfrastructure.objects.filter( info_resourceid=application.ResourceID ).first() + if not cider: + logger.warning(f"No CiderInfrastructure found for ResourceID {application.ResourceID}") + continue environment_id = application.ID environment_id_utf8 = environment_id.encode('utf-8') environment_id_hash = f"urn:ogf.org:glue2:access-ci.org:executable.software:{hashlib.md5(environment_id_utf8).hexdigest()}"