Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class ClientApplication(object):
ACQUIRE_TOKEN_FOR_CLIENT_ID = "730"
ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832"
ACQUIRE_TOKEN_INTERACTIVE = "169"
ACQUIRE_TOKEN_BY_USER_FIC_ID = "950"
GET_ACCOUNTS_ID = "902"
REMOVE_ACCOUNT_ID = "903"

Expand Down Expand Up @@ -2572,3 +2573,62 @@ def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=No
response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
telemetry_context.update_telemetry(response)
return response

def acquire_token_by_user_federated_identity_credential(
self, scopes, assertion, username=None, user_object_id=None,
claims_challenge=None, **kwargs):
"""Acquires a user-scoped token using the ``user_fic`` grant type.

This method exchanges a federated identity credential (typically an
agent instance token from Leg 2 of the agent identity protocol) for
a user-scoped access token, enabling an agent to act on behalf of
a specific user.

:param list[str] scopes: Scopes required by downstream API (a resource).
:param str assertion:
The federated identity credential token (e.g. the instance token
obtained from Leg 2 of the agent identity flow).
:param str username:
The target user's UPN (User Principal Name).
Mutually exclusive with ``user_object_id``.
:param str user_object_id:
The target user's Object ID.
Mutually exclusive with ``username``.
:param claims_challenge:
The claims_challenge parameter requests specific claims requested by the resource provider
in the form of a claims_challenge directive in the www-authenticate header to be
returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
It is a string of a JSON object which contains lists of claims being requested from these locations.

:return: A dict representing the json response from Microsoft Entra:

- A successful response would contain "access_token" key,
- an error response would contain "error" and usually "error_description".
"""
# Input validation
if not assertion:
raise ValueError("assertion is required and must be non-empty")
if not username and not user_object_id:
raise ValueError(
"Either username or user_object_id must be provided")
if username and user_object_id:
raise ValueError(
"username and user_object_id are mutually exclusive")

telemetry_context = self._build_telemetry_context(
self.ACQUIRE_TOKEN_BY_USER_FIC_ID)
response = _clean_up(self.client.obtain_token_by_user_fic(
scope=self._decorate_scope(scopes),
assertion=assertion,
username=username,
user_object_id=user_object_id,
headers=telemetry_context.generate_headers(),
data=dict(
kwargs.pop("data", {}),
claims=_merge_claims_challenge_and_capabilities(
self._client_capabilities, claims_challenge)),
**kwargs))
if "access_token" in response:
response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
telemetry_context.update_telemetry(response)
return response
79 changes: 74 additions & 5 deletions msal/oauth2cli/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
except ImportError:
from urlparse import parse_qs, urlparse, urlunparse
from urllib import urlencode, quote_plus
import inspect
import logging
import warnings
import time
Expand Down Expand Up @@ -104,6 +105,11 @@ def __init__(
or a raw JWT assertion in bytes (which we will relay to http layer).
It can also be a callable (recommended),
so that we will do lazy creation of an assertion.

The callable may accept zero arguments (legacy) or one argument.
When it accepts one argument, it will receive a dict containing
``"client_id"``, ``"token_endpoint"``, and optionally ``"fmi_path"``
(when an FMI path is set on the current request).
client_assertion_type (str):
The type of your :attr:`client_assertion` parameter.
It is typically the value of :attr:`CLIENT_ASSERTION_TYPE_SAML2` or
Expand Down Expand Up @@ -168,6 +174,41 @@ def __init__(
# A workaround for requests not supporting session-wide timeout
self._http_client.request, timeout=timeout)

@staticmethod
def _accepts_context(func):
"""Check if a callable requires at least one positional argument.

Returns True only when the callable has a positional parameter
**without** a default value. This ensures that legacy zero-arg
callables — including ``lambda token=token: token`` patterns
where every positional param has a default — are still invoked
with no arguments.
"""
try:
sig = inspect.signature(func)
for p in sig.parameters.values():
if p.kind in (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
) and p.default is inspect.Parameter.empty:
return True
return False
except (ValueError, TypeError):
return False # Signature not inspectable; treat as zero-arg

def _invoke_assertion_callable(self, assertion_callable, data=None):
"""Invoke an assertion callable, passing context if it accepts one."""
if self._accepts_context(assertion_callable):
context = {
"client_id": self.client_id,
"token_endpoint": self.configuration.get(
"token_endpoint", ""),
}
if data and data.get("fmi_path"):
context["fmi_path"] = data["fmi_path"]
return assertion_callable(context)
return assertion_callable()

def _build_auth_request_params(self, response_type, **kwargs):
# response_type is a string defined in
# https://tools.ietf.org/html/rfc6749#section-3.1.1
Expand Down Expand Up @@ -198,11 +239,11 @@ def _obtain_token( # The verb "obtain" is influenced by OAUTH2 RFC 6749
# See https://tools.ietf.org/html/rfc7521#section-4.2
encoder = self.client_assertion_encoders.get(
self.default_body["client_assertion_type"], lambda a: a)
_data["client_assertion"] = encoder(
self.client_assertion() # Do lazy on-the-fly computation
if callable(self.client_assertion) else self.client_assertion
) # The type is bytes, which is preferable. See also:
# https://github.com/psf/requests/issues/4503#issuecomment-455001070
if callable(self.client_assertion):
raw = self._invoke_assertion_callable(self.client_assertion, data)
else:
raw = self.client_assertion
_data["client_assertion"] = encoder(raw)

_data.update(self.default_body) # It may contain authen parameters
_data.update(data or {}) # So the content in data param prevails
Expand Down Expand Up @@ -770,6 +811,34 @@ class initialization.
data.update(scope=scope)
return self._obtain_token("client_credentials", data=data, **kwargs)

def obtain_token_by_user_fic(
self, scope, assertion, username=None, user_object_id=None,
**kwargs):
"""Obtain token using the ``user_fic`` grant type.

This exchanges a federated identity credential (e.g. an agent
instance token) for a user-scoped access token.

:param scope: Scopes for the target resource (already decorated
with OIDC scopes by the caller).
:param str assertion: The federated identity credential token.
:param str username: The target user's UPN (mutually exclusive
with *user_object_id*).
:param str user_object_id: The target user's Object ID (mutually
exclusive with *username*).
"""
data = kwargs.pop("data", {})
data.update(
scope=scope,
user_federated_identity_credential=assertion,
client_info="1",
Comment on lines +831 to +834
)
if user_object_id:
data["user_id"] = str(user_object_id)
elif username:
data["username"] = username
return self._obtain_token("user_fic", data=data, **kwargs)

def __init__(self,
server_configuration, client_id,
on_obtaining_tokens=lambda event: None, # event is defined in _obtain_token(...)
Expand Down
6 changes: 6 additions & 0 deletions msal/token_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
"token_type",
"req_cnf",
"key_id",
# user_fic grant parameters — these are standard body params for the
# user_fic flow; FIC tokens use normal user cache keys (not extended).
"user_federated_identity_credential",
"user_id",
"client_info",
Comment on lines +70 to +72
})


Expand Down Expand Up @@ -301,6 +306,7 @@ def make_clean_copy(dictionary, sensitive_fields): # Masks sensitive info
event,
data=make_clean_copy(event.get("data", {}), (
"password", "client_secret", "refresh_token", "assertion",
"user_federated_identity_credential",
)),
response=make_clean_copy(event.get("response", {}), (
"id_token_claims", # Provided by broker
Expand Down
Loading
Loading