Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 54 additions & 44 deletions nkeys/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
# limitations under the License.
#

from __future__ import annotations

import base64
import binascii
from typing import Optional, Tuple, Union

import nacl.signing
import nacl.exceptions
import nacl.public
Expand Down Expand Up @@ -42,14 +46,14 @@
PREFIX_BYTE_USER = 20 << 3 # Base32-encodes to 'U...'


def from_seed(seed):
def from_seed(seed: Union[bytes, bytearray]) -> KeyPair:
_, raw_seed = decode_seed(seed)
keys = nacl.signing.SigningKey(raw_seed)
del raw_seed
return KeyPair(keys=keys, seed=seed)


def decode_seed(src):
def decode_seed(src: Union[bytes, bytearray]) -> Tuple[int, bytes]:
# Add missing padding if required.
padding = bytearray()
padding += b'=' * (-len(src) % 8)
Expand Down Expand Up @@ -79,7 +83,7 @@ def decode_seed(src):
return (prefix, result)


def encode_seed(src, prefix):
def encode_seed(src: bytes, prefix: int) -> bytes:
"""
:param src: A bytestring of length 32, used as the seed
for an nkey.
Expand Down Expand Up @@ -122,7 +126,7 @@ def encode_seed(src, prefix):
return base64.b32encode(final_bytes).rstrip(b'=')


def valid_public_prefix_byte(prefix):
def valid_public_prefix_byte(prefix: int) -> bool:
if prefix == PREFIX_BYTE_OPERATOR \
or prefix == PREFIX_BYTE_SERVER \
or prefix == PREFIX_BYTE_CLUSTER \
Expand All @@ -133,7 +137,7 @@ def valid_public_prefix_byte(prefix):
return False


def valid_prefix_byte(prefix):
def valid_prefix_byte(prefix: int) -> bool:
if prefix == PREFIX_BYTE_OPERATOR \
or prefix == PREFIX_BYTE_SERVER \
or prefix == PREFIX_BYTE_CLUSTER \
Expand All @@ -146,7 +150,7 @@ def valid_prefix_byte(prefix):
return False


def verifying_nkey_to_ed25519(public_nkey):
def verifying_nkey_to_ed25519(public_nkey: Union[bytes, bytearray]) -> nacl.signing.VerifyKey:
"""
:param public_nkey: The public nkey as a bytearray.
:rtype nacl.signing.VerifyKey:
Expand Down Expand Up @@ -176,7 +180,7 @@ def verifying_nkey_to_ed25519(public_nkey):
return nacl.signing.VerifyKey(bare_key)


def verify(public_nkey, input, sig):
def verify(public_nkey: Union[bytes, bytearray], input: bytes, sig: bytes) -> bool:
"""
:param public_nkey: The public nkey as a bytearray.
:param input: The payload in bytes that was signed.
Expand All @@ -193,31 +197,29 @@ def verify(public_nkey, input, sig):
raise ErrInvalidSignature()


class KeyPair(object):
class KeyPair:
"""
NKEYS KeyPair used to sign and verify data.
"""

_seed: Optional[Union[bytes, bytearray]]
_keys: Optional[nacl.signing.SigningKey]
_public_key: Optional[bytes]
_private_key: Optional[bytes]

def __init__(
self,
seed=None,
keys=None,
public_key=None,
private_key=None,
):
"""
NKEYS KeyPair used to sign and verify data.

:param seed: The seed as a bytearray used to create the keypair.
:param keys: The keypair that can be used for signing.
:param public_key: The public key as a bytearray.

:rtype: nkeys.Keypair
:return: A KeyPair that can be used to sign and verify data.
"""
seed: Optional[Union[bytes, bytearray]] = None,
keys: Optional[nacl.signing.SigningKey] = None,
public_key: Optional[bytes] = None,
private_key: Optional[bytes] = None,
) -> None:
self._seed = seed
self._keys: nacl.signing.SigningKey = keys
self._public_key: nacl.signing.VerifyKey = public_key
self._keys = keys
self._public_key = public_key
self._private_key = private_key

def sign(self, input):
def sign(self, input: bytes) -> bytes:
"""
NKEYS KeyPair used to sign and verify data.

Expand All @@ -226,9 +228,11 @@ def sign(self, input):
:rtype bytes:
:return: The raw bytes representing the signed data.
"""
if self._keys is None:
raise ErrCannotSign()
return self._keys.sign(input).signature

def verify(self, input, sig):
def verify(self, input: bytes, sig: bytes) -> bool:
"""
:param input: The payload in bytes that was signed.
:param sig: The signature in bytes that will be verified.
Expand All @@ -239,7 +243,7 @@ def verify(self, input, sig):
return verify(self.public_key, input, sig)

@property
def public_key(self):
def public_key(self) -> bytes:
"""
Return the encoded public key associated with the KeyPair.

Expand All @@ -250,6 +254,9 @@ def public_key(self):
if self._public_key is not None:
return self._public_key

if self._seed is None or self._keys is None:
raise ErrPublicKeyOnly()

# Get the public key from the seed to verify later.
prefix, _ = decode_seed(self._seed)

Expand All @@ -268,10 +275,13 @@ def public_key(self):
return self._public_key

@property
def private_key(self):
def private_key(self) -> bytes:
if self._private_key is not None:
return self._private_key

if self._keys is None:
raise ErrPublicKeyOnly()

src = bytearray(bytes(self._keys._signing_key))
src.insert(0, PREFIX_BYTE_PRIVATE)

Expand All @@ -285,12 +295,12 @@ def private_key(self):
return self._private_key

@property
def seed(self):
if not hasattr(self, "_seed"):
def seed(self) -> Union[bytes, bytearray]:
if not hasattr(self, "_seed") or self._seed is None:
raise ErrInvalidSeed()
return self._seed

def wipe(self):
def wipe(self) -> None:
del self._seed
del self._keys
del self._public_key
Expand Down Expand Up @@ -557,14 +567,14 @@ def wipe(self):
]


def crc16(data):
def crc16(data: Union[bytes, bytearray]) -> int:
crc = 0
for c in data:
crc = ((crc << 8) & 0xffff) ^ CRC16TAB[((crc >> 8) ^ c) & 0x00FF]
return crc


def crc16_checksum(data):
def crc16_checksum(data: Union[bytes, bytearray]) -> bytes:
crc = crc16(data)
return crc.to_bytes(2, byteorder='little')

Expand All @@ -575,59 +585,59 @@ class NkeysError(Exception):

class ErrInvalidSeed(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: invalid seed"


class ErrInvalidPrefixByte(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: invalid prefix byte"


class ErrInvalidKey(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: invalid key"


class ErrInvalidPublicKey(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: invalid public key"


class ErrInvalidSeedLen(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: invalid seed length"


class ErrInvalidEncoding(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: invalid encoded key"


class ErrInvalidCheckSum(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: invalid crc16 checksum"


class ErrInvalidSignature(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: signature verification failed"


class ErrCannotSign(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: can not sign, no private key available"


class ErrPublicKeyOnly(NkeysError):

def __str__(self):
def __str__(self) -> str:
return "nkeys: no seed or private key available"
Empty file added nkeys/py.typed
Empty file.
19 changes: 15 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,24 @@ include = ["nkeys"] # package names should match these glob patterns (["*"] by
exclude = ["tests"] # exclude packages matching these glob patterns (empty by default)
namespaces = false # to disable scanning PEP 420 namespaces (true by default)

[tool.setuptools.package-data]
nkeys = ["py.typed"]

[tool.mypy]
files = ["nkeys"]
python_version = "3.7"
ignore_missing_imports = true
follow_imports = "silent"
python_version = "3.8"
strict = true
show_error_codes = true
check_untyped_defs = true
warn_unused_ignores = true
warn_redundant_casts = true
warn_return_any = true
disallow_any_unimported = false
no_implicit_reexport = true

# Third-party libraries without type stubs
[[tool.mypy.overrides]]
module = "nacl.*"
ignore_missing_imports = true

[tool.yapf]
split_before_first_argument = true
Expand Down