Skip to content
Merged
18 changes: 18 additions & 0 deletions oauth2_provider/templates/oauth2_provider/authorized-oob.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{% extends "oauth2_provider/base.html" %}

{% load i18n %}
{% block content %}
<div class="block-center">
{% if not error %}
<h2>{% trans "Success" %}</h2>

<p>{% trans "Please return to your application and enter this code:" %}</p>

<p><code>{{ code }}</code></p>

{% else %}
<h2>Error: {{ error.error }}</h2>
<p>{{ error.description }}</p>
{% endif %}
</div>
{% endblock %}
46 changes: 43 additions & 3 deletions oauth2_provider/views/base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import json
import logging
import urllib.parse

from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import HttpResponse
from django.http import HttpResponse, JsonResponse
from django.utils import timezone
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.debug import sensitive_post_parameters
from django.views.generic import FormView, View
from django.shortcuts import render
from django.urls import reverse

from ..exceptions import OAuthToolkitError
from ..forms import AllowForm
Expand All @@ -18,7 +21,6 @@
from ..signals import app_authorized
from .mixins import OAuthLibMixin


log = logging.getLogger("oauth2_provider")


Expand Down Expand Up @@ -59,6 +61,7 @@ def redirect(self, redirect_to, application):
allowed_schemes = application.get_allowed_schemes()
return OAuth2ResponseRedirect(redirect_to, allowed_schemes)

RFC3339 = '%Y-%m-%dT%H:%M:%SZ'

class AuthorizationView(BaseAuthorizationView, FormView):
"""
Expand Down Expand Up @@ -204,13 +207,50 @@ def get(self, request, *args, **kwargs):
request=self.request, scopes=" ".join(scopes),
credentials=credentials, allow=True
)
return self.redirect(uri, application)
return self.redirect(uri, application, token)

except OAuthToolkitError as error:
return self.error_response(error, application)

return self.render_to_response(self.get_context_data(**kwargs))

def redirect(self, redirect_to, application,
token = None):

if not redirect_to.startswith("urn:ietf:wg:oauth:2.0:oob"):
return super().redirect(redirect_to, application)

parsed_redirect = urllib.parse.urlparse(redirect_to)
code = urllib.parse.parse_qs(parsed_redirect.query)['code'][0]

if redirect_to.startswith('urn:ietf:wg:oauth:2.0:oob:auto'):

response = {
'access_token': code,
'token_uri': redirect_to,
'refresh_token': None,
'token_expiry': None,
'token_uri': reverse('oauth2_provider:token'),
'user_agent': '',
'client_id': application.client_id,
'client_secret': application.client_secret,
'revoke_uri': reverse('oauth2_provider:revoke-token'),
}

if token is not None:
response['refresh_token'] = token.token
response['token_expiry'] = token.expires.format(RFC3339)

return JsonResponse(response)

else:
return render(
request=self.request,
template_name="oauth2_provider/authorized-oob.html",
context={
'code': code,
},
)

@method_decorator(csrf_exempt, name="dispatch")
class TokenView(OAuthLibMixin, View):
Expand Down
109 changes: 109 additions & 0 deletions tests/test_oob_authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import base64
import datetime
import hashlib
import json
import re
from urllib.parse import parse_qs, urlencode, urlparse

from django.contrib.auth import get_user_model
from django.test import RequestFactory, TestCase
from django.urls import reverse
from django.utils import timezone
from django.utils.crypto import get_random_string
from oauthlib.oauth2.rfc6749 import errors as oauthlib_errors

from oauth2_provider.models import (
get_access_token_model, get_application_model,
get_grant_model, get_refresh_token_model
)
from oauth2_provider.settings import oauth2_settings

from .utils import get_basic_auth_header

Application = get_application_model()
AccessToken = get_access_token_model()
Grant = get_grant_model()
RefreshToken = get_refresh_token_model()
UserModel = get_user_model()

class BaseTest(TestCase):
def setUp(self):
self.factory = RequestFactory()
self.test_user = UserModel.objects.create_user("test_user", "[email protected]", "123456")
self.dev_user = UserModel.objects.create_user("dev_user", "[email protected]", "123456")

oauth2_settings.ALLOWED_REDIRECT_URI_SCHEMES = ["http", "custom-scheme", "urn"]

self.application = Application.objects.create(
name="Test Application",
redirect_uris="urn:ietf:wg:oauth:2.0:oob urn:ietf:wg:oauth:2.0:oob:auto",
user=self.dev_user,
client_type=Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
)

oauth2_settings._SCOPES = ["read", "write"]
oauth2_settings._DEFAULT_SCOPES = ["read", "write"]

def tearDown(self):
self.application.delete()
self.test_user.delete()
self.dev_user.delete()

class TestOobAuthorizationCodeView(BaseTest):
def test_oob_as_html(self):
"""
...
"""
self.client.login(username="test_user", password="123456")
self.application.skip_authorization = True
self.application.save()

query_string = urlencode({
"client_id": self.application.client_id,
"response_type": "code",
"state": "random_state_string",
"scope": "read write",
"redirect_uri": "urn:ietf:wg:oauth:2.0:oob",
})
url = "{url}?{qs}".format(url=reverse("oauth2_provider:authorize"), qs=query_string)

response = self.client.get(url)
self.assertEqual(response.status_code, 200)

content = str(response.content, encoding='UTF-8')

code_matches = re.search(r'<code>([^<]*)</code>', content)
code = code_matches.groups(0)
self.assertNotEqual(code, '')

def test_oob_as_json(self):
"""
...
"""
self.client.login(username="test_user", password="123456")
self.application.skip_authorization = True
self.application.save()

query_string = urlencode({
"client_id": self.application.client_id,
"response_type": "code",
"state": "random_state_string",
"scope": "read write",
"redirect_uri": "urn:ietf:wg:oauth:2.0:oob:auto",
})
url = "{url}?{qs}".format(url=reverse("oauth2_provider:authorize"), qs=query_string)

response = self.client.get(url)
self.assertEqual(response.status_code, 200)

content = json.loads(str(response.content, encoding='UTF-8'))

for field in [
'access_token', 'token_uri',
'refresh_token', 'token_expiry',
'token_uri', 'user_agent',
'client_id', 'client_secret',
'revoke_uri',
]:
self.assertIn(field, content)