Skip to content

Commit f9117a4

Browse files
committed
Modular sasl mechanisms (wip)
1 parent 4ec03e4 commit f9117a4

File tree

7 files changed

+333
-270
lines changed

7 files changed

+333
-270
lines changed

kafka/conn.py

Lines changed: 71 additions & 255 deletions
Large diffs are not rendered by default.

kafka/sasl/__init__.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.sasl.gssapi import SaslMechanismGSSAPI
4+
from kafka.sasl.oauth import SaslMechanismOAuth
5+
from kafka.sasl.plain import SaslMechanismPlain
6+
from kafka.sasl.scram import SaslMechanismScram
7+
8+
9+
SASL_MECHANISMS = {}
10+
11+
12+
def register_sasl_mechanism(name, klass, overwrite=False):
13+
if not overwrite and name in SASL_MECHANISMS:
14+
raise ValueError('Sasl mechanism %s already defined!' % name)
15+
SASL_MECHANISMS[name] = klass
16+
17+
18+
def get_sasl_mechanism(name):
19+
return SASL_MECHANISMS[name]
20+
21+
22+
register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI)
23+
register_sasl_mechanism('OAUTHBEARER', SaslMechanismOAuth)
24+
register_sasl_mechanism('PLAIN', SaslMechanismPlain)
25+
register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram)
26+
register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram)

kafka/sasl/abc.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from __future__ import absolute_import
2+
3+
import abc
4+
5+
6+
class SaslMechanism(object):
7+
__metaclass__ = abc.ABCMeta
8+
9+
@abc.abstractmethod
10+
def __init__(self, **config):
11+
pass
12+
13+
@abc.abstractmethod
14+
def auth_bytes(self):
15+
pass
16+
17+
@abc.abstractmethod
18+
def receive(self, auth_bytes):
19+
pass
20+
21+
@abc.abstractmethod
22+
def is_done(self):
23+
pass
24+
25+
@abc.abstractmethod
26+
def is_authenticated(self):
27+
pass

kafka/sasl/gssapi.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from __future__ import absolute_import
2+
3+
# needed for SASL_GSSAPI authentication:
4+
try:
5+
import gssapi
6+
from gssapi.raw.misc import GSSError
7+
except (ImportError, OSError):
8+
#no gssapi available, will disable gssapi mechanism
9+
gssapi = None
10+
GSSError = None
11+
12+
from kafka.sasl.abc import SaslMechanism
13+
14+
15+
class SaslMechanismGSSAPI(SaslMechanism):
16+
# Establish security context and negotiate protection level
17+
# For reference RFC 2222, section 7.2.1
18+
19+
SASL_QOP_AUTH = 1
20+
SASL_QOP_AUTH_INT = 2
21+
SASL_QOP_AUTH_CONF = 4
22+
23+
def __init__(self, **config):
24+
assert gssapi is not None, 'GSSAPI lib not available'
25+
assert config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
26+
self._is_done = False
27+
self._is_authenticated = False
28+
self.kerberos_damin_name = config['sasl_kerberos_domain_name'] or config['host']
29+
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
30+
self.gssapi_name = gssapi.Name(auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
31+
self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate')
32+
self._next_token = self._client_ctx.step(None)
33+
34+
def auth_bytes(self):
35+
# GSSAPI Auth does not have a final broker->client message
36+
# so mark is_done after the final auth_bytes are provided
37+
# in practice we'll still receive a response when using SaslAuthenticate
38+
# but not when using the prior unframed approach.
39+
if self._client_ctx.complete:
40+
self._is_done = True
41+
self._is_authenticated = True
42+
return self._next_token or b''
43+
44+
def receive(self, auth_bytes):
45+
if not self._client_ctx.complete:
46+
# The server will send a token back. Processing of this token either
47+
# establishes a security context, or it needs further token exchange.
48+
# The gssapi will be able to identify the needed next step.
49+
self._next_token = self._client_ctx.step(auth_bytes)
50+
elif self._is_done:
51+
# The final step of gssapi is send, so we do not expect any additional bytes
52+
# however, allow an empty message to support SaslAuthenticate response
53+
if auth_bytes != b'':
54+
raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion")
55+
else:
56+
# unwraps message containing supported protection levels and msg size
57+
msg = client_ctx.unwrap(received_token).message
58+
# Kafka currently doesn't support integrity or confidentiality security layers, so we
59+
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
60+
# by the server
61+
message_parts = [
62+
Int8.encode(self.SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))),
63+
msg[:1],
64+
self.auth_id.encode(),
65+
]
66+
# add authorization identity to the response, and GSS-wrap
67+
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message
68+
69+
def is_done(self):
70+
return self._is_done
71+
72+
def is_authenticated(self):
73+
return self._is_authenticated

kafka/sasl/oauth.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.sasl.abc import SaslMechanism
4+
5+
6+
class SaslMechanismOAuth(SaslMechanism):
7+
8+
def __init__(self, **config):
9+
self.token_provider = config['sasl_oauth_token_provider']
10+
assert self.token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
11+
assert callable(getattr(self.token_provider, 'token', None)), 'sasl_oauth_token_provider must implement method #token()'
12+
self._is_done = False
13+
self._is_authenticated = False
14+
15+
def auth_bytes(self):
16+
token = self.token_provider.token()
17+
extensions = self._token_extensions()
18+
return "n,,\x01auth=Bearer {}{}\x01\x01".format(token, extensions).encode('utf-8')
19+
20+
def receive(self, auth_bytes):
21+
if auth_bytes != b'':
22+
self._is_authenticated = False
23+
self._is_done = True
24+
25+
def is_done(self):
26+
return self._is_done
27+
28+
def is_authenticated(self):
29+
return self._is_authenticated
30+
31+
def _token_extensions(self):
32+
"""
33+
Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER
34+
initial request.
35+
"""
36+
# Only run if the #extensions() method is implemented by the clients Token Provider class
37+
# Builds up a string separated by \x01 via a dict of key value pairs
38+
extensions = getattr(self.token_provider, 'extensions', lambda: [])()
39+
msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()])
40+
return '\x01' + msg if msg else ''

kafka/sasl/plain.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from __future__ import absolute_import
2+
3+
import logging
4+
5+
from kafka.sasl.abc import SaslMechanism
6+
7+
8+
log = logging.getLogger(__name__)
9+
10+
11+
class SaslMechanismPlain(SaslMechanism):
12+
13+
def __init__(self, **config):
14+
if config['security_protocol'] == 'SASL_PLAINTEXT':
15+
log.warning('Sending username and password in the clear')
16+
assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
17+
assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
18+
19+
self.username = config['sasl_plain_username']
20+
self.password = config['sasl_plain_password']
21+
self._is_done = False
22+
self._is_authenticated = False
23+
24+
def auth_bytes(self):
25+
# Send PLAIN credentials per RFC-4616
26+
return bytes('\0'.join([self.username, self.username, self.password]).encode('utf-8'))
27+
28+
def receive(self, auth_bytes):
29+
self._is_done = True
30+
self._is_authenticated = auth_bytes == b''
31+
32+
def is_done(self):
33+
return self._is_done
34+
35+
def is_authenticated(self):
36+
return self._is_authenticated

kafka/scram.py renamed to kafka/sasl/scram.py

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,17 @@
33
import base64
44
import hashlib
55
import hmac
6+
import logging
67
import uuid
78

9+
10+
from kafka.sasl.abc import SaslMechanism
811
from kafka.vendor import six
912

1013

14+
log = logging.getLogger(__name__)
15+
16+
1117
if six.PY2:
1218
def xor_bytes(left, right):
1319
return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right))
@@ -16,17 +22,58 @@ def xor_bytes(left, right):
1622
return bytes(lb ^ rb for lb, rb in zip(left, right))
1723

1824

25+
class SaslMechanismScram(SaslMechanism):
26+
27+
def __init__(self, **config):
28+
assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for SCRAM sasl'
29+
assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for SCRAM sasl'
30+
if config['security_protocol'] == 'SASL_PLAINTEXT':
31+
log.warning('Exchanging credentials in the clear during Sasl Authentication')
32+
33+
self._scram_client = ScramClient(
34+
config['sasl_plain_username'],
35+
config['sasl_plain_password'],
36+
config['sasl_mechanism']
37+
)
38+
self._state = 0
39+
40+
def auth_bytes(self):
41+
if self._state == 0:
42+
return self._scram_client.first_message()
43+
elif self._state == 1:
44+
return self._scram_client.final_message()
45+
else:
46+
raise ValueError('No auth_bytes for state: %s' % self._state)
47+
48+
def receive(self, auth_bytes):
49+
if self._state == 0:
50+
self._scram_client.process_server_first_message(auth_bytes)
51+
elif self._state == 1:
52+
self._scram_client.process_server_final_message(auth_bytes)
53+
else:
54+
raise ValueError('Cannot receive bytes in state: %s' % self._state)
55+
self._state += 1
56+
return self.is_done()
57+
58+
def is_done(self):
59+
return self._state == 2
60+
61+
def is_authenticated(self):
62+
# receive raises if authentication fails...?
63+
return self._state == 2
64+
65+
1966
class ScramClient:
2067
MECHANISMS = {
2168
'SCRAM-SHA-256': hashlib.sha256,
2269
'SCRAM-SHA-512': hashlib.sha512
2370
}
2471

2572
def __init__(self, user, password, mechanism):
26-
self.nonce = str(uuid.uuid4()).replace('-', '')
27-
self.auth_message = ''
73+
self.nonce = str(uuid.uuid4()).replace('-', '').encode('utf-8')
74+
self.auth_message = b''
2875
self.salted_password = None
29-
self.user = user
76+
self.user = user.encode('utf-8')
3077
self.password = password.encode('utf-8')
3178
self.hashfunc = self.MECHANISMS[mechanism]
3279
self.hashname = ''.join(mechanism.lower().split('-')[1:3])
@@ -38,29 +85,29 @@ def __init__(self, user, password, mechanism):
3885
self.server_signature = None
3986

4087
def first_message(self):
41-
client_first_bare = 'n={},r={}'.format(self.user, self.nonce)
88+
client_first_bare = b'n=' + self.user + b',r=' + self.nonce
4289
self.auth_message += client_first_bare
43-
return 'n,,' + client_first_bare
90+
return b'n,,' + client_first_bare
4491

4592
def process_server_first_message(self, server_first_message):
46-
self.auth_message += ',' + server_first_message
47-
params = dict(pair.split('=', 1) for pair in server_first_message.split(','))
48-
server_nonce = params['r']
93+
self.auth_message += b',' + server_first_message
94+
params = dict(pair.split('=', 1) for pair in server_first_message.decode('utf-8').split(','))
95+
server_nonce = params['r'].encode('utf-8')
4996
if not server_nonce.startswith(self.nonce):
5097
raise ValueError("Server nonce, did not start with client nonce!")
5198
self.nonce = server_nonce
52-
self.auth_message += ',c=biws,r=' + self.nonce
99+
self.auth_message += b',c=biws,r=' + self.nonce
53100

54101
salt = base64.b64decode(params['s'].encode('utf-8'))
55102
iterations = int(params['i'])
56103
self.create_salted_password(salt, iterations)
57104

58105
self.client_key = self.hmac(self.salted_password, b'Client Key')
59106
self.stored_key = self.hashfunc(self.client_key).digest()
60-
self.client_signature = self.hmac(self.stored_key, self.auth_message.encode('utf-8'))
107+
self.client_signature = self.hmac(self.stored_key, self.auth_message)
61108
self.client_proof = xor_bytes(self.client_key, self.client_signature)
62109
self.server_key = self.hmac(self.salted_password, b'Server Key')
63-
self.server_signature = self.hmac(self.server_key, self.auth_message.encode('utf-8'))
110+
self.server_signature = self.hmac(self.server_key, self.auth_message)
64111

65112
def hmac(self, key, msg):
66113
return hmac.new(key, msg, digestmod=self.hashfunc).digest()
@@ -71,11 +118,9 @@ def create_salted_password(self, salt, iterations):
71118
)
72119

73120
def final_message(self):
74-
return 'c=biws,r={},p={}'.format(self.nonce, base64.b64encode(self.client_proof).decode('utf-8'))
121+
return b'c=biws,r=' + self.nonce + b',p=' + base64.b64encode(self.client_proof)
75122

76123
def process_server_final_message(self, server_final_message):
77-
params = dict(pair.split('=', 1) for pair in server_final_message.split(','))
124+
params = dict(pair.split('=', 1) for pair in server_final_message.decode('utf-8').split(','))
78125
if self.server_signature != base64.b64decode(params['v'].encode('utf-8')):
79126
raise ValueError("Server sent wrong signature!")
80-
81-

0 commit comments

Comments
 (0)