diff --git a/docs/options.rst b/docs/options.rst index 4b9e66ab..9381ad31 100644 --- a/docs/options.rst +++ b/docs/options.rst @@ -47,6 +47,11 @@ General Options: ``JWT_DECODE_AUDIENCE`` The audience you expect in a JWT when decoding it. If this option differs from the 'aud' claim in a JWT, the ``'invalid_token_callback'`` is invoked. Defaults to ``'None'``. +``JWT_DECODE_LEEWAY`` Define the leeway part of the expiration time definition, which + means you can validate an expiration time which is in the past but + not very far. This leeway is used for `nbf` (“not before”) and `exp` + (“expiration time”). + Defaults to ``0`` ================================= ========================================= diff --git a/flask_jwt_extended/config.py b/flask_jwt_extended/config.py index 2d50c98f..2fbdc78f 100644 --- a/flask_jwt_extended/config.py +++ b/flask_jwt_extended/config.py @@ -292,5 +292,9 @@ def json_encoder(self): def audience(self): return current_app.config['JWT_DECODE_AUDIENCE'] + @property + def leeway(self): + return current_app.config['JWT_DECODE_LEEWAY'] + config = _Config() diff --git a/flask_jwt_extended/jwt_manager.py b/flask_jwt_extended/jwt_manager.py index 496a792d..8cd743cd 100644 --- a/flask_jwt_extended/jwt_manager.py +++ b/flask_jwt_extended/jwt_manager.py @@ -197,6 +197,7 @@ def _set_default_configuration_options(app): app.config.setdefault('JWT_IDENTITY_CLAIM', 'identity') app.config.setdefault('JWT_USER_CLAIMS', 'user_claims') app.config.setdefault('JWT_DECODE_AUDIENCE', None) + app.config.setdefault('JWT_DECODE_LEEWAY', 0) app.config.setdefault('JWT_CLAIMS_IN_REFRESH_TOKEN', False) diff --git a/flask_jwt_extended/tokens.py b/flask_jwt_extended/tokens.py index f990407f..563a4d86 100644 --- a/flask_jwt_extended/tokens.py +++ b/flask_jwt_extended/tokens.py @@ -113,7 +113,8 @@ def encode_refresh_token(identity, secret, algorithm, expires_delta, user_claims def decode_jwt(encoded_token, secret, algorithm, identity_claim_key, - user_claims_key, csrf_value=None, audience=None): + user_claims_key, csrf_value=None, audience=None, + leeway=0): """ Decodes an encoded JWT @@ -124,10 +125,13 @@ def decode_jwt(encoded_token, secret, algorithm, identity_claim_key, :param user_claims_key: expected key that contains the user claims :param csrf_value: Expected double submit csrf value :param audience: expected audience in the JWT + :param leeway: optional leeway to add some margin around expiration times :return: Dictionary containing contents of the JWT """ + # This call verifies the ext, iat, nbf, and aud claims - data = jwt.decode(encoded_token, secret, algorithms=[algorithm], audience=audience) + data = jwt.decode(encoded_token, secret, algorithms=[algorithm], audience=audience, + leeway=leeway) # Make sure that any custom claims we expect in the token are present if 'jti' not in data: diff --git a/flask_jwt_extended/utils.py b/flask_jwt_extended/utils.py index e3f114ee..8e27c348 100644 --- a/flask_jwt_extended/utils.py +++ b/flask_jwt_extended/utils.py @@ -84,8 +84,9 @@ def decode_token(encoded_token, csrf_value=None): secret = jwt_manager._decode_key_callback(unverified_claims, unverified_headers) except TypeError: msg = ( - "The single-argument (unverified_claims) form of decode_key_callback is deprecated. " - "Update your code to use the two-argument form (unverified_claims, unverified_headers)." + "The single-argument (unverified_claims) form of decode_key_callback ", + "is deprecated. Update your code to use the two-argument form ", + "(unverified_claims, unverified_headers)." ) warn(msg, DeprecationWarning) secret = jwt_manager._decode_key_callback(unverified_claims) @@ -96,7 +97,8 @@ def decode_token(encoded_token, csrf_value=None): identity_claim_key=config.identity_claim_key, user_claims_key=config.user_claims_key, csrf_value=csrf_value, - audience=config.audience + audience=config.audience, + leeway=config.leeway ) diff --git a/tests/test_decode_tokens.py b/tests/test_decode_tokens.py index adf37e0b..bae0b983 100644 --- a/tests/test_decode_tokens.py +++ b/tests/test_decode_tokens.py @@ -4,7 +4,11 @@ import warnings from flask import Flask -from jwt import ExpiredSignatureError, InvalidSignatureError, InvalidAudienceError + +from jwt import ( + ExpiredSignatureError, InvalidSignatureError, InvalidAudienceError, + ImmatureSignatureError +) from flask_jwt_extended import ( JWTManager, create_access_token, decode_token, create_refresh_token, @@ -37,6 +41,20 @@ def default_access_token(app): } +@pytest.fixture(scope='function') +def patch_datetime_now(monkeypatch): + + DATE_IN_FUTURE = datetime.utcnow() + timedelta(seconds=30) + + class mydatetime(datetime): + @classmethod + def utcnow(cls): + return DATE_IN_FUTURE + + monkeypatch.setattr(__name__ + ".datetime", mydatetime) + monkeypatch.setattr("datetime.datetime", mydatetime) + + @pytest.mark.parametrize("user_loader_return", [{}, None]) def test_no_user_claims(app, user_loader_return): jwtM = get_jwt_manager(app) @@ -107,6 +125,18 @@ def test_never_expire_token(app): assert 'exp' not in decoded +def test_nbf_token_in_future(app, patch_datetime_now): + with pytest.raises(ImmatureSignatureError): + with app.test_request_context(): + access_token = create_access_token('username') + decode_token(access_token) + + with app.test_request_context(): + app.config['JWT_DECODE_LEEWAY'] = 30 + access_token = create_access_token('username') + decode_token(access_token) + + def test_alternate_identity_claim(app, default_access_token): app.config['JWT_IDENTITY_CLAIM'] = 'sub'