Skip to content

ProofPointPS New actions - Release Quarantined email, Forward Quarantined email and Download Quarantined email#982

Open
Sahithya1357 wants to merge 2 commits into
mainfrom
proofpointps_new_actions
Open

ProofPointPS New actions - Release Quarantined email, Forward Quarantined email and Download Quarantined email#982
Sahithya1357 wants to merge 2 commits into
mainfrom
proofpointps_new_actions

Conversation

@Sahithya1357

@Sahithya1357 Sahithya1357 commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Proofpoint PS FR for new actions.

Bug ID: b/509660833

Checklist:

Please ensure you have completed the following items before submitting your PR.
This helps us review your contribution faster and more efficiently.

General Checks:

  • I have read and followed the project's contributing.md guide.
  • My code follows the project's coding style guidelines.
  • I have performed a self-review of my own code.
  • My changes do not introduce any new warnings.
  • My changes pass all existing tests.
  • I have added new tests where appropriate to cover my changes. (If applicable)
  • I have updated the documentation where necessary (e.g., README, API docs). (If applicable)

Open-Source Specific Checks:

  • My changes do not introduce any Personally Identifiable Information (PII) or sensitive customer data.
  • My changes do not expose any internal-only code examples, configurations, or URLs.
  • All code examples, comments, and messages are generic and suitable for a public repository.
  • I understand that any internal context or sensitive details related to this work are handled separately in internal systems (Buganizer for Google team members).

For Google Team Members and Reviewers Only:

  • I have included the Buganizer ID in the PR title or description (e.g., "Internal Buganizer ID: 123456789" or "Related Buganizer: go/buganizer/123456789").
  • I have ensured that all internal discussions and PII related to this work remain in Buganizer.
  • I have tagged the PR with one or more labels that reflect the pull request purpose.

@Sahithya1357 Sahithya1357 requested a review from a team as a code owner June 30, 2026 12:01
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Proofpoint Email Protection integration by introducing new management actions for quarantined emails and refactoring the core API client architecture. These changes improve maintainability, error handling, and testability while expanding the functional capabilities of the integration.

Highlights

  • New Actions Added: Added three new actions: Release Quarantined Email, Forward Quarantined Email, and Download Quarantined Email.
  • Refactored Core Logic: Replaced the legacy ProofPointPSManager with a more robust ProofPointPSApiClient and introduced dedicated modules for authentication, data parsing, and error handling.
  • Cleanup and Modernization: Removed legacy actions (EnrichEntities, Ping) and updated the integration to use modern TIPCommon utilities and improved project configuration.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions

Copy link
Copy Markdown

Marketplace Validation Failed

Click to view the full report

Validation Report

🧩 Integrations

proof_point_ps

Validation Name Details
⚠️ Integration Version Bump The project.toml Version must be incremented by exactly 1.0.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces several new actions for the ProofPointPS integration, including 'Download Quarantined Email', 'Forward Quarantined Email', and 'Release Quarantined Email', along with their corresponding YAML definitions and unit tests. The implementation includes a new API client and authentication module to support these operations. My review identified that the batch processing logic in the new actions should be refactored to handle partial successes rather than failing the entire batch on a single error. Additionally, I noted that some utility functions are currently unused and should be removed, and that docstrings require updates to comply with the repository's exception documentation standards.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +178 to +223
if missing_guids:
raise ProofPointPSError(
"The following message guids were not found in Proofpoint: "
f"{', '.join(list(set(missing_guids)))}."
)

if existing_file_guids or failed_guids_errors:
error_messages = []
if existing_file_guids:
guids_str = ", ".join([g[0] for g in existing_file_guids])
if len(existing_file_guids) == 1:
error_messages.append(
f"GUID {existing_file_guids[0][0]} failed during execution. "
f"Error: File '{existing_file_guids[0][1]}' already exists. "
"Please change the path or set parameter 'Overwrite' to True."
)
else:
error_messages.append(
f"GUIDs {guids_str} failed during execution. "
"Error: File already exists. "
"Please change the path or set parameter 'Overwrite' to True."
)
for guid, err in failed_guids_errors:
error_messages.append(
f"GUID {guid} failed during execution. Error: {err}"
)

raise ProofPointPSError(
f"Failed to download quarantined email(s): {' '.join(error_messages)}"
)

if self.params.save_to_case_wall:
for guid, target_file_path in downloaded_files:
self.soar_action.add_attachment(
file_path=str(target_file_path),
description=(
f"Quarantined email raw content for Message GUID {guid}."
),
)

self.json_results = {"success": successful_records}
self.result_value = True
self.output_message = (
"Successfully downloaded quarantined email raw content "
f"for Message GUID(s): {', '.join(successful_guids)}."
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current implementation uses an all-or-nothing approach for batch processing. If any GUID is not found, a file already exists, or a download fails, the entire action is halted by raising an exception. This prevents the reporting of any successful downloads that may have occurred.

For a better user experience in batch operations, I recommend refactoring the logic to handle partial success. The action should attempt to process all GUIDs, and then provide a final summary of which operations succeeded and which failed, along with the reasons for failure.

        output_messages = []
        self.result_value = False

        if successful_guids:
            self.result_value = True
            self.json_results = {"success": successful_records}
            output_messages.append(
                "Successfully downloaded quarantined email raw content "
                f"for Message GUID(s): {', '.join(successful_guids)}."
            )
            if self.params.save_to_case_wall:
                for guid, target_file_path in downloaded_files:
                    self.soar_action.add_attachment(
                        file_path=str(target_file_path),
                        description=(
                            f"Quarantined email raw content for Message GUID {guid}."
                        ),
                    )

        error_list = []
        if missing_guids:
            error_list.append(
                "The following message guids were not found in Proofpoint: "
                f"{', '.join(list(set(missing_guids)))}"
            )

        if existing_file_guids:
            guids_str = ", ".join([g[0] for g in existing_file_guids])
            error_list.append(
                f"GUIDs {guids_str} failed as files already exist. "
                "Set 'Overwrite' to True to allow overwriting."
            )

        for guid, err in failed_guids_errors:
            error_list.append(f"GUID {guid} failed during download. Error: {err}")

        if error_list:
            output_messages.append(f"Failures: {' '.join(error_list)}")

        self.output_message = "\n".join(output_messages)

        if not self.result_value:
            self.execution_state = self.soar_action.EXECUTION_STATE_FAILED

Comment on lines +102 to +134
successful_records = []
successful_guids = []

records_map = {r.guid: r for r in records if r.guid}
records_map.update({r.localguid: r for r in records if r.localguid})

for guid in guids:
try:
self.api_client.execute_quarantine_action(
action="forward",
folder=folder_name,
localguid=guid,
deletedfolder=deleted_folder,
subject=self.params.new_subject,
appendoldsubject=self.params.append_old_subject,
from_address=self.params.from_address,
headerfrom=self.params.header_from,
to_address=self.params.to_address,
comment=self.params.comment,
)
record = records_map.get(guid)
if record:
successful_records.append(record.to_json())
successful_guids.append(guid)
except ProofPointPSHTTPError as e:
if "deletedfolder" in str(e):
raise ProofPointPSError(
f"Deleted folder '{deleted_folder}' does not exist."
)
raise ProofPointPSError(
f"Failed to forward quarantined email(s): GUID {guid} failed during execution. "
f"Error: {e}."
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This action processes a batch of GUIDs but raises an exception and stops on the first failure. This prevents the rest of the GUIDs from being processed. For a better user experience, the action should attempt to process all GUIDs, and then report a consolidated summary of successful and failed operations.

        successful_records = []
        successful_guids = []
        failed_guids = {}

        records_map = {r.guid: r for r in records if r.guid}
        records_map.update({r.localguid: r for r in records if r.localguid})

        for guid in guids:
            try:
                self.api_client.execute_quarantine_action(
                    action="forward",
                    folder=folder_name,
                    localguid=guid,
                    deletedfolder=deleted_folder,
                    subject=self.params.new_subject,
                    appendoldsubject=self.params.append_old_subject,
                    from_address=self.params.from_address,
                    headerfrom=self.params.header_from,
                    to_address=self.params.to_address,
                    comment=self.params.comment,
                )
                record = records_map.get(guid)
                if record:
                    successful_records.append(record.to_json())
                successful_guids.append(guid)
            except ProofPointPSHTTPError as e:
                if "deletedfolder" in str(e):
                    raise ProofPointPSError(
                        f"Deleted folder '{deleted_folder}' does not exist."
                    )
                failed_guids[guid] = str(e)

        output_messages = []
        if successful_guids:
            self.result_value = True
            self.json_results = {"success": successful_records}
            output_messages.append(
                f"Successfully forwarded quarantined email(s): {', '.join(successful_guids)}."
            )

        if failed_guids:
            failures_str = '; '.join([f"GUID {g}: {e}" for g, e in failed_guids.items()])
            output_messages.append(f"Failed to forward the following GUIDs: {failures_str}")

        if not output_messages:
            self.output_message = "No emails were forwarded."
            self.result_value = True
        else:
            self.output_message = "\n".join(output_messages)

        if not successful_guids and failed_guids:
            self.result_value = False
            self.execution_state = self.soar_action.EXECUTION_STATE_FAILED

Comment on lines +90 to +120
successful_records = []
successful_guids = []

records_map = {r.guid: r for r in records if r.guid}
records_map.update({r.localguid: r for r in records if r.localguid})

for guid in guids:
try:
self.api_client.execute_quarantine_action(
action="release",
folder=folder_name,
localguid=guid,
deletedfolder=deleted_folder,
scan=self.params.scan,
brandtemplate=self.params.brand_template,
securitypolicy=self.params.security_policy,
)
record = records_map.get(guid)
if record:
successful_records.append(record.to_json())
successful_guids.append(guid)
except ProofPointPSHTTPError as e:
if "deletedfolder" in str(e):
raise ProofPointPSError(
f"Deleted folder '{deleted_folder}' does not exist."
)

raise ProofPointPSError(
f"Failed to release quarantined email(s): GUID {guid} failed during execution. "
f"Error: {e}."
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This action processes a batch of GUIDs but raises an exception and stops on the first failure. This prevents the rest of the GUIDs from being processed. For a better user experience, the action should attempt to process all GUIDs, and then report a consolidated summary of successful and failed operations.

        successful_records = []
        successful_guids = []
        failed_guids = {}

        records_map = {r.guid: r for r in records if r.guid}
        records_map.update({r.localguid: r for r in records if r.localguid})

        for guid in guids:
            try:
                self.api_client.execute_quarantine_action(
                    action="release",
                    folder=folder_name,
                    localguid=guid,
                    deletedfolder=deleted_folder,
                    scan=self.params.scan,
                    brandtemplate=self.params.brand_template,
                    securitypolicy=self.params.security_policy,
                )
                record = records_map.get(guid)
                if record:
                    successful_records.append(record.to_json())
                successful_guids.append(guid)
            except ProofPointPSHTTPError as e:
                if "deletedfolder" in str(e):
                    raise ProofPointPSError(
                        f"Deleted folder '{deleted_folder}' does not exist."
                    )
                failed_guids[guid] = str(e)

        output_messages = []
        if successful_guids:
            self.result_value = True
            self.json_results = {"success": successful_records}
            output_messages.append(
                f"Successfully released quarantined email(s): {', '.join(successful_guids)}."
            )

        if failed_guids:
            failures_str = '; '.join([f"GUID {g}: {e}" for g, e in failed_guids.items()])
            output_messages.append(f"Failed to release the following GUIDs: {failures_str}")

        if not output_messages:
            self.output_message = "No emails were released."
            self.result_value = True
        else:
            self.output_message = "\n".join(output_messages)

        if not successful_guids and failed_guids:
            self.result_value = False
            self.execution_state = self.soar_action.EXECUTION_STATE_FAILED

Comment on lines +95 to +96
except Exception:
return "Email"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Catching a generic Exception can hide underlying issues and make debugging difficult. It's better to catch more specific exceptions. If a broad exception is necessary as a fallback, the error should be logged to provide visibility into what went wrong.

Suggested change
except Exception:
return "Email"
except Exception as e:
self.soar_action.LOGGER.warning(f"Failed to parse email subject: {e}")
return "Email"

Comment on lines +186 to +218
def get_record_by_guid(
self,
guid: str,
folder: str | None = None,
sender: str | None = "*",
) -> QuarantineRecord | None:
"""Retrieve a quarantined message record by GUID and optional folder constraint.

Args:
guid: The Message GUID to search for.
folder: Optional folder name to search in.
sender: Optional sender email address to search by.

Returns:
The QuarantineRecord if found, None otherwise.

"""
start_date = (
datetime.datetime.utcnow() - datetime.timedelta(days=30)
).strftime(TIME_FORMAT)
end_date = datetime.datetime.utcnow().strftime(TIME_FORMAT)

records = self.search(
sender=sender or "*",
folder=folder,
start_date=start_date,
end_date=end_date,
)
for r in records:
if r.guid == guid or r.localguid == guid:
return r

return None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The method get_record_by_guid does not appear to be used by any of the actions in this pull request. If it is not used by other actions in the integration, it should be removed to reduce dead code and improve maintainability.

Comment on lines +44 to +154
def parse_iso8601_to_utc_datetime(time_str: str) -> datetime:
"""Parse ISO8601 string into a UTC naive datetime.

Args:
time_str: The datetime string to parse.

Returns:
A datetime object in UTC timezone without tzinfo.

Raises:
ValueError: If the string cannot be parsed.

"""
time_str = time_str.strip()
if time_str.endswith("Z"):
time_str = time_str[:-1] + "+00:00"

try:
datetime_obj = datetime.fromisoformat(time_str)
except ValueError as exc:
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
try:
datetime_obj = datetime.strptime(time_str, fmt)
break
except ValueError:
continue
else:
msg = f"Invalid datetime format: {time_str}"
raise ValueError(msg) from exc

if datetime_obj.tzinfo is not None:
datetime_obj = datetime_obj.astimezone(UTC).replace(tzinfo=None)
return datetime_obj


def calculate_time_range(
time_frame: str,
start_time_str: str | None = None,
end_time_str: str | None = None,
) -> tuple[datetime, datetime]:
"""Calculate start/end datetimes based on timeframe and custom range.

Args:
time_frame: The lookback period choice.
start_time_str: Custom start time string.
end_time_str: Custom end time string.

Returns:
A tuple of start and end naive UTC datetime objects.

Raises:
InvalidParameterError: If the parameters are invalid.

"""
now = datetime.now(UTC)

if time_frame == "Custom":
if not start_time_str:
msg = "Start Time is required when Time Frame is set to 'Custom'."
raise InvalidParameterError(msg)

try:
start = parse_iso8601_to_utc_datetime(start_time_str)
except ValueError as e:
msg = f"Invalid 'Start Time' format: {e}"
raise InvalidParameterError(msg) from e

if end_time_str:
try:
end = parse_iso8601_to_utc_datetime(end_time_str)
except ValueError as e:
msg = f"Invalid 'End Time' format: {e}"
raise InvalidParameterError(msg) from e
else:
end = now

if start.tzinfo is not None:
start = start.astimezone(UTC).replace(tzinfo=None)
if end.tzinfo is not None:
end = end.astimezone(UTC).replace(tzinfo=None)

if start >= end:
msg = "Start Time must be before End Time."
raise InvalidParameterError(msg)

return start, end

if start_time_str or end_time_str:
msg = (
"Start Time or End Time can only be provided when 'Custom' is "
"selected for the Time Frame parameter."
)
raise InvalidParameterError(msg)

delta_map = {
"Last Hour": timedelta(hours=1),
"Last 6 Hours": timedelta(hours=6),
"Last 24 Hours": timedelta(hours=24),
"Last Week": timedelta(days=7),
}
delta = delta_map.get(time_frame)
if not delta:
msg = f"Unsupported Time Frame: {time_frame}"
raise InvalidParameterError(msg)

end = now
start = end - delta

start = start.replace(tzinfo=None)
end = end.replace(tzinfo=None)
return start, end

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The functions parse_iso8601_to_utc_datetime and calculate_time_range are not used by any of the new actions in this pull request. If this code is not intended for use by other existing actions, it should be removed to avoid maintaining dead code.

Comment on lines +94 to +107
def _validate_folder_and_guids(
self,
guids: list[str],
folder_name: str,
fail_on_missing_guid: bool = True,
) -> list[QuarantineRecord] | None:
"""Pre-validate GUIDs before executing any action.

Distinguishes between GUIDs that do not exist globally vs. GUIDs that exist but
are not present in the specified folder.

Returns a list of QuarantineRecord objects if all exist.
Otherwise raises ProofPointPSError.
"""

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The repository style guide (line 93) mandates documenting all intentionally raised exceptions in the Raises section of the docstring. This method raises ProofPointPSError but does not document it.

This issue is present in multiple new methods across this pull request (e.g., in api_client.py and the _perform_action methods in the new actions). Please ensure all docstrings are updated to comply with the style guide.

References
  1. Explicitly document all exceptions that the function may intentionally raise in a Raises section in the docstring. (link)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant