Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ uvx scc_mcp
uvx secops_soar_mcp --integrations CSV,OKTA
```

If SecOps SOAR fails during startup with a TLS certificate verification error
on macOS, install the CA list bundled with `certifi` for your Python version:

```bash
/Applications/Python\ 3.12/Install\ Certificates.command
```

With environment variables:

```bash
Expand Down
52 changes: 49 additions & 3 deletions server/secops-soar/secops_soar_mcp/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
# limitations under the License.
"""Bindings for the SOAR client."""

from collections import deque
import os
import ssl

import dotenv
from logger_utils import get_logger
Expand All @@ -29,12 +31,56 @@
valid_scopes = set()


def _is_certificate_verification_error(error: BaseException | None) -> bool:
if error is None:
return False
pending = deque([error])
seen: set[int] = set()
while pending:
current = pending.popleft()
if id(current) in seen:
continue
seen.add(id(current))
message = str(current).lower()
if isinstance(current, ssl.SSLCertVerificationError):
return True
if (
isinstance(current, ssl.SSLError)
and "certificate verify failed" in message
):
return True
if "certificate verify failed" in message:
return True
for related in (current.__cause__, current.__context__):
if isinstance(related, BaseException):
pending.append(related)
for arg in getattr(current, "args", ()):
if isinstance(arg, BaseException):
pending.append(arg)
return False


def _valid_scopes_error_message(error: BaseException | None) -> str:
if _is_certificate_verification_error(error):
return (
"Failed to fetch valid scopes from SOAR because TLS certificate "
"verification failed. If you are using the Python.org macOS "
"installer, run the Install Certificates.command for your Python "
"version, for example: "
"`/Applications/Python\\ 3.12/Install\\ Certificates.command`. "
"You can also point Python at certifi's CA bundle with "
"`SSL_CERT_FILE=$(python -m certifi)`."
)
return (
"Failed to fetch valid scopes from SOAR, please make sure you have "
"configured the right SOAR credentials. Shutting down..."
)


async def _get_valid_scopes():
valid_scopes_list = await http_client.get(consts.Endpoints.GET_SCOPES)
if valid_scopes_list is None:
raise RuntimeError(
"Failed to fetch valid scopes from SOAR, please make sure you have configured the right SOAR credentials. Shutting down..."
)
raise RuntimeError(_valid_scopes_error_message(http_client.last_error))
return set(valid_scopes_list)


Expand Down
35 changes: 35 additions & 0 deletions server/secops-soar/secops_soar_mcp/bindings_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for SecOps SOAR binding startup errors."""

import ssl

from secops_soar_mcp import bindings


def test_valid_scopes_error_message_identifies_certifi_issue():
error = ssl.SSLCertVerificationError("certificate verify failed")

message = bindings._valid_scopes_error_message(error)

assert "TLS certificate verification failed" in message
assert "Install Certificates.command" in message
assert "certifi" in message


def test_valid_scopes_error_message_preserves_credentials_hint():
message = bindings._valid_scopes_error_message(RuntimeError("401 unauthorized"))

assert "configured the right SOAR credentials" in message
10 changes: 10 additions & 0 deletions server/secops-soar/secops_soar_mcp/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, base_url: str, app_key: str):
self.base_url = base_url
self.app_key = app_key
self._session = None
self.last_error: Exception | None = None

def _get_session(self) -> aiohttp.ClientSession:
if self._session is None:
Expand Down Expand Up @@ -56,15 +57,18 @@ async def get(
The response as a JSON object, or None if an error occurred.
"""
headers = await self._get_headers()
self.last_error = None
try:
async with self._get_session().get(
self.base_url + endpoint, params=params, headers=headers
) as response:
response.raise_for_status() # Raise an exception for 4xx/5xx responses
return await response.json()
except aiohttp.ClientResponseError as e:
self.last_error = e
logger.debug("HTTP error occurred: %s", e)
except Exception as e:
self.last_error = e
logger.debug("An error occurred: %s", e)
return None

Expand All @@ -85,6 +89,7 @@ async def post(
The response as a JSON object, or None if an error occurred.
"""
headers = await self._get_headers()
self.last_error = None
try:
async with self._get_session().post(
self.base_url + endpoint, json=req, params=params, headers=headers
Expand All @@ -94,8 +99,10 @@ async def post(
decoded_data = data.decode("utf-8")
return json.loads(decoded_data)
except aiohttp.ClientResponseError as e:
self.last_error = e
logger.debug("HTTP error occurred: %s", e)
except Exception as e:
self.last_error = e
logger.debug("An error occurred: %s", e)
return None

Expand All @@ -116,15 +123,18 @@ async def patch(
The response as a JSON object, or None if an error occurred.
"""
headers = await self._get_headers()
self.last_error = None
try:
async with self._get_session().patch(
self.base_url + endpoint, json=req, params=params, headers=headers
) as response:
response.raise_for_status()
return await response.json()
except aiohttp.ClientResponseError as e:
self.last_error = e
logger.debug("HTTP error occurred: %s", e)
except Exception as e:
self.last_error = e
logger.debug("An error occurred: %s", e)
return None

Expand Down