diff --git a/packages/api/src/microsoft/teams/api/auth/__init__.py b/packages/api/src/microsoft/teams/api/auth/__init__.py index 227dfa43..b177eb4b 100644 --- a/packages/api/src/microsoft/teams/api/auth/__init__.py +++ b/packages/api/src/microsoft/teams/api/auth/__init__.py @@ -4,7 +4,13 @@ """ from .caller import CallerIds, CallerType -from .credentials import ClientCredentials, Credentials, ManagedIdentityCredentials, TokenCredentials +from .credentials import ( + ClientCredentials, + Credentials, + FederatedIdentityCredentials, + ManagedIdentityCredentials, + TokenCredentials, +) from .json_web_token import JsonWebToken, JsonWebTokenPayload from .token import TokenProtocol @@ -13,6 +19,7 @@ "CallerType", "ClientCredentials", "Credentials", + "FederatedIdentityCredentials", "ManagedIdentityCredentials", "TokenCredentials", "TokenProtocol", diff --git a/packages/api/src/microsoft/teams/api/auth/credentials.py b/packages/api/src/microsoft/teams/api/auth/credentials.py index 7417fd72..471c7952 100644 --- a/packages/api/src/microsoft/teams/api/auth/credentials.py +++ b/packages/api/src/microsoft/teams/api/auth/credentials.py @@ -3,7 +3,7 @@ Licensed under the MIT License. """ -from typing import Awaitable, Callable, Optional, Union +from typing import Awaitable, Callable, Literal, Optional, Union from ..models import CustomBaseModel @@ -56,5 +56,27 @@ class ManagedIdentityCredentials(CustomBaseModel): """ +class FederatedIdentityCredentials(CustomBaseModel): + """Credentials for authentication using Federated Identity Credentials with Managed Identity.""" + + client_id: str + """ + The client ID of the app registration. + """ + managed_identity_type: Literal["system", "user"] + """ + The type of managed identity: 'system' for system-assigned or 'user' for user-assigned. + """ + managed_identity_client_id: Optional[str] = None + """ + The client ID of the user-assigned managed identity. + Required when managed_identity_type is 'user'. + """ + tenant_id: Optional[str] = None + """ + The tenant ID. + """ + + # Union type for credentials -Credentials = Union[ClientCredentials, TokenCredentials, ManagedIdentityCredentials] +Credentials = Union[ClientCredentials, TokenCredentials, ManagedIdentityCredentials, FederatedIdentityCredentials] diff --git a/packages/apps/src/microsoft/teams/apps/app.py b/packages/apps/src/microsoft/teams/apps/app.py index d4a8373f..f8b858f0 100644 --- a/packages/apps/src/microsoft/teams/apps/app.py +++ b/packages/apps/src/microsoft/teams/apps/app.py @@ -21,6 +21,7 @@ ConversationAccount, ConversationReference, Credentials, + FederatedIdentityCredentials, ManagedIdentityCredentials, MessageActivityInput, TokenCredentials, @@ -298,26 +299,33 @@ def _init_credentials(self) -> Optional[Credentials]: else: self.log.debug(f"Using TENANT_ID: {tenant_id} (assuming single-tenant app)") - # - If client_id + client_secret : use ClientCredentials (standard client auth) if client_id and client_secret: self.log.debug("Using client secret for auth") return ClientCredentials(client_id=client_id, client_secret=client_secret, tenant_id=tenant_id) - # - If client_id + token callable : use TokenCredentials (where token is a custom token provider) if client_id and token: return TokenCredentials(client_id=client_id, tenant_id=tenant_id, token=token) - # - If client_id but no client_secret : use ManagedIdentityCredentials (inferred) if client_id: - # Validate that if managed_identity_client_id is provided, it must equal client_id + if managed_identity_client_id == "system": + self.log.debug("Using Federated Identity Credentials with system-assigned managed identity") + return FederatedIdentityCredentials( + client_id=client_id, + managed_identity_type="system", + managed_identity_client_id=None, + tenant_id=tenant_id, + ) + if managed_identity_client_id and managed_identity_client_id != client_id: - raise ValueError( - "Federated Identity Credentials is not yet supported. " - "managed_identity_client_id must equal client_id." + self.log.debug("Using Federated Identity Credentials with user-assigned managed identity") + return FederatedIdentityCredentials( + client_id=client_id, + managed_identity_type="user", + managed_identity_client_id=managed_identity_client_id, + tenant_id=tenant_id, ) - self.log.debug("Using user-assigned managed identity for auth") - # Use managed_identity_client_id if provided, otherwise fall back to client_id + self.log.debug("Using user-assigned managed identity (direct)") mi_client_id = managed_identity_client_id or client_id return ManagedIdentityCredentials( client_id=mi_client_id, diff --git a/packages/apps/src/microsoft/teams/apps/options.py b/packages/apps/src/microsoft/teams/apps/options.py index a37ab5e5..bc69dd2f 100644 --- a/packages/apps/src/microsoft/teams/apps/options.py +++ b/packages/apps/src/microsoft/teams/apps/options.py @@ -30,7 +30,9 @@ class AppOptions(TypedDict, total=False): managed_identity_client_id: Optional[str] """ The managed identity client ID for user-assigned managed identity. - Defaults to client_id if not provided. + Set to "system" for system-assigned managed identity (triggers Federated Identity Credentials). + If set to a different client ID than client_id, triggers Federated Identity Credentials with user-assigned MI. + If not set or equals client_id, uses direct managed identity (no federation). """ # Infrastructure @@ -62,7 +64,12 @@ class InternalAppOptions: token: Optional[Callable[[Union[str, list[str]], Optional[str]], Union[str, Awaitable[str]]]] = None """Custom token provider function. If provided with client_id (no client_secret), uses TokenCredentials.""" managed_identity_client_id: Optional[str] = None - """The managed identity client ID for user-assigned managed identity. Defaults to client_id if not provided.""" + """ + The managed identity client ID for user-assigned managed identity. + Set to "system" for system-assigned managed identity (triggers Federated Identity Credentials). + If set to a different client ID than client_id, triggers Federated Identity Credentials with user-assigned MI. + If not set or equals client_id, uses direct managed identity (no federation). + """ logger: Optional[Logger] = None storage: Optional[Storage[str, Any]] = None diff --git a/packages/apps/src/microsoft/teams/apps/token_manager.py b/packages/apps/src/microsoft/teams/apps/token_manager.py index ba668fab..72c87fc1 100644 --- a/packages/apps/src/microsoft/teams/apps/token_manager.py +++ b/packages/apps/src/microsoft/teams/apps/token_manager.py @@ -15,11 +15,16 @@ JsonWebToken, TokenProtocol, ) -from microsoft.teams.api.auth.credentials import ManagedIdentityCredentials, TokenCredentials +from microsoft.teams.api.auth.credentials import ( + FederatedIdentityCredentials, + ManagedIdentityCredentials, + TokenCredentials, +) from microsoft.teams.common import ConsoleLogger from msal import ( ConfidentialClientApplication, ManagedIdentityClient, + SystemAssignedManagedIdentity, UserAssignedManagedIdentity, ) @@ -77,74 +82,166 @@ async def _get_token( if caller_name: self._logger.debug(f"No credentials provided for {caller_name}") return None - if isinstance(credentials, (ClientCredentials, ManagedIdentityCredentials)): - msal_client = self._get_msal_client(tenant_id) - - # Handle different acquire_token_for_client signatures - if isinstance(msal_client, ManagedIdentityClient): - # ManagedIdentityClient expects resource as a keyword-only string parameter - scope = scope.removesuffix("/.default") - token_res: dict[str, Any] | None = await asyncio.to_thread( - lambda: msal_client.acquire_token_for_client(resource=scope) - ) - else: - # ConfidentialClientApplication expects scopes as a list - token_res: dict[str, Any] | None = await asyncio.to_thread( - lambda: msal_client.acquire_token_for_client([scope]) - ) - - if token_res.get("access_token", None): - access_token = token_res["access_token"] - return JsonWebToken(access_token) - else: - self._logger.debug(f"TokenRes: {token_res}") - error = token_res.get("error", "Error retrieving token") - if not isinstance(error, BaseException): - error = ValueError(error) - error_description = token_res.get("error_description", "Error retrieving token from MSAL") - self._logger.error(error_description) - raise error + if isinstance(credentials, ClientCredentials): + return await self._get_token_with_client_credentials(credentials, scope, tenant_id) + elif isinstance(credentials, ManagedIdentityCredentials): + return await self._get_token_with_managed_identity(credentials, scope) + elif isinstance(credentials, FederatedIdentityCredentials): + return await self._get_token_with_federated_identity(credentials, scope, tenant_id) elif isinstance(credentials, TokenCredentials): - token = credentials.token(scope, tenant_id) - if isawaitable(token): - access_token = await token - else: - access_token = token + return await self._get_token_with_token_provider(credentials, scope, tenant_id) + + return None + + async def _get_token_with_client_credentials( + self, + credentials: ClientCredentials, + scope: str, + tenant_id: str, + ) -> TokenProtocol: + """Get token using ClientCredentials (client secret).""" + confidential_client = self._get_confidential_client(credentials, tenant_id) + + # ConfidentialClientApplication expects scopes as a list + token_res: dict[str, Any] = await asyncio.to_thread( + lambda: confidential_client.acquire_token_for_client([scope]) + ) + + return self._handle_token_response(token_res) + + async def _get_token_with_managed_identity( + self, + credentials: ManagedIdentityCredentials, + scope: str, + ) -> TokenProtocol: + """Get token using ManagedIdentityCredentials (direct, no federation).""" + mi_client = self._get_managed_identity_client(credentials) + + # ManagedIdentityClient expects resource as a keyword-only string parameter + resource = scope.removesuffix("/.default") + token_res: dict[str, Any] = await asyncio.to_thread( + lambda: mi_client.acquire_token_for_client(resource=resource) + ) + + return self._handle_token_response(token_res) + + async def _get_token_with_federated_identity( + self, + credentials: FederatedIdentityCredentials, + scope: str, + tenant_id: str, + ) -> TokenProtocol: + """Get token using Federated Identity Credentials (two-step flow).""" + + # Step 1: Get MI token from api://AzureADTokenExchange + mi_token = await self._acquire_managed_identity_token(credentials) + + # Step 2: Use MI token as client_assertion to get final access token + confidential_client = ConfidentialClientApplication( + credentials.client_id, + client_credential={"client_assertion": mi_token}, + authority=DEFAULT_TOKEN_AUTHORITY.format(tenant_id=tenant_id), + ) + + token_res: dict[str, Any] = await asyncio.to_thread( + lambda: confidential_client.acquire_token_for_client([scope]) + ) + + return self._handle_token_response(token_res, error_prefix="FIC Step 2 failed") + async def _acquire_managed_identity_token(self, credentials: FederatedIdentityCredentials) -> str: + """Acquire managed identity token for federated identity credentials.""" + # Use shared method to get or create the managed identity client + mi_client = self._get_managed_identity_client(credentials) + + mi_token_res: dict[str, Any] = await asyncio.to_thread( + lambda: mi_client.acquire_token_for_client(resource="api://AzureADTokenExchange") + ) + + if not mi_token_res.get("access_token"): + self._logger.error("FIC Step 1 failed: Could not acquire MI token") + error = mi_token_res.get("error", ValueError("Error retrieving MI token")) + if not isinstance(error, BaseException): + error = ValueError(error) + raise error + + return mi_token_res["access_token"] + + async def _get_token_with_token_provider( + self, + credentials: TokenCredentials, + scope: str, + tenant_id: str, + ) -> TokenProtocol: + """Get token using custom token provider function.""" + token = credentials.token(scope, tenant_id) + + if isawaitable(token): + access_token = await token + else: + access_token = token + + return JsonWebToken(access_token) + + def _handle_token_response(self, token_res: dict[str, Any], error_prefix: str = "") -> TokenProtocol: + """Handle token response from MSAL client.""" + if token_res.get("access_token", None): + access_token = token_res["access_token"] return JsonWebToken(access_token) + else: + error_msg = f"{error_prefix}: " if error_prefix else "" + self._logger.error(f"{error_msg}Could not acquire access token") + self._logger.debug(f"TokenRes: {token_res}") + + error = token_res.get("error", "Error retrieving token") + if not isinstance(error, BaseException): + error = ValueError(error) + + error_description = token_res.get("error_description", "Error retrieving token from MSAL") + self._logger.error(error_description) + raise error + + def _get_confidential_client(self, credentials: ClientCredentials, tenant_id: str) -> ConfidentialClientApplication: + """Get or create ConfidentialClientApplication for ClientCredentials.""" + # Check if client already exists in cache + cached_client = self._confidential_clients_by_tenant.get(tenant_id) + if cached_client: + return cached_client + + client: ConfidentialClientApplication = ConfidentialClientApplication( + credentials.client_id, + client_credential=credentials.client_secret, + authority=f"https://login.microsoftonline.com/{tenant_id}", + ) + self._confidential_clients_by_tenant[tenant_id] = client + return client - def _get_msal_client(self, tenant_id: str) -> ConfidentialClientApplication | ManagedIdentityClient: - credentials = self._credentials + def _get_managed_identity_client( + self, credentials: ManagedIdentityCredentials | FederatedIdentityCredentials + ) -> ManagedIdentityClient: + """Get or create ManagedIdentityClient for ManagedIdentityCredentials or FederatedIdentityCredentials.""" + # Check if client already exists in cache - # Create the appropriate client based on credential type - if isinstance(credentials, ClientCredentials): - # Check if client already exists in cache for this tenant - cached_client = self._confidential_clients_by_tenant.get(tenant_id) - if cached_client: - return cached_client - - client: ConfidentialClientApplication = ConfidentialClientApplication( - credentials.client_id, - client_credential=credentials.client_secret, - authority=f"https://login.microsoftonline.com/{tenant_id}", - ) - self._confidential_clients_by_tenant[tenant_id] = client - return client - elif isinstance(credentials, ManagedIdentityCredentials): - # ManagedIdentityClient is tenant-agnostic, cache single instance - if self._managed_identity_client: - return self._managed_identity_client + # ManagedIdentityClient is tenant-agnostic, cache single instance + if self._managed_identity_client: + return self._managed_identity_client - # Create user-assigned managed identity + # Determine managed identity type + if isinstance(credentials, FederatedIdentityCredentials): + if credentials.managed_identity_type == "system": + managed_identity = SystemAssignedManagedIdentity() + else: # "user" + mi_client_id = credentials.managed_identity_client_id or credentials.client_id + managed_identity = UserAssignedManagedIdentity(client_id=mi_client_id) + else: # ManagedIdentityCredentials + # ManagedIdentityCredentials only supports user-assigned managed_identity = UserAssignedManagedIdentity(client_id=credentials.client_id) - self._managed_identity_client = ManagedIdentityClient( - managed_identity, - http_client=requests.Session(), - ) - return self._managed_identity_client - else: - raise ValueError(f"Unsupported credential type: {type(credentials)}") + self._managed_identity_client = ManagedIdentityClient( + managed_identity, + http_client=requests.Session(), + ) + return self._managed_identity_client def _resolve_tenant_id(self, tenant_id: str | None, default_tenant_id: str): return tenant_id or (self._credentials.tenant_id if self._credentials else False) or default_tenant_id diff --git a/packages/apps/tests/test_app.py b/packages/apps/tests/test_app.py index 7f8c9e68..92aa4778 100644 --- a/packages/apps/tests/test_app.py +++ b/packages/apps/tests/test_app.py @@ -12,6 +12,7 @@ from microsoft.teams.api import ( Account, ConversationAccount, + FederatedIdentityCredentials, InvokeActivity, ManagedIdentityCredentials, MessageActivity, @@ -639,23 +640,46 @@ def test_app_init_with_managed_identity( assert app.credentials.client_id == expected_client_id, f"Failed for: {description}" assert app.credentials.tenant_id == expected_tenant_id, f"Failed for: {description}" - def test_app_init_with_managed_identity_client_id_mismatch(self, mock_logger, mock_storage): - """Test app init raises error when managed_identity_client_id != client_id (federated identity).""" - # When managed_identity_client_id differs from client_id, should raise error - # (Federated Identity Credentials not yet supported) + @pytest.mark.parametrize( + "managed_identity_client_id,expected_mi_type,expected_mi_client_id,description", + [ + # System-assigned managed identity + ("system", "system", None, "system-assigned managed identity"), + # User-assigned managed identity (federated) + ( + "different-managed-identity-id", + "user", + "different-managed-identity-id", + "user-assigned federated identity", + ), + ], + ) + def test_app_init_with_federated_identity( + self, + mock_logger, + mock_storage, + managed_identity_client_id: str, + expected_mi_type: str, + expected_mi_client_id: str | None, + description: str, + ): + """Test app initialization with FederatedIdentityCredentials.""" options = AppOptions( logger=mock_logger, storage=mock_storage, client_id="app-client-id", - managed_identity_client_id="different-managed-identity-id", # Different! + managed_identity_client_id=managed_identity_client_id, ) with patch.dict("os.environ", {"CLIENT_SECRET": "", "TENANT_ID": "test-tenant-id"}, clear=False): - with pytest.raises(ValueError) as exc_info: - App(**options) + app = App(**options) - assert "Federated Identity Credentials is not yet supported" in str(exc_info.value) - assert "managed_identity_client_id must equal client_id" in str(exc_info.value) + assert app.credentials is not None, f"Failed for: {description}" + assert isinstance(app.credentials, FederatedIdentityCredentials), f"Failed for: {description}" + assert app.credentials.client_id == "app-client-id", f"Failed for: {description}" + assert app.credentials.managed_identity_type == expected_mi_type, f"Failed for: {description}" + assert app.credentials.managed_identity_client_id == expected_mi_client_id, f"Failed for: {description}" + assert app.credentials.tenant_id == "test-tenant-id", f"Failed for: {description}" def test_app_init_with_client_secret_takes_precedence(self, mock_logger, mock_storage): """Test that ClientCredentials is used when both client_secret and managed_identity_client_id are provided.""" diff --git a/packages/apps/tests/test_token_manager.py b/packages/apps/tests/test_token_manager.py index bc74806a..4e63042b 100644 --- a/packages/apps/tests/test_token_manager.py +++ b/packages/apps/tests/test_token_manager.py @@ -3,10 +3,16 @@ Licensed under the MIT License. """ +from typing import Literal, cast from unittest.mock import MagicMock, create_autospec, patch import pytest -from microsoft.teams.api import ClientCredentials, JsonWebToken, ManagedIdentityCredentials +from microsoft.teams.api import ( + ClientCredentials, + FederatedIdentityCredentials, + JsonWebToken, + ManagedIdentityCredentials, +) from microsoft.teams.apps.token_manager import TokenManager from msal import ManagedIdentityClient # pyright: ignore[reportMissingTypeStubs] @@ -206,8 +212,8 @@ async def test_get_token_with_managed_identity(self, get_token_method: str, expe manager = TokenManager(credentials=mock_credentials) - # Patch _get_msal_client to return our mock - with patch.object(manager, "_get_msal_client", return_value=mock_msal_client): + # Patch _get_managed_identity_client to return our mock + with patch.object(manager, "_get_managed_identity_client", return_value=mock_msal_client): # Call the method dynamically token = await getattr(manager, get_token_method)() @@ -220,58 +226,115 @@ async def test_get_token_with_managed_identity(self, get_token_method: str, expe mock_msal_client.acquire_token_for_client.assert_called_once_with(resource=expected_resource) @pytest.mark.asyncio - async def test_get_graph_token_with_managed_identity_and_tenant(self): - """Test getting tenant-specific graph token with ManagedIdentityCredentials.""" + async def test_get_token_error_handling_with_managed_identity(self): + """Test error handling when token acquisition fails with ManagedIdentityCredentials.""" mock_credentials = ManagedIdentityCredentials( client_id="test-managed-identity-client-id", - tenant_id="original-tenant-id", + tenant_id="test-tenant-id", ) - # Create a mock that will pass isinstance checks + # Create a mock that returns an error mock_msal_client = create_autospec(ManagedIdentityClient, instance=True) - mock_msal_client.acquire_token_for_client.return_value = {"access_token": VALID_TEST_TOKEN} + mock_msal_client.acquire_token_for_client.return_value = { + "error": "invalid_client", + "error_description": "Invalid managed identity configuration", + } manager = TokenManager(credentials=mock_credentials) - # Track calls to _get_msal_client - get_msal_client_calls: list[str] = [] + # Patch _get_managed_identity_client to return our mock + with patch.object(manager, "_get_managed_identity_client", return_value=mock_msal_client): + # Should raise an error when token acquisition fails + with pytest.raises(ValueError) as exc_info: + await manager.get_bot_token() - def track_get_msal_client(tenant_id: str): - get_msal_client_calls.append(tenant_id) - return mock_msal_client + assert "invalid_client" in str(exc_info.value) - # Patch _get_msal_client to track calls - with patch.object(manager, "_get_msal_client", side_effect=track_get_msal_client): - # Request token for different tenant - token = await manager.get_graph_token("different-tenant-id") + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mi_type,mi_client_id,description", + [ + ("system", None, "system-assigned managed identity"), + ("user", "test-user-mi-client-id", "user-assigned managed identity"), + ], + ) + async def test_get_token_with_federated_identity(self, mi_type: str, mi_client_id: str | None, description: str): + """Test token retrieval using FederatedIdentityCredentials (two-step flow).""" + mock_credentials = FederatedIdentityCredentials( + client_id="test-app-client-id", + managed_identity_type=cast(Literal["system", "user"], mi_type), + managed_identity_client_id=mi_client_id, + tenant_id="test-tenant-id", + ) - assert token is not None - assert isinstance(token, JsonWebToken) + manager = TokenManager(credentials=mock_credentials) + + # Mock the managed identity token acquisition (step 1) + mi_token = "mi_token_from_step_1" + with patch.object(manager, "_acquire_managed_identity_token", return_value=mi_token): + # Mock ConfidentialClientApplication for step 2 + with patch("microsoft.teams.apps.token_manager.ConfidentialClientApplication") as mock_confidential_app: + mock_app_instance = MagicMock() + mock_app_instance.acquire_token_for_client.return_value = {"access_token": VALID_TEST_TOKEN} + mock_confidential_app.return_value = mock_app_instance + + token = await manager.get_bot_token() - # Verify _get_msal_client was called with different-tenant-id - assert "different-tenant-id" in get_msal_client_calls + assert token is not None, f"Failed for: {description}" + assert isinstance(token, JsonWebToken), f"Failed for: {description}" + assert str(token) == VALID_TEST_TOKEN, f"Failed for: {description}" + + # Verify ConfidentialClientApplication was called with MI token as client_assertion + mock_confidential_app.assert_called_once() + call_kwargs = mock_confidential_app.call_args[1] + assert call_kwargs["client_credential"] == {"client_assertion": mi_token}, f"Failed for: {description}" @pytest.mark.asyncio - async def test_get_token_error_handling_with_managed_identity(self): - """Test error handling when token acquisition fails with ManagedIdentityCredentials.""" - mock_credentials = ManagedIdentityCredentials( - client_id="test-managed-identity-client-id", + async def test_get_token_with_federated_identity_step1_failure(self): + """Test error handling when step 1 (MI token acquisition) fails.""" + mock_credentials = FederatedIdentityCredentials( + client_id="test-app-client-id", + managed_identity_type="user", + managed_identity_client_id="test-mi-client-id", tenant_id="test-tenant-id", ) - # Create a mock that returns an error - mock_msal_client = create_autospec(ManagedIdentityClient, instance=True) - mock_msal_client.acquire_token_for_client.return_value = { - "error": "invalid_client", - "error_description": "Invalid managed identity configuration", - } - manager = TokenManager(credentials=mock_credentials) - # Patch _get_msal_client to return our mock - with patch.object(manager, "_get_msal_client", return_value=mock_msal_client): - # Should raise an error when token acquisition fails + # Mock step 1 to fail + with patch.object( + manager, "_acquire_managed_identity_token", side_effect=ValueError("MI token acquisition failed") + ): with pytest.raises(ValueError) as exc_info: await manager.get_bot_token() - assert "invalid_client" in str(exc_info.value) + assert "MI token acquisition failed" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_get_token_with_federated_identity_step2_failure(self): + """Test error handling when step 2 (final token acquisition) fails.""" + mock_credentials = FederatedIdentityCredentials( + client_id="test-app-client-id", + managed_identity_type="user", + managed_identity_client_id="test-mi-client-id", + tenant_id="test-tenant-id", + ) + + manager = TokenManager(credentials=mock_credentials) + + # Mock step 1 to succeed + mi_token = "mi_token_from_step_1" + with patch.object(manager, "_acquire_managed_identity_token", return_value=mi_token): + # Mock step 2 to fail + with patch("microsoft.teams.apps.token_manager.ConfidentialClientApplication") as mock_confidential_app: + mock_app_instance = MagicMock() + mock_app_instance.acquire_token_for_client.return_value = { + "error": "invalid_grant", + "error_description": "FIC Step 2 failed", + } + mock_confidential_app.return_value = mock_app_instance + + with pytest.raises(ValueError) as exc_info: + await manager.get_bot_token() + + assert "invalid_grant" in str(exc_info.value) diff --git a/stubs/msal/__init__.pyi b/stubs/msal/__init__.pyi index b437e81a..de803a7b 100644 --- a/stubs/msal/__init__.pyi +++ b/stubs/msal/__init__.pyi @@ -6,7 +6,12 @@ class ConfidentialClientApplication: """MSAL Confidential Client Application""" def __init__( - self, client_id: str, *, client_credential: str | None = None, authority: str | None = None, **kwargs: Any + self, + client_id: str, + *, + client_credential: str | dict[str, str] | None = None, + authority: str | None = None, + **kwargs: Any, ) -> None: ... def acquire_token_for_client( self, scopes: list[str] | str, claims_challenge: str | None = None, **kwargs: Any