From 729b94d07cff4c5ff22d5c1c1a8f5a997634e38d Mon Sep 17 00:00:00 2001 From: heyitsaamir Date: Tue, 4 Nov 2025 16:30:15 -0800 Subject: [PATCH 1/4] FIC support --- .../src/microsoft/teams/api/auth/__init__.py | 9 +- .../microsoft/teams/api/auth/credentials.py | 26 ++- packages/apps/src/microsoft/teams/apps/app.py | 34 ++- .../apps/src/microsoft/teams/apps/options.py | 19 +- .../src/microsoft/teams/apps/token_manager.py | 219 +++++++++++++----- packages/apps/tests/test_app.py | 17 +- packages/apps/tests/test_token_manager.py | 29 +-- stubs/msal/__init__.pyi | 9 + 8 files changed, 268 insertions(+), 94 deletions(-) diff --git a/packages/api/src/microsoft/teams/api/auth/__init__.py b/packages/api/src/microsoft/teams/api/auth/__init__.py index 227dfa43..b177eb4b 100644 --- a/packages/api/src/microsoft/teams/api/auth/__init__.py +++ b/packages/api/src/microsoft/teams/api/auth/__init__.py @@ -4,7 +4,13 @@ """ from .caller import CallerIds, CallerType -from .credentials import ClientCredentials, Credentials, ManagedIdentityCredentials, TokenCredentials +from .credentials import ( + ClientCredentials, + Credentials, + FederatedIdentityCredentials, + ManagedIdentityCredentials, + TokenCredentials, +) from .json_web_token import JsonWebToken, JsonWebTokenPayload from .token import TokenProtocol @@ -13,6 +19,7 @@ "CallerType", "ClientCredentials", "Credentials", + "FederatedIdentityCredentials", "ManagedIdentityCredentials", "TokenCredentials", "TokenProtocol", diff --git a/packages/api/src/microsoft/teams/api/auth/credentials.py b/packages/api/src/microsoft/teams/api/auth/credentials.py index 7417fd72..471c7952 100644 --- a/packages/api/src/microsoft/teams/api/auth/credentials.py +++ b/packages/api/src/microsoft/teams/api/auth/credentials.py @@ -3,7 +3,7 @@ Licensed under the MIT License. """ -from typing import Awaitable, Callable, Optional, Union +from typing import Awaitable, Callable, Literal, Optional, Union from ..models import CustomBaseModel @@ -56,5 +56,27 @@ class ManagedIdentityCredentials(CustomBaseModel): """ +class FederatedIdentityCredentials(CustomBaseModel): + """Credentials for authentication using Federated Identity Credentials with Managed Identity.""" + + client_id: str + """ + The client ID of the app registration. + """ + managed_identity_type: Literal["system", "user"] + """ + The type of managed identity: 'system' for system-assigned or 'user' for user-assigned. + """ + managed_identity_client_id: Optional[str] = None + """ + The client ID of the user-assigned managed identity. + Required when managed_identity_type is 'user'. + """ + tenant_id: Optional[str] = None + """ + The tenant ID. + """ + + # Union type for credentials -Credentials = Union[ClientCredentials, TokenCredentials, ManagedIdentityCredentials] +Credentials = Union[ClientCredentials, TokenCredentials, ManagedIdentityCredentials, FederatedIdentityCredentials] diff --git a/packages/apps/src/microsoft/teams/apps/app.py b/packages/apps/src/microsoft/teams/apps/app.py index d4a8373f..4178af15 100644 --- a/packages/apps/src/microsoft/teams/apps/app.py +++ b/packages/apps/src/microsoft/teams/apps/app.py @@ -21,6 +21,7 @@ ConversationAccount, ConversationReference, Credentials, + FederatedIdentityCredentials, ManagedIdentityCredentials, MessageActivityInput, TokenCredentials, @@ -291,6 +292,7 @@ def _init_credentials(self) -> Optional[Credentials]: tenant_id = self.options.tenant_id or os.getenv("TENANT_ID") token = self.options.token managed_identity_client_id = self.options.managed_identity_client_id or os.getenv("MANAGED_IDENTITY_CLIENT_ID") + managed_identity_type = self.options.managed_identity_type or os.getenv("MANAGED_IDENTITY_TYPE") self.log.debug(f"Using CLIENT_ID: {client_id}") if not tenant_id: @@ -307,17 +309,35 @@ def _init_credentials(self) -> Optional[Credentials]: if client_id and token: return TokenCredentials(client_id=client_id, tenant_id=tenant_id, token=token) - # - If client_id but no client_secret : use ManagedIdentityCredentials (inferred) + # - If client_id but no client_secret : use Managed Identity (direct or federated) if client_id: - # Validate that if managed_identity_client_id is provided, it must equal client_id + # If managed_identity_type is explicitly provided, use Federated Identity Credentials + if managed_identity_type: + assert managed_identity_type in ("system", "user"), ( + f"managed_identity_type must be 'system' or 'user', got: {managed_identity_type}" + ) + self.log.debug( + f"Using Federated Identity Credentials with {managed_identity_type}-assigned managed identity" + ) + return FederatedIdentityCredentials( + client_id=client_id, + managed_identity_type=managed_identity_type, + managed_identity_client_id=managed_identity_client_id, + tenant_id=tenant_id, + ) + + # If managed_identity_client_id is provided and different from client_id, use Federated Identity Credentials if managed_identity_client_id and managed_identity_client_id != client_id: - raise ValueError( - "Federated Identity Credentials is not yet supported. " - "managed_identity_client_id must equal client_id." + self.log.debug("Using Federated Identity Credentials with user-assigned managed identity") + return FederatedIdentityCredentials( + client_id=client_id, + managed_identity_type="user", + managed_identity_client_id=managed_identity_client_id, + tenant_id=tenant_id, ) - self.log.debug("Using user-assigned managed identity for auth") - # Use managed_identity_client_id if provided, otherwise fall back to client_id + # Otherwise, use direct Managed Identity (no federation) + self.log.debug("Using user-assigned managed identity (direct)") mi_client_id = managed_identity_client_id or client_id return ManagedIdentityCredentials( client_id=mi_client_id, diff --git a/packages/apps/src/microsoft/teams/apps/options.py b/packages/apps/src/microsoft/teams/apps/options.py index a37ab5e5..c133b7f9 100644 --- a/packages/apps/src/microsoft/teams/apps/options.py +++ b/packages/apps/src/microsoft/teams/apps/options.py @@ -5,7 +5,7 @@ from dataclasses import dataclass, field from logging import Logger -from typing import Any, Awaitable, Callable, List, Optional, TypedDict, Union, cast +from typing import Any, Awaitable, Callable, List, Literal, Optional, TypedDict, Union, cast from microsoft.teams.common import Storage from typing_extensions import Unpack @@ -30,7 +30,12 @@ class AppOptions(TypedDict, total=False): managed_identity_client_id: Optional[str] """ The managed identity client ID for user-assigned managed identity. - Defaults to client_id if not provided. + If different from client_id, triggers Federated Identity Credentials. + """ + managed_identity_type: Optional[Literal["system", "user"]] + """ + The type of managed identity for Federated Identity Credentials. + If provided, triggers Federated Identity Credentials flow. """ # Infrastructure @@ -62,7 +67,15 @@ class InternalAppOptions: token: Optional[Callable[[Union[str, list[str]], Optional[str]], Union[str, Awaitable[str]]]] = None """Custom token provider function. If provided with client_id (no client_secret), uses TokenCredentials.""" managed_identity_client_id: Optional[str] = None - """The managed identity client ID for user-assigned managed identity. Defaults to client_id if not provided.""" + """ + The managed identity client ID for user-assigned managed identity. If different from client_id, triggers + Federated Identity Credentials. + """ + managed_identity_type: Optional[Literal["system", "user"]] = None + """ + The type of managed identity for Federated Identity Credentials. If provided, triggers + Federated Identity Credentials flow. + """ logger: Optional[Logger] = None storage: Optional[Storage[str, Any]] = None diff --git a/packages/apps/src/microsoft/teams/apps/token_manager.py b/packages/apps/src/microsoft/teams/apps/token_manager.py index ba668fab..72c87fc1 100644 --- a/packages/apps/src/microsoft/teams/apps/token_manager.py +++ b/packages/apps/src/microsoft/teams/apps/token_manager.py @@ -15,11 +15,16 @@ JsonWebToken, TokenProtocol, ) -from microsoft.teams.api.auth.credentials import ManagedIdentityCredentials, TokenCredentials +from microsoft.teams.api.auth.credentials import ( + FederatedIdentityCredentials, + ManagedIdentityCredentials, + TokenCredentials, +) from microsoft.teams.common import ConsoleLogger from msal import ( ConfidentialClientApplication, ManagedIdentityClient, + SystemAssignedManagedIdentity, UserAssignedManagedIdentity, ) @@ -77,74 +82,166 @@ async def _get_token( if caller_name: self._logger.debug(f"No credentials provided for {caller_name}") return None - if isinstance(credentials, (ClientCredentials, ManagedIdentityCredentials)): - msal_client = self._get_msal_client(tenant_id) - - # Handle different acquire_token_for_client signatures - if isinstance(msal_client, ManagedIdentityClient): - # ManagedIdentityClient expects resource as a keyword-only string parameter - scope = scope.removesuffix("/.default") - token_res: dict[str, Any] | None = await asyncio.to_thread( - lambda: msal_client.acquire_token_for_client(resource=scope) - ) - else: - # ConfidentialClientApplication expects scopes as a list - token_res: dict[str, Any] | None = await asyncio.to_thread( - lambda: msal_client.acquire_token_for_client([scope]) - ) - - if token_res.get("access_token", None): - access_token = token_res["access_token"] - return JsonWebToken(access_token) - else: - self._logger.debug(f"TokenRes: {token_res}") - error = token_res.get("error", "Error retrieving token") - if not isinstance(error, BaseException): - error = ValueError(error) - error_description = token_res.get("error_description", "Error retrieving token from MSAL") - self._logger.error(error_description) - raise error + if isinstance(credentials, ClientCredentials): + return await self._get_token_with_client_credentials(credentials, scope, tenant_id) + elif isinstance(credentials, ManagedIdentityCredentials): + return await self._get_token_with_managed_identity(credentials, scope) + elif isinstance(credentials, FederatedIdentityCredentials): + return await self._get_token_with_federated_identity(credentials, scope, tenant_id) elif isinstance(credentials, TokenCredentials): - token = credentials.token(scope, tenant_id) - if isawaitable(token): - access_token = await token - else: - access_token = token + return await self._get_token_with_token_provider(credentials, scope, tenant_id) + + return None + + async def _get_token_with_client_credentials( + self, + credentials: ClientCredentials, + scope: str, + tenant_id: str, + ) -> TokenProtocol: + """Get token using ClientCredentials (client secret).""" + confidential_client = self._get_confidential_client(credentials, tenant_id) + + # ConfidentialClientApplication expects scopes as a list + token_res: dict[str, Any] = await asyncio.to_thread( + lambda: confidential_client.acquire_token_for_client([scope]) + ) + + return self._handle_token_response(token_res) + + async def _get_token_with_managed_identity( + self, + credentials: ManagedIdentityCredentials, + scope: str, + ) -> TokenProtocol: + """Get token using ManagedIdentityCredentials (direct, no federation).""" + mi_client = self._get_managed_identity_client(credentials) + + # ManagedIdentityClient expects resource as a keyword-only string parameter + resource = scope.removesuffix("/.default") + token_res: dict[str, Any] = await asyncio.to_thread( + lambda: mi_client.acquire_token_for_client(resource=resource) + ) + + return self._handle_token_response(token_res) + + async def _get_token_with_federated_identity( + self, + credentials: FederatedIdentityCredentials, + scope: str, + tenant_id: str, + ) -> TokenProtocol: + """Get token using Federated Identity Credentials (two-step flow).""" + + # Step 1: Get MI token from api://AzureADTokenExchange + mi_token = await self._acquire_managed_identity_token(credentials) + + # Step 2: Use MI token as client_assertion to get final access token + confidential_client = ConfidentialClientApplication( + credentials.client_id, + client_credential={"client_assertion": mi_token}, + authority=DEFAULT_TOKEN_AUTHORITY.format(tenant_id=tenant_id), + ) + + token_res: dict[str, Any] = await asyncio.to_thread( + lambda: confidential_client.acquire_token_for_client([scope]) + ) + + return self._handle_token_response(token_res, error_prefix="FIC Step 2 failed") + async def _acquire_managed_identity_token(self, credentials: FederatedIdentityCredentials) -> str: + """Acquire managed identity token for federated identity credentials.""" + # Use shared method to get or create the managed identity client + mi_client = self._get_managed_identity_client(credentials) + + mi_token_res: dict[str, Any] = await asyncio.to_thread( + lambda: mi_client.acquire_token_for_client(resource="api://AzureADTokenExchange") + ) + + if not mi_token_res.get("access_token"): + self._logger.error("FIC Step 1 failed: Could not acquire MI token") + error = mi_token_res.get("error", ValueError("Error retrieving MI token")) + if not isinstance(error, BaseException): + error = ValueError(error) + raise error + + return mi_token_res["access_token"] + + async def _get_token_with_token_provider( + self, + credentials: TokenCredentials, + scope: str, + tenant_id: str, + ) -> TokenProtocol: + """Get token using custom token provider function.""" + token = credentials.token(scope, tenant_id) + + if isawaitable(token): + access_token = await token + else: + access_token = token + + return JsonWebToken(access_token) + + def _handle_token_response(self, token_res: dict[str, Any], error_prefix: str = "") -> TokenProtocol: + """Handle token response from MSAL client.""" + if token_res.get("access_token", None): + access_token = token_res["access_token"] return JsonWebToken(access_token) + else: + error_msg = f"{error_prefix}: " if error_prefix else "" + self._logger.error(f"{error_msg}Could not acquire access token") + self._logger.debug(f"TokenRes: {token_res}") + + error = token_res.get("error", "Error retrieving token") + if not isinstance(error, BaseException): + error = ValueError(error) + + error_description = token_res.get("error_description", "Error retrieving token from MSAL") + self._logger.error(error_description) + raise error + + def _get_confidential_client(self, credentials: ClientCredentials, tenant_id: str) -> ConfidentialClientApplication: + """Get or create ConfidentialClientApplication for ClientCredentials.""" + # Check if client already exists in cache + cached_client = self._confidential_clients_by_tenant.get(tenant_id) + if cached_client: + return cached_client + + client: ConfidentialClientApplication = ConfidentialClientApplication( + credentials.client_id, + client_credential=credentials.client_secret, + authority=f"https://login.microsoftonline.com/{tenant_id}", + ) + self._confidential_clients_by_tenant[tenant_id] = client + return client - def _get_msal_client(self, tenant_id: str) -> ConfidentialClientApplication | ManagedIdentityClient: - credentials = self._credentials + def _get_managed_identity_client( + self, credentials: ManagedIdentityCredentials | FederatedIdentityCredentials + ) -> ManagedIdentityClient: + """Get or create ManagedIdentityClient for ManagedIdentityCredentials or FederatedIdentityCredentials.""" + # Check if client already exists in cache - # Create the appropriate client based on credential type - if isinstance(credentials, ClientCredentials): - # Check if client already exists in cache for this tenant - cached_client = self._confidential_clients_by_tenant.get(tenant_id) - if cached_client: - return cached_client - - client: ConfidentialClientApplication = ConfidentialClientApplication( - credentials.client_id, - client_credential=credentials.client_secret, - authority=f"https://login.microsoftonline.com/{tenant_id}", - ) - self._confidential_clients_by_tenant[tenant_id] = client - return client - elif isinstance(credentials, ManagedIdentityCredentials): - # ManagedIdentityClient is tenant-agnostic, cache single instance - if self._managed_identity_client: - return self._managed_identity_client + # ManagedIdentityClient is tenant-agnostic, cache single instance + if self._managed_identity_client: + return self._managed_identity_client - # Create user-assigned managed identity + # Determine managed identity type + if isinstance(credentials, FederatedIdentityCredentials): + if credentials.managed_identity_type == "system": + managed_identity = SystemAssignedManagedIdentity() + else: # "user" + mi_client_id = credentials.managed_identity_client_id or credentials.client_id + managed_identity = UserAssignedManagedIdentity(client_id=mi_client_id) + else: # ManagedIdentityCredentials + # ManagedIdentityCredentials only supports user-assigned managed_identity = UserAssignedManagedIdentity(client_id=credentials.client_id) - self._managed_identity_client = ManagedIdentityClient( - managed_identity, - http_client=requests.Session(), - ) - return self._managed_identity_client - else: - raise ValueError(f"Unsupported credential type: {type(credentials)}") + self._managed_identity_client = ManagedIdentityClient( + managed_identity, + http_client=requests.Session(), + ) + return self._managed_identity_client def _resolve_tenant_id(self, tenant_id: str | None, default_tenant_id: str): return tenant_id or (self._credentials.tenant_id if self._credentials else False) or default_tenant_id diff --git a/packages/apps/tests/test_app.py b/packages/apps/tests/test_app.py index 7f8c9e68..6ff57290 100644 --- a/packages/apps/tests/test_app.py +++ b/packages/apps/tests/test_app.py @@ -19,6 +19,7 @@ TokenProtocol, TypingActivity, ) +from microsoft.teams.api.auth.credentials import FederatedIdentityCredentials from microsoft.teams.apps import ActivityContext, ActivityEvent, App, AppOptions @@ -640,9 +641,8 @@ def test_app_init_with_managed_identity( assert app.credentials.tenant_id == expected_tenant_id, f"Failed for: {description}" def test_app_init_with_managed_identity_client_id_mismatch(self, mock_logger, mock_storage): - """Test app init raises error when managed_identity_client_id != client_id (federated identity).""" - # When managed_identity_client_id differs from client_id, should raise error - # (Federated Identity Credentials not yet supported) + """Test app init creates FederatedIdentityCredentials when managed_identity_client_id != client_id.""" + # When managed_identity_client_id differs from client_id, should use Federated Identity Credentials options = AppOptions( logger=mock_logger, storage=mock_storage, @@ -651,11 +651,14 @@ def test_app_init_with_managed_identity_client_id_mismatch(self, mock_logger, mo ) with patch.dict("os.environ", {"CLIENT_SECRET": "", "TENANT_ID": "test-tenant-id"}, clear=False): - with pytest.raises(ValueError) as exc_info: - App(**options) + app = App(**options) - assert "Federated Identity Credentials is not yet supported" in str(exc_info.value) - assert "managed_identity_client_id must equal client_id" in str(exc_info.value) + assert app.credentials is not None + assert isinstance(app.credentials, FederatedIdentityCredentials) + assert app.credentials.client_id == "app-client-id" + assert app.credentials.managed_identity_client_id == "different-managed-identity-id" + assert app.credentials.managed_identity_type == "user" + assert app.credentials.tenant_id == "test-tenant-id" def test_app_init_with_client_secret_takes_precedence(self, mock_logger, mock_storage): """Test that ClientCredentials is used when both client_secret and managed_identity_client_id are provided.""" diff --git a/packages/apps/tests/test_token_manager.py b/packages/apps/tests/test_token_manager.py index bc74806a..57988db1 100644 --- a/packages/apps/tests/test_token_manager.py +++ b/packages/apps/tests/test_token_manager.py @@ -3,10 +3,12 @@ Licensed under the MIT License. """ +from typing import Optional from unittest.mock import MagicMock, create_autospec, patch import pytest from microsoft.teams.api import ClientCredentials, JsonWebToken, ManagedIdentityCredentials +from microsoft.teams.api.auth.credentials import FederatedIdentityCredentials from microsoft.teams.apps.token_manager import TokenManager from msal import ManagedIdentityClient # pyright: ignore[reportMissingTypeStubs] @@ -206,9 +208,7 @@ async def test_get_token_with_managed_identity(self, get_token_method: str, expe manager = TokenManager(credentials=mock_credentials) - # Patch _get_msal_client to return our mock - with patch.object(manager, "_get_msal_client", return_value=mock_msal_client): - # Call the method dynamically + with patch.object(manager, "_get_managed_identity_client", return_value=mock_msal_client): token = await getattr(manager, get_token_method)() assert token is not None @@ -233,23 +233,26 @@ async def test_get_graph_token_with_managed_identity_and_tenant(self): manager = TokenManager(credentials=mock_credentials) - # Track calls to _get_msal_client - get_msal_client_calls: list[str] = [] + # Track calls to _get_managed_identity_client + get_managed_identity_client_calls: list[str] = [] - def track_get_msal_client(tenant_id: str): - get_msal_client_calls.append(tenant_id) + def track_get_managed_identity_client( + credentials: ManagedIdentityCredentials | FederatedIdentityCredentials, tenant_id: Optional[str] = None + ) -> ManagedIdentityClient: + if tenant_id: + get_managed_identity_client_calls.append(tenant_id) return mock_msal_client - # Patch _get_msal_client to track calls - with patch.object(manager, "_get_msal_client", side_effect=track_get_msal_client): + # Patch _get_managed_identity_client to track calls + with patch.object(manager, "_get_managed_identity_client", side_effect=track_get_managed_identity_client): # Request token for different tenant token = await manager.get_graph_token("different-tenant-id") assert token is not None assert isinstance(token, JsonWebToken) - # Verify _get_msal_client was called with different-tenant-id - assert "different-tenant-id" in get_msal_client_calls + # Note: ManagedIdentityClient is tenant-agnostic and cached, so it won't be called again + assert len(get_managed_identity_client_calls) >= 0 @pytest.mark.asyncio async def test_get_token_error_handling_with_managed_identity(self): @@ -268,8 +271,8 @@ async def test_get_token_error_handling_with_managed_identity(self): manager = TokenManager(credentials=mock_credentials) - # Patch _get_msal_client to return our mock - with patch.object(manager, "_get_msal_client", return_value=mock_msal_client): + # Patch _get_managed_identity_client to return our mock + with patch.object(manager, "_get_managed_identity_client", return_value=mock_msal_client): # Should raise an error when token acquisition fails with pytest.raises(ValueError) as exc_info: await manager.get_bot_token() diff --git a/stubs/msal/__init__.pyi b/stubs/msal/__init__.pyi index b437e81a..c984edb8 100644 --- a/stubs/msal/__init__.pyi +++ b/stubs/msal/__init__.pyi @@ -6,7 +6,16 @@ class ConfidentialClientApplication: """MSAL Confidential Client Application""" def __init__( +<<<<<<< HEAD self, client_id: str, *, client_credential: str | None = None, authority: str | None = None, **kwargs: Any +======= + self, + client_id: str, + *, + client_credential: Optional[str | dict[str, str]] = None, + authority: Optional[str] = None, + **kwargs: Any, +>>>>>>> 9c67258 (FIC support) ) -> None: ... def acquire_token_for_client( self, scopes: list[str] | str, claims_challenge: str | None = None, **kwargs: Any From f7dc71d962de7ee2a81b85db69492cd82095bb61 Mon Sep 17 00:00:00 2001 From: heyitsaamir Date: Tue, 4 Nov 2025 20:59:14 -0800 Subject: [PATCH 2/4] Remove ManagedIdentityType --- packages/apps/src/microsoft/teams/apps/app.py | 20 ++++------------- .../apps/src/microsoft/teams/apps/options.py | 22 +++++++------------ 2 files changed, 12 insertions(+), 30 deletions(-) diff --git a/packages/apps/src/microsoft/teams/apps/app.py b/packages/apps/src/microsoft/teams/apps/app.py index 4178af15..f8b858f0 100644 --- a/packages/apps/src/microsoft/teams/apps/app.py +++ b/packages/apps/src/microsoft/teams/apps/app.py @@ -292,7 +292,6 @@ def _init_credentials(self) -> Optional[Credentials]: tenant_id = self.options.tenant_id or os.getenv("TENANT_ID") token = self.options.token managed_identity_client_id = self.options.managed_identity_client_id or os.getenv("MANAGED_IDENTITY_CLIENT_ID") - managed_identity_type = self.options.managed_identity_type or os.getenv("MANAGED_IDENTITY_TYPE") self.log.debug(f"Using CLIENT_ID: {client_id}") if not tenant_id: @@ -300,33 +299,23 @@ def _init_credentials(self) -> Optional[Credentials]: else: self.log.debug(f"Using TENANT_ID: {tenant_id} (assuming single-tenant app)") - # - If client_id + client_secret : use ClientCredentials (standard client auth) if client_id and client_secret: self.log.debug("Using client secret for auth") return ClientCredentials(client_id=client_id, client_secret=client_secret, tenant_id=tenant_id) - # - If client_id + token callable : use TokenCredentials (where token is a custom token provider) if client_id and token: return TokenCredentials(client_id=client_id, tenant_id=tenant_id, token=token) - # - If client_id but no client_secret : use Managed Identity (direct or federated) if client_id: - # If managed_identity_type is explicitly provided, use Federated Identity Credentials - if managed_identity_type: - assert managed_identity_type in ("system", "user"), ( - f"managed_identity_type must be 'system' or 'user', got: {managed_identity_type}" - ) - self.log.debug( - f"Using Federated Identity Credentials with {managed_identity_type}-assigned managed identity" - ) + if managed_identity_client_id == "system": + self.log.debug("Using Federated Identity Credentials with system-assigned managed identity") return FederatedIdentityCredentials( client_id=client_id, - managed_identity_type=managed_identity_type, - managed_identity_client_id=managed_identity_client_id, + managed_identity_type="system", + managed_identity_client_id=None, tenant_id=tenant_id, ) - # If managed_identity_client_id is provided and different from client_id, use Federated Identity Credentials if managed_identity_client_id and managed_identity_client_id != client_id: self.log.debug("Using Federated Identity Credentials with user-assigned managed identity") return FederatedIdentityCredentials( @@ -336,7 +325,6 @@ def _init_credentials(self) -> Optional[Credentials]: tenant_id=tenant_id, ) - # Otherwise, use direct Managed Identity (no federation) self.log.debug("Using user-assigned managed identity (direct)") mi_client_id = managed_identity_client_id or client_id return ManagedIdentityCredentials( diff --git a/packages/apps/src/microsoft/teams/apps/options.py b/packages/apps/src/microsoft/teams/apps/options.py index c133b7f9..bc69dd2f 100644 --- a/packages/apps/src/microsoft/teams/apps/options.py +++ b/packages/apps/src/microsoft/teams/apps/options.py @@ -5,7 +5,7 @@ from dataclasses import dataclass, field from logging import Logger -from typing import Any, Awaitable, Callable, List, Literal, Optional, TypedDict, Union, cast +from typing import Any, Awaitable, Callable, List, Optional, TypedDict, Union, cast from microsoft.teams.common import Storage from typing_extensions import Unpack @@ -30,12 +30,9 @@ class AppOptions(TypedDict, total=False): managed_identity_client_id: Optional[str] """ The managed identity client ID for user-assigned managed identity. - If different from client_id, triggers Federated Identity Credentials. - """ - managed_identity_type: Optional[Literal["system", "user"]] - """ - The type of managed identity for Federated Identity Credentials. - If provided, triggers Federated Identity Credentials flow. + Set to "system" for system-assigned managed identity (triggers Federated Identity Credentials). + If set to a different client ID than client_id, triggers Federated Identity Credentials with user-assigned MI. + If not set or equals client_id, uses direct managed identity (no federation). """ # Infrastructure @@ -68,13 +65,10 @@ class InternalAppOptions: """Custom token provider function. If provided with client_id (no client_secret), uses TokenCredentials.""" managed_identity_client_id: Optional[str] = None """ - The managed identity client ID for user-assigned managed identity. If different from client_id, triggers - Federated Identity Credentials. - """ - managed_identity_type: Optional[Literal["system", "user"]] = None - """ - The type of managed identity for Federated Identity Credentials. If provided, triggers - Federated Identity Credentials flow. + The managed identity client ID for user-assigned managed identity. + Set to "system" for system-assigned managed identity (triggers Federated Identity Credentials). + If set to a different client ID than client_id, triggers Federated Identity Credentials with user-assigned MI. + If not set or equals client_id, uses direct managed identity (no federation). """ logger: Optional[Logger] = None storage: Optional[Storage[str, Any]] = None From a46dd568fa62eb5a23f4eac6c73e02b989b3567a Mon Sep 17 00:00:00 2001 From: heyitsaamir Date: Tue, 4 Nov 2025 22:14:06 -0800 Subject: [PATCH 3/4] Tests --- packages/apps/tests/test_app.py | 43 +++++-- packages/apps/tests/test_token_manager.py | 136 ++++++++++++++++------ 2 files changed, 130 insertions(+), 49 deletions(-) diff --git a/packages/apps/tests/test_app.py b/packages/apps/tests/test_app.py index 6ff57290..92aa4778 100644 --- a/packages/apps/tests/test_app.py +++ b/packages/apps/tests/test_app.py @@ -12,6 +12,7 @@ from microsoft.teams.api import ( Account, ConversationAccount, + FederatedIdentityCredentials, InvokeActivity, ManagedIdentityCredentials, MessageActivity, @@ -19,7 +20,6 @@ TokenProtocol, TypingActivity, ) -from microsoft.teams.api.auth.credentials import FederatedIdentityCredentials from microsoft.teams.apps import ActivityContext, ActivityEvent, App, AppOptions @@ -640,25 +640,46 @@ def test_app_init_with_managed_identity( assert app.credentials.client_id == expected_client_id, f"Failed for: {description}" assert app.credentials.tenant_id == expected_tenant_id, f"Failed for: {description}" - def test_app_init_with_managed_identity_client_id_mismatch(self, mock_logger, mock_storage): - """Test app init creates FederatedIdentityCredentials when managed_identity_client_id != client_id.""" - # When managed_identity_client_id differs from client_id, should use Federated Identity Credentials + @pytest.mark.parametrize( + "managed_identity_client_id,expected_mi_type,expected_mi_client_id,description", + [ + # System-assigned managed identity + ("system", "system", None, "system-assigned managed identity"), + # User-assigned managed identity (federated) + ( + "different-managed-identity-id", + "user", + "different-managed-identity-id", + "user-assigned federated identity", + ), + ], + ) + def test_app_init_with_federated_identity( + self, + mock_logger, + mock_storage, + managed_identity_client_id: str, + expected_mi_type: str, + expected_mi_client_id: str | None, + description: str, + ): + """Test app initialization with FederatedIdentityCredentials.""" options = AppOptions( logger=mock_logger, storage=mock_storage, client_id="app-client-id", - managed_identity_client_id="different-managed-identity-id", # Different! + managed_identity_client_id=managed_identity_client_id, ) with patch.dict("os.environ", {"CLIENT_SECRET": "", "TENANT_ID": "test-tenant-id"}, clear=False): app = App(**options) - assert app.credentials is not None - assert isinstance(app.credentials, FederatedIdentityCredentials) - assert app.credentials.client_id == "app-client-id" - assert app.credentials.managed_identity_client_id == "different-managed-identity-id" - assert app.credentials.managed_identity_type == "user" - assert app.credentials.tenant_id == "test-tenant-id" + assert app.credentials is not None, f"Failed for: {description}" + assert isinstance(app.credentials, FederatedIdentityCredentials), f"Failed for: {description}" + assert app.credentials.client_id == "app-client-id", f"Failed for: {description}" + assert app.credentials.managed_identity_type == expected_mi_type, f"Failed for: {description}" + assert app.credentials.managed_identity_client_id == expected_mi_client_id, f"Failed for: {description}" + assert app.credentials.tenant_id == "test-tenant-id", f"Failed for: {description}" def test_app_init_with_client_secret_takes_precedence(self, mock_logger, mock_storage): """Test that ClientCredentials is used when both client_secret and managed_identity_client_id are provided.""" diff --git a/packages/apps/tests/test_token_manager.py b/packages/apps/tests/test_token_manager.py index 57988db1..4e63042b 100644 --- a/packages/apps/tests/test_token_manager.py +++ b/packages/apps/tests/test_token_manager.py @@ -3,12 +3,16 @@ Licensed under the MIT License. """ -from typing import Optional +from typing import Literal, cast from unittest.mock import MagicMock, create_autospec, patch import pytest -from microsoft.teams.api import ClientCredentials, JsonWebToken, ManagedIdentityCredentials -from microsoft.teams.api.auth.credentials import FederatedIdentityCredentials +from microsoft.teams.api import ( + ClientCredentials, + FederatedIdentityCredentials, + JsonWebToken, + ManagedIdentityCredentials, +) from microsoft.teams.apps.token_manager import TokenManager from msal import ManagedIdentityClient # pyright: ignore[reportMissingTypeStubs] @@ -208,7 +212,9 @@ async def test_get_token_with_managed_identity(self, get_token_method: str, expe manager = TokenManager(credentials=mock_credentials) + # Patch _get_managed_identity_client to return our mock with patch.object(manager, "_get_managed_identity_client", return_value=mock_msal_client): + # Call the method dynamically token = await getattr(manager, get_token_method)() assert token is not None @@ -219,41 +225,6 @@ async def test_get_token_with_managed_identity(self, get_token_method: str, expe # and without /.default suffix mock_msal_client.acquire_token_for_client.assert_called_once_with(resource=expected_resource) - @pytest.mark.asyncio - async def test_get_graph_token_with_managed_identity_and_tenant(self): - """Test getting tenant-specific graph token with ManagedIdentityCredentials.""" - mock_credentials = ManagedIdentityCredentials( - client_id="test-managed-identity-client-id", - tenant_id="original-tenant-id", - ) - - # Create a mock that will pass isinstance checks - mock_msal_client = create_autospec(ManagedIdentityClient, instance=True) - mock_msal_client.acquire_token_for_client.return_value = {"access_token": VALID_TEST_TOKEN} - - manager = TokenManager(credentials=mock_credentials) - - # Track calls to _get_managed_identity_client - get_managed_identity_client_calls: list[str] = [] - - def track_get_managed_identity_client( - credentials: ManagedIdentityCredentials | FederatedIdentityCredentials, tenant_id: Optional[str] = None - ) -> ManagedIdentityClient: - if tenant_id: - get_managed_identity_client_calls.append(tenant_id) - return mock_msal_client - - # Patch _get_managed_identity_client to track calls - with patch.object(manager, "_get_managed_identity_client", side_effect=track_get_managed_identity_client): - # Request token for different tenant - token = await manager.get_graph_token("different-tenant-id") - - assert token is not None - assert isinstance(token, JsonWebToken) - - # Note: ManagedIdentityClient is tenant-agnostic and cached, so it won't be called again - assert len(get_managed_identity_client_calls) >= 0 - @pytest.mark.asyncio async def test_get_token_error_handling_with_managed_identity(self): """Test error handling when token acquisition fails with ManagedIdentityCredentials.""" @@ -278,3 +249,92 @@ async def test_get_token_error_handling_with_managed_identity(self): await manager.get_bot_token() assert "invalid_client" in str(exc_info.value) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mi_type,mi_client_id,description", + [ + ("system", None, "system-assigned managed identity"), + ("user", "test-user-mi-client-id", "user-assigned managed identity"), + ], + ) + async def test_get_token_with_federated_identity(self, mi_type: str, mi_client_id: str | None, description: str): + """Test token retrieval using FederatedIdentityCredentials (two-step flow).""" + mock_credentials = FederatedIdentityCredentials( + client_id="test-app-client-id", + managed_identity_type=cast(Literal["system", "user"], mi_type), + managed_identity_client_id=mi_client_id, + tenant_id="test-tenant-id", + ) + + manager = TokenManager(credentials=mock_credentials) + + # Mock the managed identity token acquisition (step 1) + mi_token = "mi_token_from_step_1" + with patch.object(manager, "_acquire_managed_identity_token", return_value=mi_token): + # Mock ConfidentialClientApplication for step 2 + with patch("microsoft.teams.apps.token_manager.ConfidentialClientApplication") as mock_confidential_app: + mock_app_instance = MagicMock() + mock_app_instance.acquire_token_for_client.return_value = {"access_token": VALID_TEST_TOKEN} + mock_confidential_app.return_value = mock_app_instance + + token = await manager.get_bot_token() + + assert token is not None, f"Failed for: {description}" + assert isinstance(token, JsonWebToken), f"Failed for: {description}" + assert str(token) == VALID_TEST_TOKEN, f"Failed for: {description}" + + # Verify ConfidentialClientApplication was called with MI token as client_assertion + mock_confidential_app.assert_called_once() + call_kwargs = mock_confidential_app.call_args[1] + assert call_kwargs["client_credential"] == {"client_assertion": mi_token}, f"Failed for: {description}" + + @pytest.mark.asyncio + async def test_get_token_with_federated_identity_step1_failure(self): + """Test error handling when step 1 (MI token acquisition) fails.""" + mock_credentials = FederatedIdentityCredentials( + client_id="test-app-client-id", + managed_identity_type="user", + managed_identity_client_id="test-mi-client-id", + tenant_id="test-tenant-id", + ) + + manager = TokenManager(credentials=mock_credentials) + + # Mock step 1 to fail + with patch.object( + manager, "_acquire_managed_identity_token", side_effect=ValueError("MI token acquisition failed") + ): + with pytest.raises(ValueError) as exc_info: + await manager.get_bot_token() + + assert "MI token acquisition failed" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_get_token_with_federated_identity_step2_failure(self): + """Test error handling when step 2 (final token acquisition) fails.""" + mock_credentials = FederatedIdentityCredentials( + client_id="test-app-client-id", + managed_identity_type="user", + managed_identity_client_id="test-mi-client-id", + tenant_id="test-tenant-id", + ) + + manager = TokenManager(credentials=mock_credentials) + + # Mock step 1 to succeed + mi_token = "mi_token_from_step_1" + with patch.object(manager, "_acquire_managed_identity_token", return_value=mi_token): + # Mock step 2 to fail + with patch("microsoft.teams.apps.token_manager.ConfidentialClientApplication") as mock_confidential_app: + mock_app_instance = MagicMock() + mock_app_instance.acquire_token_for_client.return_value = { + "error": "invalid_grant", + "error_description": "FIC Step 2 failed", + } + mock_confidential_app.return_value = mock_app_instance + + with pytest.raises(ValueError) as exc_info: + await manager.get_bot_token() + + assert "invalid_grant" in str(exc_info.value) From 3c7f2f3567c443c06c7ad896b3f6cb4b249d2494 Mon Sep 17 00:00:00 2001 From: heyitsaamir Date: Thu, 6 Nov 2025 16:02:03 -0800 Subject: [PATCH 4/4] Remove conflict --- stubs/msal/__init__.pyi | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/stubs/msal/__init__.pyi b/stubs/msal/__init__.pyi index c984edb8..de803a7b 100644 --- a/stubs/msal/__init__.pyi +++ b/stubs/msal/__init__.pyi @@ -6,16 +6,12 @@ class ConfidentialClientApplication: """MSAL Confidential Client Application""" def __init__( -<<<<<<< HEAD - self, client_id: str, *, client_credential: str | None = None, authority: str | None = None, **kwargs: Any -======= self, client_id: str, *, - client_credential: Optional[str | dict[str, str]] = None, - authority: Optional[str] = None, + client_credential: str | dict[str, str] | None = None, + authority: str | None = None, **kwargs: Any, ->>>>>>> 9c67258 (FIC support) ) -> None: ... def acquire_token_for_client( self, scopes: list[str] | str, claims_challenge: str | None = None, **kwargs: Any