1
1
from __future__ import absolute_import
2
2
3
+ import abc
4
+
3
5
from kafka .sasl .abc import SaslMechanism
4
6
5
7
6
8
class SaslMechanismOAuth (SaslMechanism ):
7
9
8
10
def __init__ (self , ** config ):
9
11
assert 'sasl_oauth_token_provider' in config , 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
12
+ assert isinstance (config ['sasl_oauth_token_provider' ], AbstractTokenProvider ), \
13
+ 'sasl_oauth_token_provider must implement kafka.sasl.oauth.AbstractTokenProvider'
10
14
self .token_provider = config ['sasl_oauth_token_provider' ]
11
- assert callable (getattr (self .token_provider , 'token' , None )), 'sasl_oauth_token_provider must implement method #token()'
12
15
self ._is_done = False
13
16
self ._is_authenticated = False
14
17
@@ -32,13 +35,53 @@ def _token_extensions(self):
32
35
Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER
33
36
initial request.
34
37
"""
35
- # Only run if the #extensions() method is implemented by the clients Token Provider class
36
38
# Builds up a string separated by \x01 via a dict of key value pairs
37
- extensions = getattr ( self .token_provider , ' extensions' , lambda : []) ()
39
+ extensions = self .token_provider . extensions ()
38
40
msg = '\x01 ' .join (['{}={}' .format (k , v ) for k , v in extensions .items ()])
39
41
return '\x01 ' + msg if msg else ''
40
42
41
43
def auth_details (self ):
42
44
if not self .is_authenticated :
43
45
raise RuntimeError ('Not authenticated yet!' )
44
46
return 'Authenticated via SASL / OAuth'
47
+
48
+ # This statement is compatible with both Python 2.7 & 3+
49
+ ABC = abc .ABCMeta ('ABC' , (object ,), {'__slots__' : ()})
50
+
51
+ class AbstractTokenProvider (ABC ):
52
+ """
53
+ A Token Provider must be used for the SASL OAuthBearer protocol.
54
+
55
+ The implementation should ensure token reuse so that multiple
56
+ calls at connect time do not create multiple tokens. The implementation
57
+ should also periodically refresh the token in order to guarantee
58
+ that each call returns an unexpired token. A timeout error should
59
+ be returned after a short period of inactivity so that the
60
+ broker can log debugging info and retry.
61
+
62
+ Token Providers MUST implement the token() method
63
+ """
64
+
65
+ def __init__ (self , ** config ):
66
+ pass
67
+
68
+ @abc .abstractmethod
69
+ def token (self ):
70
+ """
71
+ Returns a (str) ID/Access Token to be sent to the Kafka
72
+ client.
73
+ """
74
+ pass
75
+
76
+ def extensions (self ):
77
+ """
78
+ This is an OPTIONAL method that may be implemented.
79
+
80
+ Returns a map of key-value pairs that can
81
+ be sent with the SASL/OAUTHBEARER initial client request. If
82
+ not implemented, the values are ignored. This feature is only available
83
+ in Kafka >= 2.1.0.
84
+
85
+ All returned keys and values should be type str
86
+ """
87
+ return {}
0 commit comments