From 0473adb26411a721b7400adc392326395658ec01 Mon Sep 17 00:00:00 2001 From: Manuel Lang Date: Wed, 2 Apr 2025 21:28:03 +0200 Subject: [PATCH] feat(python): dynamic token issuer --- openfga_sdk/oauth2.py | 2 +- test/oauth2_test.py | 220 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 221 insertions(+), 1 deletion(-) diff --git a/openfga_sdk/oauth2.py b/openfga_sdk/oauth2.py index 6a7bff8..5b70374 100644 --- a/openfga_sdk/oauth2.py +++ b/openfga_sdk/oauth2.py @@ -70,7 +70,7 @@ async def _obtain_token(self, client): """ configuration = self._credentials.configuration - token_url = f"https://{configuration.api_issuer}/oauth/token" + token_url = self._credentials._parse_issuer(configuration.api_issuer) post_params = { "client_id": configuration.client_id, diff --git a/test/oauth2_test.py b/test/oauth2_test.py index 535b964..35b5267 100644 --- a/test/oauth2_test.py +++ b/test/oauth2_test.py @@ -274,3 +274,223 @@ async def test_get_authentication_retries_5xx_responses(self, mock_request): self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"}) await rest_client.close() + + @patch.object(rest.RESTClientObject, "request") + async def test_get_authentication_keep_full_url(self, mock_request): + """ + Fully qualified issuer URLs should not get manipulated. + """ + response_body = """ +{ + "expires_in": 120, + "access_token": "AABBCCDD" +} + """ + mock_request.return_value = mock_response(response_body, 200) + + credentials = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="https://issuer.fga.example/something", + api_audience="myaudience", + ), + ) + rest_client = rest.RESTClientObject(Configuration()) + current_time = datetime.now() + client = OAuth2Client(credentials) + auth_header = await client.get_authentication_header(rest_client) + self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"}) + self.assertEqual(client._access_token, "AABBCCDD") + self.assertGreaterEqual( + client._access_expiry_time, current_time + timedelta(seconds=120) + ) + expected_header = urllib3.response.HTTPHeaderDict( + { + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "openfga-sdk (python) 0.9.3", + } + ) + mock_request.assert_called_once_with( + method="POST", + url="https://issuer.fga.example/something", + headers=expected_header, + query_params=None, + body=None, + _preload_content=True, + _request_timeout=None, + post_params={ + "client_id": "myclientid", + "client_secret": "mysecret", + "audience": "myaudience", + "grant_type": "client_credentials", + }, + ) + await rest_client.close() + + @patch.object(rest.RESTClientObject, "request") + async def test_get_authentication_add_scheme(self, mock_request): + """ + Issuer URLs without scheme should get scheme prefix added. + """ + response_body = """ +{ + "expires_in": 120, + "access_token": "AABBCCDD" +} + """ + mock_request.return_value = mock_response(response_body, 200) + + credentials = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="issuer.fga.example/something", + api_audience="myaudience", + ), + ) + rest_client = rest.RESTClientObject(Configuration()) + current_time = datetime.now() + client = OAuth2Client(credentials) + auth_header = await client.get_authentication_header(rest_client) + self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"}) + self.assertEqual(client._access_token, "AABBCCDD") + self.assertGreaterEqual( + client._access_expiry_time, current_time + timedelta(seconds=120) + ) + expected_header = urllib3.response.HTTPHeaderDict( + { + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "openfga-sdk (python) 0.9.3", + } + ) + mock_request.assert_called_once_with( + method="POST", + url="https://issuer.fga.example/something", + headers=expected_header, + query_params=None, + body=None, + _preload_content=True, + _request_timeout=None, + post_params={ + "client_id": "myclientid", + "client_secret": "mysecret", + "audience": "myaudience", + "grant_type": "client_credentials", + }, + ) + await rest_client.close() + + @patch.object(rest.RESTClientObject, "request") + async def test_get_authentication_add_path(self, mock_request): + """ + Issuer URLs without scheme should get scheme prefix added. + """ + response_body = """ +{ + "expires_in": 120, + "access_token": "AABBCCDD" +} + """ + mock_request.return_value = mock_response(response_body, 200) + + credentials = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="https://issuer.fga.example", + api_audience="myaudience", + ), + ) + rest_client = rest.RESTClientObject(Configuration()) + current_time = datetime.now() + client = OAuth2Client(credentials) + auth_header = await client.get_authentication_header(rest_client) + self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"}) + self.assertEqual(client._access_token, "AABBCCDD") + self.assertGreaterEqual( + client._access_expiry_time, current_time + timedelta(seconds=120) + ) + expected_header = urllib3.response.HTTPHeaderDict( + { + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "openfga-sdk (python) 0.9.3", + } + ) + mock_request.assert_called_once_with( + method="POST", + url="https://issuer.fga.example/oauth/token", + headers=expected_header, + query_params=None, + body=None, + _preload_content=True, + _request_timeout=None, + post_params={ + "client_id": "myclientid", + "client_secret": "mysecret", + "audience": "myaudience", + "grant_type": "client_credentials", + }, + ) + await rest_client.close() + + @patch.object(rest.RESTClientObject, "request") + async def test_get_authentication_add_scheme_and_path(self, mock_request): + """ + Issuer URLs without scheme should get scheme prefix added. + """ + response_body = """ +{ + "expires_in": 120, + "access_token": "AABBCCDD" +} + """ + mock_request.return_value = mock_response(response_body, 200) + + credentials = Credentials( + method="client_credentials", + configuration=CredentialConfiguration( + client_id="myclientid", + client_secret="mysecret", + api_issuer="issuer.fga.example", + api_audience="myaudience", + ), + ) + rest_client = rest.RESTClientObject(Configuration()) + current_time = datetime.now() + client = OAuth2Client(credentials) + auth_header = await client.get_authentication_header(rest_client) + self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"}) + self.assertEqual(client._access_token, "AABBCCDD") + self.assertGreaterEqual( + client._access_expiry_time, current_time + timedelta(seconds=120) + ) + expected_header = urllib3.response.HTTPHeaderDict( + { + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "openfga-sdk (python) 0.9.3", + } + ) + mock_request.assert_called_once_with( + method="POST", + url="https://issuer.fga.example/oauth/token", + headers=expected_header, + query_params=None, + body=None, + _preload_content=True, + _request_timeout=None, + post_params={ + "client_id": "myclientid", + "client_secret": "mysecret", + "audience": "myaudience", + "grant_type": "client_credentials", + }, + ) + await rest_client.close()