diff --git a/.gitignore b/.gitignore index b3b0c4b..17b0846 100644 --- a/.gitignore +++ b/.gitignore @@ -135,3 +135,5 @@ dmypy.json # Mac .DS_Store + +.vscode/ \ No newline at end of file diff --git a/bin/router_resource.py b/bin/router_resource.py index f834c66..4d70933 100755 --- a/bin/router_resource.py +++ b/bin/router_resource.py @@ -45,10 +45,12 @@ import django django.setup() +import requests from django.conf import settings as django_settings from django.db import DataError, IntegrityError from django.forms.models import model_to_dict from django_markup.markup import formatter +from glue2.models import * from resource_v4.models import * from resource_v4.documents import * from resource_v4.process import * @@ -287,7 +289,7 @@ def SaveDaemonStdOut(self, path): file = open(path, 'r') lines = file.read() file.close() - if not re.match("^started with pid \d+$", lines) and not re.match("^$", lines): + if not re.match(r"^started with pid \d+$", lines) and not re.match(r"^$", lines): ts = datetime.strftime(datetime.now(), '%Y-%m-%d_%H:%M:%S') newpath = '{}.{}'.format(path, ts) self.logger.debug('Saving previous daemon stdout to {}'.format(newpath)) @@ -574,8 +576,6 @@ def Write_RSP_Support_Providers(self, content, contype, config): self.PROCESSING_SECONDS[me] += (datetime.now(timezone.utc) - start_utc).total_seconds() self.Log_STEP(me) return(0, '') - - #################################### def Write_Glue2_Network_Service(self, content, contype, config): @@ -696,6 +696,106 @@ def Write_Glue2_Network_Service(self, content, contype, config): self.Log_STEP(me) return(0, '') + #################################### + ### Globus handlers ### + def Write_Glue2_Executable_Software_Globus(self, content, contype, config): + from utils.search import GlobusProcess + from utils.utility import generate_payloads + + # Incoming Glue2 models from Glue2 Router API + application_handles = ApplicationHandle.objects.order_by( + '-CreationTime').select_related() + + # Build resourceV4 payload from remote GLUE2 resources (simulate incoming GLUE2 models in router) + payload = generate_payloads(application_handles, logger=self.logger) + + # Initiate a Globus Process + # Handles ingest, delete_by_query, and update. + globus_process = GlobusProcess(config=config) + + # Process added/removed/updated items in ResourceV4Local table + if len(payload["added"]): + # Ingest new items into Globus Search Index + gmeta_list = { + "ingest_type": "GMetaList", + "ingest_data": { + "gmeta": [] + } + } + for item in payload["added"]: + gmeta_entry = { + "subject": item["ID"], + "visible_to": ["public"], + "content": item["EntityJSON"] + } + gmeta_list["ingest_data"]["gmeta"].append(gmeta_entry) + globus_process.ingest(gmeta_list=gmeta_list) + + # Bulk create new ResourceV4Local entries for added items in PostgreSQL + resource_v4_local_added = [ + ResourceV4Local(**item) for item in payload["added"] + ] + try: + ResourceV4Local.objects.bulk_create(resource_v4_local_added) + except Exception as err: + self.logger.warning(err) + return {"error": str(err)} + + if len(payload["removed"]): + # Delete items from Globus Search Index by querying on the ID field + globus_process.delete_by_query(payload["removed"]) + + # Delete items from ResourceV4Local table in PostgreSQL + resource_v4_local_removed = ResourceV4Local.objects.filter( + LocalID__in=payload["removed"] + ) + try: + resource_v4_local_removed.delete() + except Exception as err: + self.logger.warning(err) + return {"error": str(err)} + + if len(payload["updated"]): + # Ingest updated items into Globus Search Index + gmeta_list = { + "ingest_type": "GMetaList", + "ingest_data": { + "gmeta": [] + } + } + + for item in payload["updated"]: + try: + resource = ResourceV4Local.objects.get(LocalID=item["LocalID"]) + for key, value in item["changes"].items(): + if key == "EntityJSON": + for entity_json_key, entity_json_value in value.items(): + resource.EntityJSON[entity_json_key] = entity_json_value + else: + setattr(resource, key, value) + resource.save() + + gmeta_entry = { + "subject": resource.ID, + "visible_to": ["public"], + "content": resource.EntityJSON + } + gmeta_list["ingest_data"]["gmeta"].append(gmeta_entry) + except Exception as err: + self.logger.warning(err) + return {"error": str(err)} + globus_process.ingest(gmeta_list=gmeta_list) + + payload_summary = { + "run_date": datetime.now(timezone.utc).isoformat(), + "added": len(payload["added"]), + "removed": len(payload["removed"]), + "updated": len(payload["updated"]), + } + self.logger.info(payload_summary) + return [payload_summary] + ### End Globus handlers ### + #################################### #################################### def Write_Glue2_Executable_Software(self, content, contype, config): @@ -842,7 +942,6 @@ def Write_Glue2_Executable_Software(self, content, contype, config): self.Log_STEP(me) return(0, '') - ##################################################################### # Function for loading CIDER (CyberInfrastructure Description Repository) data # Load CIDER's organization data to ResourceV4 tables (local, standard) diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/search.py b/utils/search.py new file mode 100644 index 0000000..518a61f --- /dev/null +++ b/utils/search.py @@ -0,0 +1,187 @@ +import json +from typing import Iterable, List, Tuple, Any + +import globus_sdk +import logging +import requests +from requests.exceptions import HTTPError + +from globus_sdk.scopes import SearchScopes + +logg2 = logging.getLogger("warehouse.logger") + + +class GlobusProcess(): + def __init__(self, config, *args, **kwargs): + self.app = globus_sdk.ClientApp( + "ACCESS-CI Operations Warehouse Globus Service Client", + client_id=config.GLOBUS_CLIENT_ID, + client_secret=config.GLOBUS_CLIENT_SECRET + ) + + # Add scope requirements for the Authorizer + self.app.add_scope_requirements({ + "search.api.globus.org": [SearchScopes.all] + }) + + # Get the authorization header for the Search PUT requests + # This is for the update_by_subject method which uses the + # Search API directly via requests. + + # The update_by_subject method is not currently used, but may + # be useful for future reference. + authorizer = self.app.get_authorizer("search.api.globus.org") + self.authorization_header = authorizer.get_authorization_header() + + # Initialize the SearchClient with the app and required scopes + # For ingest and delete_by_query operations we can use the SearchClient + # directly. + self.search_client = globus_sdk.SearchClient( + app=self.app, app_scopes=[SearchScopes.all] + ) + self.search_endpoint = config.GLOBUS_SEARCH_INDEX_ID + + def ingest(self, gmeta_list): + for i, (batch, size_bytes) in enumerate( + self.chunk_by_size( + gmeta_list["ingest_data"]["gmeta"], + max_chunk_kb=8000, + safety_ratio=0.9 + ) + ): + logg2.info( + f"Batch {i}: " + f"{len(batch)} items | " + f"{size_bytes} bytes | " + ) + try: + self.search_client.ingest( + self.search_endpoint, + { + "ingest_type": "GMetaList", + "ingest_data": { + "gmeta": batch + } + } + ) + except globus_sdk.SearchAPIError as err: + logg2.warning({"error": str(err), "status": "400"}) + return {"error": str(err), "status": "400"} + logg2.info( + f""" + Successfully ingested {len(gmeta_list['ingest_data']['gmeta'])} + items in {i+1} batches. + """ + ) + + def delete_by_query(self, local_ids): + try: + self.search_client.delete_by_query( + self.search_endpoint, + { + "@version": "delete_by_query#1.0.0", + "filters": [ + { + "type": "match_any", + "field_name": "LocalID", + "values": local_ids, + } + ], + } + ) + except globus_sdk.SearchAPIError as err: + logg2.warning({"error": str(err), "status": "400"}) + return {"error": str(err), "status": "400"} + logg2.info(f"Successfully deleted {len(local_ids)} items") + return {} + + # Function is not currently used, but may be useful for + # future reference for future reference. + def update_by_subject(self, gmeta_list): + headers = { + "Authorization": self.authorization_header, + "Content-Type": "application/json" + } + url = f"https://search.api.globus.org/v1/index/{self.search_endpoint}/entry" + + for gmeta_entry in gmeta_list["ingest_data"]["gmeta"]: + payload = { + "subject": gmeta_entry["subject"], + "visible_to": gmeta_entry["visible_to"], + "content": gmeta_entry["content"] + } + + # There is no built-in method in the SearchClient for + # updating by subject, so we need to handle the auth + # header ourselves. + try: + response = requests.put(url, json=payload, headers=headers) + response.raise_for_status() + except HTTPError as http_err: + logg2.warning(f'HTTP error occurred: {http_err}') + except Exception as err: + logg2.warning(response.json()) + logg2.warning(f'Other error occurred: {err}') + + logg2.info( + f""" + Successfully updated + {len(gmeta_list['ingest_data']['gmeta'])} items + """ + ) + + # Utility functions + def chunk_by_size( + self, + items: Iterable[Any], + max_chunk_kb: int = 10000, + safety_ratio: float = 1.0, + ) -> Iterable[Tuple[List[Any], int]]: + """ + Yield (chunk, size_bytes) where each chunk's serialized JSON size + does not exceed max_chunk_kb * safety_ratio. + + Args: + items: Iterable of JSON-serializable objects + max_chunk_kb: Maximum chunk size in kilobytes (default 10,000 KB) + safety_ratio: Optional safety factor (e.g. 0.8 for 80%) + + Yields: + Tuple of: + - List of items + - Serialized size in bytes + """ + max_bytes = int(max_chunk_kb * 1024 * safety_ratio) + + chunk: List[Any] = [] + current_size = 2 # accounts for opening + closing brackets [] + + for item in items: + item_bytes = json.dumps( + item, + separators=(",", ":"), + ensure_ascii=False, + ).encode("utf-8") + + item_size = len(item_bytes) + + if item_size > max_bytes: + raise ValueError( + f"Single item size ({item_size} bytes) exceeds max chunk size ({max_bytes} bytes)" + ) + + additional_size = item_size + if chunk: + additional_size += 1 # comma separator + + if chunk and current_size + additional_size > max_bytes: + yield chunk, current_size + chunk = [] + current_size = 2 + additional_size = item_size + + chunk.append(item) + current_size += additional_size + + if chunk: + yield chunk, current_size diff --git a/utils/utility.py b/utils/utility.py new file mode 100644 index 0000000..83015a6 --- /dev/null +++ b/utils/utility.py @@ -0,0 +1,202 @@ +def generate_payloads(application_handles, logger): + import hashlib + + from cider_models import * + from resource_v4.models import * + + + def chunk_dict(data_dict, chunk_size): + for i in range(0, len(data_dict), chunk_size): + yield data_dict[i:i + chunk_size] + + # Current list of resource_v4_local entries in DB + resource_v4_local_list_new = [] + resource_v4_local_list_old = ResourceV4Local.objects.filter( + Affiliation__exact="access-ci.org" + ).order_by("-CreationTime") + + for chunk in chunk_dict(application_handles, 250): + for application in chunk: + cider = CiderInfrastructure.objects.filter( + info_resourceid=application.ResourceID + ).first() + if not cider: + logger.warning(f"No CiderInfrastructure found for ResourceID {application.ResourceID}") + continue + environment_id = application.ID + environment_id_utf8 = environment_id.encode('utf-8') + environment_id_hash = f"urn:ogf.org:glue2:access-ci.org:executable.software:{hashlib.md5(environment_id_utf8).hexdigest()}" + + validity = application.ApplicationEnvironment.Validity + if validity: + validity = str(validity.total_seconds()) + + # Build EntityJSON for the software entity + entity_json = { + "ID": environment_id_hash, + "Category": application.ApplicationEnvironment.EntityJSON.get("Category", None), + "CreationTime": application.ApplicationEnvironment.CreationTime.isoformat(), + "Default": application.ApplicationEnvironment.EntityJSON.get("Default", True), + "Description": application.ApplicationEnvironment.Description, + "HandleKey": application.Value, + "HandleType": application.Type, + "Keywords": application.ApplicationEnvironment.EntityJSON.get("Keywords", None), + "LocalID": environment_id, + "Name": application.ApplicationEnvironment.AppName, + "SupportContact": application.ApplicationEnvironment.EntityJSON.get("SupportContact", "https://support.access-ci.org/help-ticket"), + "SupportStatus": application.ApplicationEnvironment.EntityJSON.get("SupportStatus", "production"), + "URL": application.ApplicationEnvironment.EntityJSON.get("URL", None), + "Validity": validity, + "Version": application.ApplicationEnvironment.AppVersion, + + # Cider fields + "Info_GroupID": cider.other_attributes.get("groups", [])[0]["info_groupid"] if len(cider.other_attributes.get("groups", [])) else None, + "Info_GroupName": cider.other_attributes.get("groups", [])[0]["name"] if len(cider.other_attributes.get("groups", [])) else None, + "Info_ResourceID": cider.info_resourceid, + "Info_ResourceName": cider.resource_descriptive_name, + "Organization_ID": cider.info_siteid, + "Organization_Name": cider.other_attributes.get("organizations", [])[0]["organization_name"], + } + + # Build ResourceV4Local entry + resource_v4_local_entry = { + "ID": environment_id_hash, + "Affiliation": "access-ci.org", + "CatalogMetaURL": "urn:ogf.org:glue2:access-ci.org:catalog:glue2:executable.software", + "CreationTime": application.ApplicationEnvironment.CreationTime.isoformat(), + "LocalID": environment_id, + "LocalType": "GLUE2 Executable Software", + "LocalURL": f"https://operations-api.access-ci.org/wh2/glue2/v1/software_full/ID/{environment_id}/", + "Validity": validity, + "EntityJSON": entity_json, + } + resource_v4_local_list_new.append(resource_v4_local_entry) + + payload = { + # List of items that were added/removed/updated since last run + # Returns a payload with the above keys + **compare_dict_lists( + resource_v4_local_list_old.values(), + resource_v4_local_list_new, + ), + } + return payload + + +def compare_dict_lists( + old_list, + new_list, + id_key="LocalID", + ignore_fields=None, +): + """ + Compare two lists of dicts with: + - nested field diff output + - dot-notation ignore_fields support + - optimized for large datasets + - returns only the new value in 'changes' + """ + + if ignore_fields is None: + ignore_fields = [ + "CreationTime", + "ID", + "LocalID", + "Validity", + "EntityJSON.CreationTime", + "EntityJSON.ID", + "EntityJSON.LocalID", + "EntityJSON.Validity", + ] + + ignore_fields = set(ignore_fields) + + # ----------------------------- + # Fast ignore matcher + # ----------------------------- + def should_ignore(path): + return any(path == f or path.startswith(f + ".") for f in ignore_fields) + + # ----------------------------- + # Merge a change into a nested dictionary + # ----------------------------- + def merge_change(d, path_list, value): + key = path_list[0] + if len(path_list) == 1: + d[key] = value # only new value + else: + if key not in d: + d[key] = {} + merge_change(d[key], path_list[1:], value) + + # ----------------------------- + # Recursive diff engine + # ----------------------------- + def deep_diff_nested(old, new, path="", changes=None): + if changes is None: + changes = {} + + if old is new: + return changes + + if type(old) != type(new): + merge_change(changes, path.split(".") if path else [], new) + return changes + + if isinstance(old, dict): + all_keys = old.keys() | new.keys() + for key in all_keys: + current_path = f"{path}.{key}" if path else key + + if should_ignore(current_path): + continue + + if key not in old: + merge_change(changes, current_path.split("."), new[key]) + elif key not in new: + merge_change(changes, current_path.split("."), None) + else: + deep_diff_nested(old[key], new[key], current_path, changes) + + return changes + + if isinstance(old, list): + if len(old) != len(new): + merge_change(changes, path.split(".") if path else [], new) + return changes + for idx, (o, n) in enumerate(zip(old, new)): + deep_diff_nested(o, n, f"{path}[{idx}]", changes) + return changes + + # Primitive values + if old != new: + merge_change(changes, path.split(".") if path else [], new) + + return changes + + # ----------------------------- + # Lookup dicts for fast O(n) matching + # ----------------------------- + old_dict = {item[id_key]: item for item in old_list} + new_dict = {item[id_key]: item for item in new_list} + + old_ids = set(old_dict) + new_ids = set(new_dict) + + added = [new_dict[_id] for _id in new_ids - old_ids] + removed = list(old_ids - new_ids) + + updated = [] + for _id in old_ids & new_ids: + changes = deep_diff_nested(old_dict[_id], new_dict[_id]) + if changes: + updated.append({ + "LocalID": _id, + "changes": changes + }) + + return { + "added": added, + "removed": removed, + "updated": updated + }