Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openfga_sdk/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def _obtain_token(self, client):
"""
configuration = self._credentials.configuration

token_url = f"https://{configuration.api_issuer}/oauth/token"
token_url = self._credentials._parse_issuer(configuration.api_issuer)

post_params = {
"client_id": configuration.client_id,
Expand Down
220 changes: 220 additions & 0 deletions test/oauth2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,223 @@ async def test_get_authentication_retries_5xx_responses(self, mock_request):
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})

await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_keep_full_url(self, mock_request):
"""
Fully qualified issuer URLs should not get manipulated.
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="https://issuer.fga.example/something",
api_audience="myaudience",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.3",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/something",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
},
)
await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_add_scheme(self, mock_request):
"""
Issuer URLs without scheme should get scheme prefix added.
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example/something",
api_audience="myaudience",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.3",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/something",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
},
)
await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_add_path(self, mock_request):
"""
Issuer URLs without scheme should get scheme prefix added.
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="https://issuer.fga.example",
api_audience="myaudience",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.3",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
},
)
await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_add_scheme_and_path(self, mock_request):
"""
Issuer URLs without scheme should get scheme prefix added.
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.3",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
},
)
await rest_client.close()