Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion packages/api/src/microsoft/teams/api/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
"""

from .caller import CallerIds, CallerType
from .credentials import ClientCredentials, Credentials, ManagedIdentityCredentials, TokenCredentials
from .credentials import (
ClientCredentials,
Credentials,
FederatedIdentityCredentials,
ManagedIdentityCredentials,
TokenCredentials,
)
from .json_web_token import JsonWebToken, JsonWebTokenPayload
from .token import TokenProtocol

Expand All @@ -13,6 +19,7 @@
"CallerType",
"ClientCredentials",
"Credentials",
"FederatedIdentityCredentials",
"ManagedIdentityCredentials",
"TokenCredentials",
"TokenProtocol",
Expand Down
26 changes: 24 additions & 2 deletions packages/api/src/microsoft/teams/api/auth/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Licensed under the MIT License.
"""

from typing import Awaitable, Callable, Optional, Union
from typing import Awaitable, Callable, Literal, Optional, Union

from ..models import CustomBaseModel

Expand Down Expand Up @@ -56,5 +56,27 @@ class ManagedIdentityCredentials(CustomBaseModel):
"""


class FederatedIdentityCredentials(CustomBaseModel):
"""Credentials for authentication using Federated Identity Credentials with Managed Identity."""

client_id: str
"""
The client ID of the app registration.
"""
managed_identity_type: Literal["system", "user"]
"""
The type of managed identity: 'system' for system-assigned or 'user' for user-assigned.
"""
managed_identity_client_id: Optional[str] = None
"""
The client ID of the user-assigned managed identity.
Required when managed_identity_type is 'user'.
"""
tenant_id: Optional[str] = None
"""
The tenant ID.
"""


# Union type for credentials
Credentials = Union[ClientCredentials, TokenCredentials, ManagedIdentityCredentials]
Credentials = Union[ClientCredentials, TokenCredentials, ManagedIdentityCredentials, FederatedIdentityCredentials]
26 changes: 17 additions & 9 deletions packages/apps/src/microsoft/teams/apps/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ConversationAccount,
ConversationReference,
Credentials,
FederatedIdentityCredentials,
ManagedIdentityCredentials,
MessageActivityInput,
TokenCredentials,
Expand Down Expand Up @@ -298,26 +299,33 @@ def _init_credentials(self) -> Optional[Credentials]:
else:
self.log.debug(f"Using TENANT_ID: {tenant_id} (assuming single-tenant app)")

# - If client_id + client_secret : use ClientCredentials (standard client auth)
if client_id and client_secret:
self.log.debug("Using client secret for auth")
return ClientCredentials(client_id=client_id, client_secret=client_secret, tenant_id=tenant_id)

# - If client_id + token callable : use TokenCredentials (where token is a custom token provider)
if client_id and token:
return TokenCredentials(client_id=client_id, tenant_id=tenant_id, token=token)

# - If client_id but no client_secret : use ManagedIdentityCredentials (inferred)
if client_id:
# Validate that if managed_identity_client_id is provided, it must equal client_id
if managed_identity_client_id == "system":
self.log.debug("Using Federated Identity Credentials with system-assigned managed identity")
return FederatedIdentityCredentials(
client_id=client_id,
managed_identity_type="system",
managed_identity_client_id=None,
tenant_id=tenant_id,
)

if managed_identity_client_id and managed_identity_client_id != client_id:
raise ValueError(
"Federated Identity Credentials is not yet supported. "
"managed_identity_client_id must equal client_id."
self.log.debug("Using Federated Identity Credentials with user-assigned managed identity")
return FederatedIdentityCredentials(
client_id=client_id,
managed_identity_type="user",
managed_identity_client_id=managed_identity_client_id,
tenant_id=tenant_id,
)

self.log.debug("Using user-assigned managed identity for auth")
# Use managed_identity_client_id if provided, otherwise fall back to client_id
self.log.debug("Using user-assigned managed identity (direct)")
mi_client_id = managed_identity_client_id or client_id
return ManagedIdentityCredentials(
client_id=mi_client_id,
Expand Down
11 changes: 9 additions & 2 deletions packages/apps/src/microsoft/teams/apps/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class AppOptions(TypedDict, total=False):
managed_identity_client_id: Optional[str]
"""
The managed identity client ID for user-assigned managed identity.
Defaults to client_id if not provided.
Set to "system" for system-assigned managed identity (triggers Federated Identity Credentials).
If set to a different client ID than client_id, triggers Federated Identity Credentials with user-assigned MI.
If not set or equals client_id, uses direct managed identity (no federation).
"""

# Infrastructure
Expand Down Expand Up @@ -62,7 +64,12 @@ class InternalAppOptions:
token: Optional[Callable[[Union[str, list[str]], Optional[str]], Union[str, Awaitable[str]]]] = None
"""Custom token provider function. If provided with client_id (no client_secret), uses TokenCredentials."""
managed_identity_client_id: Optional[str] = None
"""The managed identity client ID for user-assigned managed identity. Defaults to client_id if not provided."""
"""
The managed identity client ID for user-assigned managed identity.
Set to "system" for system-assigned managed identity (triggers Federated Identity Credentials).
If set to a different client ID than client_id, triggers Federated Identity Credentials with user-assigned MI.
If not set or equals client_id, uses direct managed identity (no federation).
"""
logger: Optional[Logger] = None
storage: Optional[Storage[str, Any]] = None

Expand Down
219 changes: 158 additions & 61 deletions packages/apps/src/microsoft/teams/apps/token_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
JsonWebToken,
TokenProtocol,
)
from microsoft.teams.api.auth.credentials import ManagedIdentityCredentials, TokenCredentials
from microsoft.teams.api.auth.credentials import (
FederatedIdentityCredentials,
ManagedIdentityCredentials,
TokenCredentials,
)
from microsoft.teams.common import ConsoleLogger
from msal import (
ConfidentialClientApplication,
ManagedIdentityClient,
SystemAssignedManagedIdentity,
UserAssignedManagedIdentity,
)

Expand Down Expand Up @@ -77,74 +82,166 @@ async def _get_token(
if caller_name:
self._logger.debug(f"No credentials provided for {caller_name}")
return None
if isinstance(credentials, (ClientCredentials, ManagedIdentityCredentials)):
msal_client = self._get_msal_client(tenant_id)

# Handle different acquire_token_for_client signatures
if isinstance(msal_client, ManagedIdentityClient):
# ManagedIdentityClient expects resource as a keyword-only string parameter
scope = scope.removesuffix("/.default")
token_res: dict[str, Any] | None = await asyncio.to_thread(
lambda: msal_client.acquire_token_for_client(resource=scope)
)
else:
# ConfidentialClientApplication expects scopes as a list
token_res: dict[str, Any] | None = await asyncio.to_thread(
lambda: msal_client.acquire_token_for_client([scope])
)

if token_res.get("access_token", None):
access_token = token_res["access_token"]
return JsonWebToken(access_token)
else:
self._logger.debug(f"TokenRes: {token_res}")
error = token_res.get("error", "Error retrieving token")
if not isinstance(error, BaseException):
error = ValueError(error)
error_description = token_res.get("error_description", "Error retrieving token from MSAL")
self._logger.error(error_description)
raise error
if isinstance(credentials, ClientCredentials):
return await self._get_token_with_client_credentials(credentials, scope, tenant_id)
elif isinstance(credentials, ManagedIdentityCredentials):
return await self._get_token_with_managed_identity(credentials, scope)
elif isinstance(credentials, FederatedIdentityCredentials):
return await self._get_token_with_federated_identity(credentials, scope, tenant_id)
elif isinstance(credentials, TokenCredentials):
token = credentials.token(scope, tenant_id)
if isawaitable(token):
access_token = await token
else:
access_token = token
return await self._get_token_with_token_provider(credentials, scope, tenant_id)

return None

async def _get_token_with_client_credentials(
self,
credentials: ClientCredentials,
scope: str,
tenant_id: str,
) -> TokenProtocol:
"""Get token using ClientCredentials (client secret)."""
confidential_client = self._get_confidential_client(credentials, tenant_id)

# ConfidentialClientApplication expects scopes as a list
token_res: dict[str, Any] = await asyncio.to_thread(
lambda: confidential_client.acquire_token_for_client([scope])
)

return self._handle_token_response(token_res)

async def _get_token_with_managed_identity(
self,
credentials: ManagedIdentityCredentials,
scope: str,
) -> TokenProtocol:
"""Get token using ManagedIdentityCredentials (direct, no federation)."""
mi_client = self._get_managed_identity_client(credentials)

# ManagedIdentityClient expects resource as a keyword-only string parameter
resource = scope.removesuffix("/.default")
token_res: dict[str, Any] = await asyncio.to_thread(
lambda: mi_client.acquire_token_for_client(resource=resource)
)

return self._handle_token_response(token_res)

async def _get_token_with_federated_identity(
self,
credentials: FederatedIdentityCredentials,
scope: str,
tenant_id: str,
) -> TokenProtocol:
"""Get token using Federated Identity Credentials (two-step flow)."""

# Step 1: Get MI token from api://AzureADTokenExchange
mi_token = await self._acquire_managed_identity_token(credentials)

# Step 2: Use MI token as client_assertion to get final access token
confidential_client = ConfidentialClientApplication(
credentials.client_id,
client_credential={"client_assertion": mi_token},
authority=DEFAULT_TOKEN_AUTHORITY.format(tenant_id=tenant_id),
)

token_res: dict[str, Any] = await asyncio.to_thread(
lambda: confidential_client.acquire_token_for_client([scope])
)

return self._handle_token_response(token_res, error_prefix="FIC Step 2 failed")

async def _acquire_managed_identity_token(self, credentials: FederatedIdentityCredentials) -> str:
"""Acquire managed identity token for federated identity credentials."""
# Use shared method to get or create the managed identity client
mi_client = self._get_managed_identity_client(credentials)

mi_token_res: dict[str, Any] = await asyncio.to_thread(
lambda: mi_client.acquire_token_for_client(resource="api://AzureADTokenExchange")
)

if not mi_token_res.get("access_token"):
self._logger.error("FIC Step 1 failed: Could not acquire MI token")
error = mi_token_res.get("error", ValueError("Error retrieving MI token"))
if not isinstance(error, BaseException):
error = ValueError(error)
raise error

return mi_token_res["access_token"]

async def _get_token_with_token_provider(
self,
credentials: TokenCredentials,
scope: str,
tenant_id: str,
) -> TokenProtocol:
"""Get token using custom token provider function."""
token = credentials.token(scope, tenant_id)

if isawaitable(token):
access_token = await token
else:
access_token = token

return JsonWebToken(access_token)

def _handle_token_response(self, token_res: dict[str, Any], error_prefix: str = "") -> TokenProtocol:
"""Handle token response from MSAL client."""
if token_res.get("access_token", None):
access_token = token_res["access_token"]
return JsonWebToken(access_token)
else:
error_msg = f"{error_prefix}: " if error_prefix else ""
self._logger.error(f"{error_msg}Could not acquire access token")
self._logger.debug(f"TokenRes: {token_res}")

error = token_res.get("error", "Error retrieving token")
if not isinstance(error, BaseException):
error = ValueError(error)

error_description = token_res.get("error_description", "Error retrieving token from MSAL")
self._logger.error(error_description)
raise error

def _get_confidential_client(self, credentials: ClientCredentials, tenant_id: str) -> ConfidentialClientApplication:
"""Get or create ConfidentialClientApplication for ClientCredentials."""
# Check if client already exists in cache
cached_client = self._confidential_clients_by_tenant.get(tenant_id)
if cached_client:
return cached_client

client: ConfidentialClientApplication = ConfidentialClientApplication(
credentials.client_id,
client_credential=credentials.client_secret,
authority=f"https://login.microsoftonline.com/{tenant_id}",
)
self._confidential_clients_by_tenant[tenant_id] = client
return client

def _get_msal_client(self, tenant_id: str) -> ConfidentialClientApplication | ManagedIdentityClient:
credentials = self._credentials
def _get_managed_identity_client(
self, credentials: ManagedIdentityCredentials | FederatedIdentityCredentials
) -> ManagedIdentityClient:
"""Get or create ManagedIdentityClient for ManagedIdentityCredentials or FederatedIdentityCredentials."""
# Check if client already exists in cache

# Create the appropriate client based on credential type
if isinstance(credentials, ClientCredentials):
# Check if client already exists in cache for this tenant
cached_client = self._confidential_clients_by_tenant.get(tenant_id)
if cached_client:
return cached_client

client: ConfidentialClientApplication = ConfidentialClientApplication(
credentials.client_id,
client_credential=credentials.client_secret,
authority=f"https://login.microsoftonline.com/{tenant_id}",
)
self._confidential_clients_by_tenant[tenant_id] = client
return client
elif isinstance(credentials, ManagedIdentityCredentials):
# ManagedIdentityClient is tenant-agnostic, cache single instance
if self._managed_identity_client:
return self._managed_identity_client
# ManagedIdentityClient is tenant-agnostic, cache single instance
if self._managed_identity_client:
return self._managed_identity_client

# Create user-assigned managed identity
# Determine managed identity type
if isinstance(credentials, FederatedIdentityCredentials):
if credentials.managed_identity_type == "system":
managed_identity = SystemAssignedManagedIdentity()
else: # "user"
mi_client_id = credentials.managed_identity_client_id or credentials.client_id
managed_identity = UserAssignedManagedIdentity(client_id=mi_client_id)
else: # ManagedIdentityCredentials
# ManagedIdentityCredentials only supports user-assigned
managed_identity = UserAssignedManagedIdentity(client_id=credentials.client_id)

self._managed_identity_client = ManagedIdentityClient(
managed_identity,
http_client=requests.Session(),
)
return self._managed_identity_client
else:
raise ValueError(f"Unsupported credential type: {type(credentials)}")
self._managed_identity_client = ManagedIdentityClient(
managed_identity,
http_client=requests.Session(),
)
return self._managed_identity_client

def _resolve_tenant_id(self, tenant_id: str | None, default_tenant_id: str):
return tenant_id or (self._credentials.tenant_id if self._credentials else False) or default_tenant_id
Loading