Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ dependencies = [
"eth_abi>=4.0.0; python_version>=\"3.11\"",
"eth_account>=0.4.0,<0.11.0",
"python-magic",
"typer",
"typing_extensions",
]

Expand All @@ -54,7 +53,7 @@ mqtt = [
nuls2 = [
"aleph-nuls2",
]
polkadot = [
substrate = [
"py-sr25519-bindings",
"substrate-interface",
]
Expand All @@ -71,7 +70,7 @@ encryption = [
"eciespy>=0.3.13; python_version>=\"3.11\"",
]
all = [
"aleph-sdk-python[cosmos,dns,docs,ledger,mqtt,nuls2,polkadot,solana,tezos,encryption]",
"aleph-sdk-python[cosmos,dns,docs,ledger,mqtt,nuls2,substrate,solana,tezos,encryption]",
]

[tool.hatch.metadata]
Expand Down Expand Up @@ -112,7 +111,7 @@ features = [
"dns",
"ledger",
"nuls2",
"polkadot",
"substrate",
"solana",
"tezos",
"encryption",
Expand Down
98 changes: 7 additions & 91 deletions src/aleph/sdk/chains/sol.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,9 @@
import json
from pathlib import Path
from typing import Dict, Optional, Union
import warnings

import base58
from nacl.exceptions import BadSignatureError as NaclBadSignatureError
from nacl.public import PrivateKey, SealedBox
from nacl.signing import SigningKey, VerifyKey
from aleph.sdk.chains.solana import * # noqa

from ..exceptions import BadSignatureError
from .common import BaseAccount, get_fallback_private_key, get_verification_buffer


def encode(item):
return base58.b58encode(bytes(item)).decode("ascii")


class SOLAccount(BaseAccount):
CHAIN = "SOL"
CURVE = "curve25519"
_signing_key: SigningKey
_private_key: PrivateKey

def __init__(self, private_key: bytes):
self.private_key = private_key
self._signing_key = SigningKey(self.private_key)
self._private_key = self._signing_key.to_curve25519_private_key()

async def sign_message(self, message: Dict) -> Dict:
"""Sign a message inplace."""
message = self._setup_sender(message)
verif = get_verification_buffer(message)
signature = await self.sign_raw(verif)
sig = {
"publicKey": self.get_address(),
"signature": encode(signature),
}
message["signature"] = json.dumps(sig)
return message

async def sign_raw(self, buffer: bytes) -> bytes:
"""Sign a raw buffer."""
sig = self._signing_key.sign(buffer)
return sig.signature

def get_address(self) -> str:
return encode(self._signing_key.verify_key)

def get_public_key(self) -> str:
return bytes(self._signing_key.verify_key.to_curve25519_public_key()).hex()

async def encrypt(self, content) -> bytes:
value: bytes = bytes(SealedBox(self._private_key.public_key).encrypt(content))
return value

async def decrypt(self, content) -> bytes:
value: bytes = SealedBox(self._private_key).decrypt(content)
return value


def get_fallback_account(path: Optional[Path] = None) -> SOLAccount:
return SOLAccount(private_key=get_fallback_private_key(path=path))


def generate_key() -> bytes:
privkey = bytes(SigningKey.generate())
return privkey


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
):
"""
Verifies a signature.
Args:
signature: The signature to verify. Can be a base58 encoded string or bytes.
public_key: The public key to use for verification. Can be a base58 encoded string or bytes.
message: The message to verify. Can be an utf-8 string or bytes.
Raises:
BadSignatureError: If the signature is invalid.
"""
if isinstance(signature, str):
signature = base58.b58decode(signature)
if isinstance(message, str):
message = message.encode("utf-8")
if isinstance(public_key, str):
public_key = base58.b58decode(public_key)
try:
VerifyKey(public_key).verify(message, signature)
except NaclBadSignatureError as e:
raise BadSignatureError from e
warnings.warn(
"aleph.sdk.chains.sol is deprecated, use aleph.sdk.chains.solana instead",
DeprecationWarning,
stacklevel=1,
)
93 changes: 93 additions & 0 deletions src/aleph/sdk/chains/solana.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import json
from pathlib import Path
from typing import Dict, Optional, Union

import base58
from nacl.exceptions import BadSignatureError as NaclBadSignatureError
from nacl.public import PrivateKey, SealedBox
from nacl.signing import SigningKey, VerifyKey

from ..exceptions import BadSignatureError
from .common import BaseAccount, get_fallback_private_key, get_verification_buffer


def encode(item):
return base58.b58encode(bytes(item)).decode("ascii")


class SOLAccount(BaseAccount):
CHAIN = "SOL"
CURVE = "curve25519"
_signing_key: SigningKey
_private_key: PrivateKey

def __init__(self, private_key: bytes):
self.private_key = private_key
self._signing_key = SigningKey(self.private_key)
self._private_key = self._signing_key.to_curve25519_private_key()

async def sign_message(self, message: Dict) -> Dict:
"""Sign a message inplace."""
message = self._setup_sender(message)
verif = get_verification_buffer(message)
signature = await self.sign_raw(verif)
sig = {
"publicKey": self.get_address(),
"signature": encode(signature),
}
message["signature"] = json.dumps(sig)
return message

async def sign_raw(self, buffer: bytes) -> bytes:
"""Sign a raw buffer."""
sig = self._signing_key.sign(buffer)
return sig.signature

def get_address(self) -> str:
return encode(self._signing_key.verify_key)

def get_public_key(self) -> str:
return bytes(self._signing_key.verify_key.to_curve25519_public_key()).hex()

async def encrypt(self, content) -> bytes:
value: bytes = bytes(SealedBox(self._private_key.public_key).encrypt(content))
return value

async def decrypt(self, content) -> bytes:
value: bytes = SealedBox(self._private_key).decrypt(content)
return value


def get_fallback_account(path: Optional[Path] = None) -> SOLAccount:
return SOLAccount(private_key=get_fallback_private_key(path=path))


def generate_key() -> bytes:
privkey = bytes(SigningKey.generate())
return privkey


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
):
"""
Verifies a signature.
Args:
signature: The signature to verify. Can be a base58 encoded string or bytes.
public_key: The public key to use for verification. Can be a base58 encoded string or bytes.
message: The message to verify. Can be an utf-8 string or bytes.
Raises:
BadSignatureError: If the signature is invalid.
"""
if isinstance(signature, str):
signature = base58.b58decode(signature)
if isinstance(message, str):
message = message.encode("utf-8")
if isinstance(public_key, str):
public_key = base58.b58decode(public_key)
try:
VerifyKey(public_key).verify(message, signature)
except NaclBadSignatureError as e:
raise BadSignatureError from e
2 changes: 1 addition & 1 deletion tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from aleph_message.models import AggregateMessage, AlephMessage, PostMessage

import aleph.sdk.chains.ethereum as ethereum
import aleph.sdk.chains.sol as solana
import aleph.sdk.chains.solana as solana
import aleph.sdk.chains.substrate as substrate
import aleph.sdk.chains.tezos as tezos
from aleph.sdk import AlephHttpClient, AuthenticatedAlephHttpClient
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_chain_solana.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from nacl.signing import VerifyKey

from aleph.sdk.chains.common import get_verification_buffer
from aleph.sdk.chains.sol import SOLAccount, get_fallback_account, verify_signature
from aleph.sdk.chains.solana import SOLAccount, get_fallback_account, verify_signature
from aleph.sdk.exceptions import BadSignatureError


Expand Down