From 761754035c029b0c077bea153dae0e24a16f1877 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20T=C3=A9tard?= Date: Wed, 2 Jan 2019 12:00:50 +0100 Subject: [PATCH] Implement the ability to define a leeway when validating JWT tokens. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PyJWT library implement a way to define a “leeway” for time validations. > PyJWT also supports the leeway part of the expiration time > definition, which means you can validate a expiration time which is > in the past but not very far. For example, if you have a JWT payload > with a expiration time set to 30 seconds after creation but you know > that sometimes you will process it after 30 seconds, you can set a > leeway of 10 seconds in order to have some margin: > > https://github.com/jpadilla/pyjwt/blob/master/docs/usage.rst This is implemented as an optional configuration setting, `JWT_DECODE_LEEWAY`. --- docs/options.rst | 5 +++++ flask_jwt_extended/config.py | 4 ++++ flask_jwt_extended/jwt_manager.py | 1 + flask_jwt_extended/tokens.py | 8 ++++++-- flask_jwt_extended/utils.py | 8 +++++--- tests/test_decode_tokens.py | 32 ++++++++++++++++++++++++++++++- 6 files changed, 52 insertions(+), 6 deletions(-) diff --git a/docs/options.rst b/docs/options.rst index 4b9e66ab..9381ad31 100644 --- a/docs/options.rst +++ b/docs/options.rst @@ -47,6 +47,11 @@ General Options: ``JWT_DECODE_AUDIENCE`` The audience you expect in a JWT when decoding it. If this option differs from the 'aud' claim in a JWT, the ``'invalid_token_callback'`` is invoked. Defaults to ``'None'``. +``JWT_DECODE_LEEWAY`` Define the leeway part of the expiration time definition, which + means you can validate an expiration time which is in the past but + not very far. This leeway is used for `nbf` (“not before”) and `exp` + (“expiration time”). + Defaults to ``0`` ================================= ========================================= diff --git a/flask_jwt_extended/config.py b/flask_jwt_extended/config.py index 2d50c98f..2fbdc78f 100644 --- a/flask_jwt_extended/config.py +++ b/flask_jwt_extended/config.py @@ -292,5 +292,9 @@ def json_encoder(self): def audience(self): return current_app.config['JWT_DECODE_AUDIENCE'] + @property + def leeway(self): + return current_app.config['JWT_DECODE_LEEWAY'] + config = _Config() diff --git a/flask_jwt_extended/jwt_manager.py b/flask_jwt_extended/jwt_manager.py index 496a792d..8cd743cd 100644 --- a/flask_jwt_extended/jwt_manager.py +++ b/flask_jwt_extended/jwt_manager.py @@ -197,6 +197,7 @@ def _set_default_configuration_options(app): app.config.setdefault('JWT_IDENTITY_CLAIM', 'identity') app.config.setdefault('JWT_USER_CLAIMS', 'user_claims') app.config.setdefault('JWT_DECODE_AUDIENCE', None) + app.config.setdefault('JWT_DECODE_LEEWAY', 0) app.config.setdefault('JWT_CLAIMS_IN_REFRESH_TOKEN', False) diff --git a/flask_jwt_extended/tokens.py b/flask_jwt_extended/tokens.py index f990407f..563a4d86 100644 --- a/flask_jwt_extended/tokens.py +++ b/flask_jwt_extended/tokens.py @@ -113,7 +113,8 @@ def encode_refresh_token(identity, secret, algorithm, expires_delta, user_claims def decode_jwt(encoded_token, secret, algorithm, identity_claim_key, - user_claims_key, csrf_value=None, audience=None): + user_claims_key, csrf_value=None, audience=None, + leeway=0): """ Decodes an encoded JWT @@ -124,10 +125,13 @@ def decode_jwt(encoded_token, secret, algorithm, identity_claim_key, :param user_claims_key: expected key that contains the user claims :param csrf_value: Expected double submit csrf value :param audience: expected audience in the JWT + :param leeway: optional leeway to add some margin around expiration times :return: Dictionary containing contents of the JWT """ + # This call verifies the ext, iat, nbf, and aud claims - data = jwt.decode(encoded_token, secret, algorithms=[algorithm], audience=audience) + data = jwt.decode(encoded_token, secret, algorithms=[algorithm], audience=audience, + leeway=leeway) # Make sure that any custom claims we expect in the token are present if 'jti' not in data: diff --git a/flask_jwt_extended/utils.py b/flask_jwt_extended/utils.py index e3f114ee..8e27c348 100644 --- a/flask_jwt_extended/utils.py +++ b/flask_jwt_extended/utils.py @@ -84,8 +84,9 @@ def decode_token(encoded_token, csrf_value=None): secret = jwt_manager._decode_key_callback(unverified_claims, unverified_headers) except TypeError: msg = ( - "The single-argument (unverified_claims) form of decode_key_callback is deprecated. " - "Update your code to use the two-argument form (unverified_claims, unverified_headers)." + "The single-argument (unverified_claims) form of decode_key_callback ", + "is deprecated. Update your code to use the two-argument form ", + "(unverified_claims, unverified_headers)." ) warn(msg, DeprecationWarning) secret = jwt_manager._decode_key_callback(unverified_claims) @@ -96,7 +97,8 @@ def decode_token(encoded_token, csrf_value=None): identity_claim_key=config.identity_claim_key, user_claims_key=config.user_claims_key, csrf_value=csrf_value, - audience=config.audience + audience=config.audience, + leeway=config.leeway ) diff --git a/tests/test_decode_tokens.py b/tests/test_decode_tokens.py index adf37e0b..bae0b983 100644 --- a/tests/test_decode_tokens.py +++ b/tests/test_decode_tokens.py @@ -4,7 +4,11 @@ import warnings from flask import Flask -from jwt import ExpiredSignatureError, InvalidSignatureError, InvalidAudienceError + +from jwt import ( + ExpiredSignatureError, InvalidSignatureError, InvalidAudienceError, + ImmatureSignatureError +) from flask_jwt_extended import ( JWTManager, create_access_token, decode_token, create_refresh_token, @@ -37,6 +41,20 @@ def default_access_token(app): } +@pytest.fixture(scope='function') +def patch_datetime_now(monkeypatch): + + DATE_IN_FUTURE = datetime.utcnow() + timedelta(seconds=30) + + class mydatetime(datetime): + @classmethod + def utcnow(cls): + return DATE_IN_FUTURE + + monkeypatch.setattr(__name__ + ".datetime", mydatetime) + monkeypatch.setattr("datetime.datetime", mydatetime) + + @pytest.mark.parametrize("user_loader_return", [{}, None]) def test_no_user_claims(app, user_loader_return): jwtM = get_jwt_manager(app) @@ -107,6 +125,18 @@ def test_never_expire_token(app): assert 'exp' not in decoded +def test_nbf_token_in_future(app, patch_datetime_now): + with pytest.raises(ImmatureSignatureError): + with app.test_request_context(): + access_token = create_access_token('username') + decode_token(access_token) + + with app.test_request_context(): + app.config['JWT_DECODE_LEEWAY'] = 30 + access_token = create_access_token('username') + decode_token(access_token) + + def test_alternate_identity_claim(app, default_access_token): app.config['JWT_IDENTITY_CLAIM'] = 'sub'