33Licensed under the MIT License.
44"""
55
6+ from typing import Literal , cast
67from unittest .mock import MagicMock , create_autospec , patch
78
89import pytest
9- from microsoft .teams .api import ClientCredentials , JsonWebToken , ManagedIdentityCredentials
10+ from microsoft .teams .api import (
11+ ClientCredentials ,
12+ FederatedIdentityCredentials ,
13+ JsonWebToken ,
14+ ManagedIdentityCredentials ,
15+ )
1016from microsoft .teams .apps .token_manager import TokenManager
1117from msal import ManagedIdentityClient # pyright: ignore[reportMissingTypeStubs]
1218
@@ -128,8 +134,8 @@ async def test_get_token_with_managed_identity(self, get_token_method: str, expe
128134
129135 manager = TokenManager (credentials = mock_credentials )
130136
131- # Patch _get_msal_client to return our mock
132- with patch .object (manager , "_get_msal_client " , return_value = mock_msal_client ):
137+ # Patch _get_managed_identity_client to return our mock
138+ with patch .object (manager , "_get_managed_identity_client " , return_value = mock_msal_client ):
133139 # Call the method dynamically
134140 token = await getattr (manager , get_token_method )()
135141
@@ -155,22 +161,22 @@ async def test_get_graph_token_with_managed_identity_and_tenant(self):
155161
156162 manager = TokenManager (credentials = mock_credentials )
157163
158- # Track calls to _get_msal_client
164+ # Track calls to _get_managed_identity_client
159165 get_msal_client_calls : list [str ] = []
160166
161- def track_get_msal_client ( tenant_id : str ):
167+ def track_get_managed_identity_client ( credentials : ManagedIdentityCredentials , tenant_id : str ):
162168 get_msal_client_calls .append (tenant_id )
163169 return mock_msal_client
164170
165- # Patch _get_msal_client to track calls
166- with patch .object (manager , "_get_msal_client " , side_effect = track_get_msal_client ):
171+ # Patch _get_managed_identity_client to track calls
172+ with patch .object (manager , "_get_managed_identity_client " , side_effect = track_get_managed_identity_client ):
167173 # Request token for different tenant
168174 token = await manager .get_graph_token ("different-tenant-id" )
169175
170176 assert token is not None
171177 assert isinstance (token , JsonWebToken )
172178
173- # Verify _get_msal_client was called with different-tenant-id
179+ # Verify _get_managed_identity_client was called with different-tenant-id
174180 assert "different-tenant-id" in get_msal_client_calls
175181
176182 @pytest .mark .asyncio
@@ -190,10 +196,99 @@ async def test_get_token_error_handling_with_managed_identity(self):
190196
191197 manager = TokenManager (credentials = mock_credentials )
192198
193- # Patch _get_msal_client to return our mock
194- with patch .object (manager , "_get_msal_client " , return_value = mock_msal_client ):
199+ # Patch _get_managed_identity_client to return our mock
200+ with patch .object (manager , "_get_managed_identity_client " , return_value = mock_msal_client ):
195201 # Should raise an error when token acquisition fails
196202 with pytest .raises (ValueError ) as exc_info :
197203 await manager .get_bot_token ()
198204
199205 assert "invalid_client" in str (exc_info .value )
206+
207+ @pytest .mark .asyncio
208+ @pytest .mark .parametrize (
209+ "mi_type,mi_client_id,description" ,
210+ [
211+ ("system" , None , "system-assigned managed identity" ),
212+ ("user" , "test-user-mi-client-id" , "user-assigned managed identity" ),
213+ ],
214+ )
215+ async def test_get_token_with_federated_identity (self , mi_type : str , mi_client_id : str | None , description : str ):
216+ """Test token retrieval using FederatedIdentityCredentials (two-step flow)."""
217+ mock_credentials = FederatedIdentityCredentials (
218+ client_id = "test-app-client-id" ,
219+ managed_identity_type = cast (Literal ["system" , "user" ], mi_type ),
220+ managed_identity_client_id = mi_client_id ,
221+ tenant_id = "test-tenant-id" ,
222+ )
223+
224+ manager = TokenManager (credentials = mock_credentials )
225+
226+ # Mock the managed identity token acquisition (step 1)
227+ mi_token = "mi_token_from_step_1"
228+ with patch .object (manager , "_acquire_managed_identity_token" , return_value = mi_token ):
229+ # Mock ConfidentialClientApplication for step 2
230+ with patch ("microsoft.teams.apps.token_manager.ConfidentialClientApplication" ) as mock_confidential_app :
231+ mock_app_instance = MagicMock ()
232+ mock_app_instance .acquire_token_for_client .return_value = {"access_token" : VALID_TEST_TOKEN }
233+ mock_confidential_app .return_value = mock_app_instance
234+
235+ token = await manager .get_bot_token ()
236+
237+ assert token is not None , f"Failed for: { description } "
238+ assert isinstance (token , JsonWebToken ), f"Failed for: { description } "
239+ assert str (token ) == VALID_TEST_TOKEN , f"Failed for: { description } "
240+
241+ # Verify ConfidentialClientApplication was called with MI token as client_assertion
242+ mock_confidential_app .assert_called_once ()
243+ call_kwargs = mock_confidential_app .call_args [1 ]
244+ assert call_kwargs ["client_credential" ] == {"client_assertion" : mi_token }, f"Failed for: { description } "
245+
246+ @pytest .mark .asyncio
247+ async def test_get_token_with_federated_identity_step1_failure (self ):
248+ """Test error handling when step 1 (MI token acquisition) fails."""
249+ mock_credentials = FederatedIdentityCredentials (
250+ client_id = "test-app-client-id" ,
251+ managed_identity_type = "user" ,
252+ managed_identity_client_id = "test-mi-client-id" ,
253+ tenant_id = "test-tenant-id" ,
254+ )
255+
256+ manager = TokenManager (credentials = mock_credentials )
257+
258+ # Mock step 1 to fail
259+ with patch .object (
260+ manager , "_acquire_managed_identity_token" , side_effect = ValueError ("MI token acquisition failed" )
261+ ):
262+ with pytest .raises (ValueError ) as exc_info :
263+ await manager .get_bot_token ()
264+
265+ assert "MI token acquisition failed" in str (exc_info .value )
266+
267+ @pytest .mark .asyncio
268+ async def test_get_token_with_federated_identity_step2_failure (self ):
269+ """Test error handling when step 2 (final token acquisition) fails."""
270+ mock_credentials = FederatedIdentityCredentials (
271+ client_id = "test-app-client-id" ,
272+ managed_identity_type = "user" ,
273+ managed_identity_client_id = "test-mi-client-id" ,
274+ tenant_id = "test-tenant-id" ,
275+ )
276+
277+ manager = TokenManager (credentials = mock_credentials )
278+
279+ # Mock step 1 to succeed
280+ mi_token = "mi_token_from_step_1"
281+ with patch .object (manager , "_acquire_managed_identity_token" , return_value = mi_token ):
282+ # Mock step 2 to fail
283+ with patch ("microsoft.teams.apps.token_manager.ConfidentialClientApplication" ) as mock_confidential_app :
284+ mock_app_instance = MagicMock ()
285+ mock_app_instance .acquire_token_for_client .return_value = {
286+ "error" : "invalid_grant" ,
287+ "error_description" : "FIC Step 2 failed" ,
288+ }
289+ mock_confidential_app .return_value = mock_app_instance
290+
291+ with pytest .raises (ValueError ) as exc_info :
292+ await manager .get_bot_token ()
293+
294+ assert "invalid_grant" in str (exc_info .value )
0 commit comments