Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions oauth2_provider/oauth2_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ def create_token_response(self, request):

return uri, headers, body, status

def create_revocation_response(self, request):
"""
A wrapper method that calls create_revocation_response on a
`server_class` instance.

:param request: The current django.http.HttpRequest object
"""
uri, http_method, body, headers = self._extract_params(request)

headers, body, status = self.server.create_revocation_response(
uri, http_method, body, headers)
uri = headers.get("Location", None)

return uri, headers, body, status

def verify_request(self, request, scopes):
"""
A wrapper method that calls verify_request on `server_class` instance.
Expand Down
25 changes: 25 additions & 0 deletions oauth2_provider/oauth2_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,31 @@ def save_bearer_token(self, token, request, *args, **kwargs):
# TODO check out a more reliable way to communicate expire time to oauthlib
token['expires_in'] = oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS

def revoke_token(self, token, token_type_hint, request, *args, **kwargs):
"""
Revoke an access or refresh token.

:param token: The token string.
:param token_type_hint: access_token or refresh_token.
:param request: The HTTP Request (oauthlib.common.Request)
"""
if token_type_hint not in [None, 'access_token', 'refresh_token']:
token_type_hint = None

if token_type_hint in [None, 'access_token']:
try:
AccessToken.objects.get(token=token).delete()
return
except AccessToken.DoesNotExist:
pass

if token_type_hint in [None, 'refresh_token']:
try:
RefreshToken.objects.get(token=token).delete()
return
except RefreshToken.DoesNotExist:
pass

def validate_user(self, username, password, client, request, *args, **kwargs):
"""
Check username and password correspond to a valid and active User
Expand Down
119 changes: 119 additions & 0 deletions oauth2_provider/tests/test_token_revocation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from __future__ import unicode_literals

import datetime

from django.test import TestCase, RequestFactory
from django.core.urlresolvers import reverse
from django.utils import timezone

from ..compat import urlencode, get_user_model
from ..models import get_application_model, AccessToken, RefreshToken
from ..settings import oauth2_settings

from .test_utils import TestCaseUtils


Application = get_application_model()
UserModel = get_user_model()


class BaseTest(TestCaseUtils, TestCase):
def setUp(self):
self.factory = RequestFactory()
self.test_user = UserModel.objects.create_user("test_user", "[email protected]", "123456")
self.dev_user = UserModel.objects.create_user("dev_user", "[email protected]", "123456")

self.application = Application(
name="Test Application",
redirect_uris="http://localhost http://example.com http://example.it",
user=self.dev_user,
client_type=Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
)
self.application.save()

oauth2_settings._SCOPES = ['read', 'write']

def tearDown(self):
self.application.delete()
self.test_user.delete()
self.dev_user.delete()


class TestRevocationView(BaseTest):
def test_revoke_access_token(self):
"""

"""
tok = AccessToken.objects.create(user=self.test_user, token='1234567890',
application=self.application,
expires=timezone.now()+datetime.timedelta(days=1),
scope='read write')
query_string = urlencode({
'client_id': self.application.client_id,
'client_secret': self.application.client_secret,
'token': tok.token,
})
url = "{url}?{qs}".format(url=reverse('oauth2_provider:revoke-token'), qs=query_string)
response = self.client.post(url)
self.assertEqual(response.status_code, 200)
self.assertFalse(AccessToken.objects.filter(id=tok.id).exists())

def test_revoke_access_token_with_hint(self):
"""

"""
tok = AccessToken.objects.create(user=self.test_user, token='1234567890',
application=self.application,
expires=timezone.now()+datetime.timedelta(days=1),
scope='read write')
query_string = urlencode({
'client_id': self.application.client_id,
'client_secret': self.application.client_secret,
'token': tok.token,
'token_type_hint': 'access_token'
})
url = "{url}?{qs}".format(url=reverse('oauth2_provider:revoke-token'), qs=query_string)
response = self.client.post(url)
self.assertEqual(response.status_code, 200)
self.assertFalse(AccessToken.objects.filter(id=tok.id).exists())

def test_revoke_access_token_with_invalid_hint(self):
"""

"""
tok = AccessToken.objects.create(user=self.test_user, token='1234567890',
application=self.application,
expires=timezone.now()+datetime.timedelta(days=1),
scope='read write')
# invalid hint should have no effect
query_string = urlencode({
'client_id': self.application.client_id,
'client_secret': self.application.client_secret,
'token': tok.token,
'token_type_hint': 'bad_hint'
})
url = "{url}?{qs}".format(url=reverse('oauth2_provider:revoke-token'), qs=query_string)
response = self.client.post(url)
self.assertEqual(response.status_code, 200)
self.assertFalse(AccessToken.objects.filter(id=tok.id).exists())

def test_revoke_refresh_token(self):
"""

"""
tok = AccessToken.objects.create(user=self.test_user, token='1234567890',
application=self.application,
expires=timezone.now()+datetime.timedelta(days=1),
scope='read write')
rtok = RefreshToken.objects.create(user=self.test_user, token='999999999',
application=self.application, access_token=tok)
query_string = urlencode({
'client_id': self.application.client_id,
'client_secret': self.application.client_secret,
'token': rtok.token,
})
url = "{url}?{qs}".format(url=reverse('oauth2_provider:revoke-token'), qs=query_string)
response = self.client.post(url)
self.assertEqual(response.status_code, 200)
self.assertFalse(RefreshToken.objects.filter(id=rtok.id).exists())
1 change: 1 addition & 0 deletions oauth2_provider/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
'',
url(r'^authorize/$', views.AuthorizationView.as_view(), name="authorize"),
url(r'^token/$', views.TokenView.as_view(), name="token"),
url(r'^revoke_token/$', views.RevokeTokenView.as_view(), name="revoke-token"),
)

# Application management views
Expand Down
2 changes: 1 addition & 1 deletion oauth2_provider/views/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .base import AuthorizationView, TokenView
from .base import AuthorizationView, TokenView, RevokeTokenView
from .application import ApplicationRegistration, ApplicationDetail, ApplicationList, \
ApplicationDelete, ApplicationUpdate
from .generic import ProtectedResourceView, ScopedProtectedResourceView, ReadWriteScopedResourceView
16 changes: 16 additions & 0 deletions oauth2_provider/views/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,19 @@ def post(self, request, *args, **kwargs):
for k, v in headers.items():
response[k] = v
return response


class RevokeTokenView(CsrfExemptMixin, OAuthLibMixin, View):
"""
Implements an endpoint to revoke access or refresh tokens
"""
server_class = Server
validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS

def post(self, request, *args, **kwargs):
url, headers, body, status = self.create_revocation_response(request)
response = HttpResponse(content=body, status=status)

for k, v in headers.items():
response[k] = v
return response
10 changes: 10 additions & 0 deletions oauth2_provider/views/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ def create_token_response(self, request):
core = self.get_oauthlib_core()
return core.create_token_response(request)

def create_revocation_response(self, request):
"""
A wrapper method that calls create_revocation_response on the
`server_class` instance.

:param request: The current django.http.HttpRequest object
"""
core = self.get_oauthlib_core()
return core.create_revocation_response(request)

def verify_request(self, request):
"""
A wrapper method that calls verify_request on `server_class` instance.
Expand Down