Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,5 @@ dmypy.json

# Mac
.DS_Store

.vscode/
107 changes: 103 additions & 4 deletions bin/router_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@

import django
django.setup()
import requests
from django.conf import settings as django_settings
from django.db import DataError, IntegrityError
from django.forms.models import model_to_dict
from django_markup.markup import formatter
from glue2.models import *
from resource_v4.models import *
from resource_v4.documents import *
from resource_v4.process import *
Expand Down Expand Up @@ -287,7 +289,7 @@ def SaveDaemonStdOut(self, path):
file = open(path, 'r')
lines = file.read()
file.close()
if not re.match("^started with pid \d+$", lines) and not re.match("^$", lines):
if not re.match(r"^started with pid \d+$", lines) and not re.match(r"^$", lines):
ts = datetime.strftime(datetime.now(), '%Y-%m-%d_%H:%M:%S')
newpath = '{}.{}'.format(path, ts)
self.logger.debug('Saving previous daemon stdout to {}'.format(newpath))
Expand Down Expand Up @@ -574,8 +576,6 @@ def Write_RSP_Support_Providers(self, content, contype, config):
self.PROCESSING_SECONDS[me] += (datetime.now(timezone.utc) - start_utc).total_seconds()
self.Log_STEP(me)
return(0, '')



####################################
def Write_Glue2_Network_Service(self, content, contype, config):
Expand Down Expand Up @@ -696,6 +696,106 @@ def Write_Glue2_Network_Service(self, content, contype, config):
self.Log_STEP(me)
return(0, '')

####################################
### Globus handlers ###
def Write_Glue2_Executable_Software_Globus(self, content, contype, config):
    """Sync GLUE2 application handles to ResourceV4Local and Globus Search.

    Loads every ``ApplicationHandle`` (newest ``CreationTime`` first), asks
    ``generate_payloads`` for the added / removed / updated items, and then
    applies each group to both the Globus Search index (through
    ``GlobusProcess``) and the ``ResourceV4Local`` table.

    The index call result is checked before the matching table write, so a
    failed ingest or delete no longer leaves the table ahead of the index.

    Args:
        content: Not used by this handler.
        contype: Not used by this handler.
        config: Passed through to ``GlobusProcess(config=config)``.

    Returns:
        ``[summary]`` on success, where ``summary`` holds ``run_date`` and
        the ``added`` / ``removed`` / ``updated`` counts; ``{"error": str}``
        on the first failure.

    NOTE(review): the sibling ``Write_*`` handlers visible in this file
    return ``(0, '')``; confirm the caller copes with the list / dict
    shapes returned here.
    """
    from utils.search import GlobusProcess
    from utils.utility import generate_payloads

    def gmeta_entry(subject, entity_json):
        # One publicly visible GMeta entry.
        return {
            "subject": subject,
            "visible_to": ["public"],
            "content": entity_json,
        }

    def gmeta_list_of(entries):
        # Wrap entries in the GMetaList envelope expected by ingest().
        return {
            "ingest_type": "GMetaList",
            "ingest_data": {"gmeta": entries},
        }

    def index_failure(result):
        # GlobusProcess reports failures as a dict carrying an "error" key;
        # success is None or an empty dict.
        if isinstance(result, dict) and result.get("error"):
            return {"error": result["error"]}
        return None

    # Incoming Glue2 models from Glue2 Router API
    application_handles = ApplicationHandle.objects.order_by(
        '-CreationTime').select_related()

    # Expected to expose "added", "removed" and "updated" collections.
    payload = generate_payloads(application_handles, logger=self.logger)

    # Handles ingest and delete_by_query against the Globus Search index.
    globus_process = GlobusProcess(config=config)

    if payload["added"]:
        entries = [
            gmeta_entry(item["ID"], item["EntityJSON"])
            for item in payload["added"]
        ]
        failure = index_failure(
            globus_process.ingest(gmeta_list=gmeta_list_of(entries))
        )
        if failure:
            # Do not record rows locally that never reached the index.
            self.logger.warning(failure["error"])
            return failure

        resource_v4_local_added = [
            ResourceV4Local(**item) for item in payload["added"]
        ]
        try:
            ResourceV4Local.objects.bulk_create(resource_v4_local_added)
        except Exception as err:
            self.logger.warning(err)
            return {"error": str(err)}

    if payload["removed"]:
        # delete_by_query filters the index on the LocalID field.
        failure = index_failure(
            globus_process.delete_by_query(payload["removed"])
        )
        if failure:
            # Keep the local rows so the removal is retried on a later run.
            self.logger.warning(failure["error"])
            return failure

        resource_v4_local_removed = ResourceV4Local.objects.filter(
            LocalID__in=payload["removed"]
        )
        try:
            resource_v4_local_removed.delete()
        except Exception as err:
            self.logger.warning(err)
            return {"error": str(err)}

    if payload["updated"]:
        entries = []
        update_error = None

        for item in payload["updated"]:
            try:
                resource = ResourceV4Local.objects.get(LocalID=item["LocalID"])
                for key, value in item["changes"].items():
                    if key == "EntityJSON":
                        # Merge changed keys instead of replacing the document.
                        for entity_json_key, entity_json_value in value.items():
                            resource.EntityJSON[entity_json_key] = entity_json_value
                    else:
                        setattr(resource, key, value)
                resource.save()
            except Exception as err:
                self.logger.warning(err)
                update_error = {"error": str(err)}
                break
            entries.append(gmeta_entry(resource.ID, resource.EntityJSON))

        # Push whatever was saved, even after a mid-loop failure, so rows
        # already written locally are reflected in the index.
        if entries:
            failure = index_failure(
                globus_process.ingest(gmeta_list=gmeta_list_of(entries))
            )
            if failure:
                self.logger.warning(failure["error"])
                if update_error is None:
                    update_error = failure

        if update_error is not None:
            return update_error

    payload_summary = {
        "run_date": datetime.now(timezone.utc).isoformat(),
        "added": len(payload["added"]),
        "removed": len(payload["removed"]),
        "updated": len(payload["updated"]),
    }
    self.logger.info(payload_summary)
    return [payload_summary]
### End Globus handlers ###
####################################

####################################
def Write_Glue2_Executable_Software(self, content, contype, config):
Expand Down Expand Up @@ -842,7 +942,6 @@ def Write_Glue2_Executable_Software(self, content, contype, config):
self.Log_STEP(me)
return(0, '')


#####################################################################
# Function for loading CIDER (CyberInfrastructure Description Repository) data
# Load CIDER's organization data to ResourceV4 tables (local, standard)
Expand Down
Empty file added utils/__init__.py
Empty file.
187 changes: 187 additions & 0 deletions utils/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import json
from typing import Iterable, List, Tuple, Any

import globus_sdk
import logging
import requests
from requests.exceptions import HTTPError

from globus_sdk.scopes import SearchScopes

logg2 = logging.getLogger("warehouse.logger")


class GlobusProcess():
    """Thin wrapper around Globus Search for one search index.

    Builds a ``globus_sdk.ClientApp`` from ``config`` and exposes ``ingest``,
    ``delete_by_query`` and ``update_by_subject`` against the index named by
    ``config.GLOBUS_SEARCH_INDEX_ID``.
    """

    def __init__(self, config, *args, **kwargs):
        """Create the client app, authorization header and search client.

        Args:
            config: Object providing ``GLOBUS_CLIENT_ID``,
                ``GLOBUS_CLIENT_SECRET`` and ``GLOBUS_SEARCH_INDEX_ID``.
            *args, **kwargs: Accepted but not used.
        """
        self.app = globus_sdk.ClientApp(
            "ACCESS-CI Operations Warehouse Globus Service Client",
            client_id=config.GLOBUS_CLIENT_ID,
            client_secret=config.GLOBUS_CLIENT_SECRET
        )

        # Add scope requirements for the Authorizer
        self.app.add_scope_requirements({
            "search.api.globus.org": [SearchScopes.all]
        })

        # Authorization header for update_by_subject, which talks to the
        # Search API directly via requests. That method is not currently
        # used, but is kept for future reference.
        authorizer = self.app.get_authorizer("search.api.globus.org")
        self.authorization_header = authorizer.get_authorization_header()

        # ingest and delete_by_query go through the SearchClient directly.
        self.search_client = globus_sdk.SearchClient(
            app=self.app, app_scopes=[SearchScopes.all]
        )
        self.search_endpoint = config.GLOBUS_SEARCH_INDEX_ID

    def ingest(self, gmeta_list):
        """Ingest GMeta entries into the index in size-bounded batches.

        Args:
            gmeta_list: Dict whose ``["ingest_data"]["gmeta"]`` is the list
                of GMeta entries to ingest. An empty list is a no-op.

        Returns:
            ``{}`` on success (matching ``delete_by_query``), or
            ``{"error": str, "status": "400"}`` on the first
            ``globus_sdk.SearchAPIError``; later batches are not sent.

        Raises:
            ValueError: Propagated from ``chunk_by_size`` when a single
                entry cannot fit in a batch.
        """
        entries = gmeta_list["ingest_data"]["gmeta"]
        # Counted separately so the summary also works when no batch ran.
        batches = 0
        for i, (batch, size_bytes) in enumerate(
            self.chunk_by_size(
                entries,
                max_chunk_kb=8000,
                safety_ratio=0.9
            )
        ):
            logg2.info(
                f"Batch {i}: "
                f"{len(batch)} items | "
                f"{size_bytes} bytes | "
            )
            try:
                self.search_client.ingest(
                    self.search_endpoint,
                    {
                        "ingest_type": "GMetaList",
                        "ingest_data": {
                            "gmeta": batch
                        }
                    }
                )
            except globus_sdk.SearchAPIError as err:
                # NOTE(review): status is always reported as "400",
                # whatever HTTP status the API error actually carried.
                failure = {"error": str(err), "status": "400"}
                logg2.warning(failure)
                return failure
            batches = i + 1
        logg2.info(
            f"""
            Successfully ingested {len(entries)}
            items in {batches} batches.
            """
        )
        return {}

    def delete_by_query(self, local_ids):
        """Delete index entries whose ``LocalID`` matches any given value.

        Args:
            local_ids: Values matched against the ``LocalID`` field.

        Returns:
            ``{}`` on success, or ``{"error": str, "status": "400"}`` when
            the search client raises ``globus_sdk.SearchAPIError``.
        """
        try:
            self.search_client.delete_by_query(
                self.search_endpoint,
                {
                    "@version": "delete_by_query#1.0.0",
                    "filters": [
                        {
                            "type": "match_any",
                            "field_name": "LocalID",
                            "values": local_ids,
                        }
                    ],
                }
            )
        except globus_sdk.SearchAPIError as err:
            failure = {"error": str(err), "status": "400"}
            logg2.warning(failure)
            return failure
        logg2.info(f"Successfully deleted {len(local_ids)} items")
        return {}

    # Not currently used; kept for future reference.
    def update_by_subject(self, gmeta_list, timeout=None):
        """PUT each GMeta entry to the index's ``/entry`` endpoint.

        Failures are logged and the loop continues (best effort).

        Args:
            gmeta_list: Dict whose ``["ingest_data"]["gmeta"]`` is the list
                of entries, each with ``subject``, ``visible_to`` and
                ``content``.
            timeout: Passed to ``requests.put``. The default ``None`` keeps
                the previous behaviour of waiting indefinitely.
        """
        headers = {
            "Authorization": self.authorization_header,
            "Content-Type": "application/json"
        }
        url = f"https://search.api.globus.org/v1/index/{self.search_endpoint}/entry"

        updated = 0
        for gmeta_entry in gmeta_list["ingest_data"]["gmeta"]:
            payload = {
                "subject": gmeta_entry["subject"],
                "visible_to": gmeta_entry["visible_to"],
                "content": gmeta_entry["content"]
            }

            # The SearchClient has no built-in update-by-subject call, so
            # the request and its auth header are handled here.
            try:
                response = requests.put(
                    url, json=payload, headers=headers, timeout=timeout
                )
                response.raise_for_status()
            except HTTPError as http_err:
                logg2.warning(f'HTTP error occurred: {http_err}')
            except Exception as err:
                # No response object is guaranteed to exist on this path
                # (e.g. connection failure), so only the error is logged.
                logg2.warning(f'Other error occurred: {err}')
            else:
                updated += 1

        # Report only the entries that were actually accepted.
        logg2.info(
            f"""
            Successfully updated
            {updated} items
            """
        )

    # Utility functions
    def chunk_by_size(
        self,
        items: Iterable[Any],
        max_chunk_kb: int = 10000,
        safety_ratio: float = 1.0,
    ) -> Iterable[Tuple[List[Any], int]]:
        """
        Yield (chunk, size_bytes) where each chunk's serialized JSON size
        does not exceed max_chunk_kb * safety_ratio.

        Sizes are measured on the compact UTF-8 JSON form (no spaces after
        separators, non-ASCII kept as-is), counting the enclosing list
        brackets and the commas between items.

        Args:
            items: Iterable of JSON-serializable objects
            max_chunk_kb: Maximum chunk size in kilobytes (default 10,000 KB)
            safety_ratio: Optional safety factor (e.g. 0.8 for 80%)

        Yields:
            Tuple of:
                - List of items
                - Serialized size in bytes

        Raises:
            ValueError: If a single item, wrapped in list brackets, is
                larger than the allowed chunk size.
        """
        brackets = 2  # opening + closing brackets []
        max_bytes = int(max_chunk_kb * 1024 * safety_ratio)

        chunk: List[Any] = []
        current_size = brackets

        for item in items:
            item_size = len(
                json.dumps(
                    item,
                    separators=(",", ":"),
                    ensure_ascii=False,
                ).encode("utf-8")
            )

            # A chunk holding only this item still needs its brackets, so
            # they count towards the limit.
            if item_size + brackets > max_bytes:
                raise ValueError(
                    f"Single item size ({item_size + brackets} bytes including "
                    f"list brackets) exceeds max chunk size ({max_bytes} bytes)"
                )

            separator = 1 if chunk else 0  # comma before non-first items

            if chunk and current_size + separator + item_size > max_bytes:
                yield chunk, current_size
                chunk = []
                current_size = brackets
                separator = 0

            chunk.append(item)
            current_size += separator + item_size

        if chunk:
            yield chunk, current_size
Loading