Skip to content

Commit 96bf68d

Browse files
committed
Fix: Improve code structure in pair-programming with Lyam
1 parent 2b25fe5 commit 96bf68d

File tree

9 files changed

+146
-427
lines changed

9 files changed

+146
-427
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ MANIFEST
5050
**/device.key
5151

5252
# environment variables
53-
.env
53+
.config.json
5454
.env.local
5555

5656
.gitsigners

src/aleph/sdk/account.py

Lines changed: 37 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -1,102 +1,30 @@
11
import asyncio
2-
import json
32
import logging
43
from pathlib import Path
5-
from typing import Dict, List, Optional, Type, TypeVar, Union, overload
4+
from typing import Dict, Optional, Type, TypeVar
65

7-
import base58
86
from aleph_message.models import Chain
97

108
from aleph.sdk.chains.common import get_fallback_private_key
119
from aleph.sdk.chains.ethereum import ETHAccount
1210
from aleph.sdk.chains.remote import RemoteAccount
13-
from aleph.sdk.chains.solana import (
14-
SOLAccount,
15-
parse_solana_private_key,
16-
solana_private_key_from_bytes,
17-
)
18-
from aleph.sdk.conf import settings
11+
from aleph.sdk.chains.solana import SOLAccount
12+
from aleph.sdk.conf import load_main_configuration, settings
1913
from aleph.sdk.types import AccountFromPrivateKey
20-
from aleph.sdk.utils import load_account_key_context
2114

2215
logger = logging.getLogger(__name__)
2316

2417
T = TypeVar("T", bound=AccountFromPrivateKey)
2518

26-
CHAIN_TO_ACCOUNT_MAP: Dict[Chain, Type[AccountFromPrivateKey]] = {
27-
Chain.ETH: ETHAccount,
28-
Chain.AVAX: ETHAccount,
29-
Chain.SOL: SOLAccount,
30-
Chain.BASE: ETHAccount,
31-
}
3219

33-
34-
def detect_chain_from_private_key(private_key: Union[str, List[int], bytes]) -> Chain:
35-
"""
36-
Detect the blockchain chain based on the private key format.
37-
- Chain.ETH for Ethereum (EVM) private keys
38-
- Chain.SOL for Solana private keys (base58 or uint8 format).
39-
40-
Raises:
41-
ValueError: If the private key format is invalid or not recognized.
42-
"""
43-
if isinstance(private_key, (str, bytes)) and is_valid_private_key(
44-
private_key, ETHAccount
45-
):
46-
return Chain.ETH
47-
48-
elif is_valid_private_key(private_key, SOLAccount):
49-
return Chain.SOL
50-
51-
else:
52-
raise ValueError("Unsupported private key format. Unable to detect chain.")
53-
54-
55-
@overload
56-
def is_valid_private_key(
57-
private_key: Union[str, bytes], account_type: Type[ETHAccount]
58-
) -> bool: ...
59-
60-
61-
@overload
62-
def is_valid_private_key(
63-
private_key: Union[str, List[int], bytes], account_type: Type[SOLAccount]
64-
) -> bool: ...
65-
66-
67-
def is_valid_private_key(
68-
private_key: Union[str, List[int], bytes], account_type: Type[T]
69-
) -> bool:
70-
"""
71-
Check if the private key is valid for either Ethereum or Solana based on the account type.
72-
"""
73-
try:
74-
if account_type == ETHAccount:
75-
# Handle Ethereum private key validation
76-
if isinstance(private_key, str):
77-
if private_key.startswith("0x"):
78-
private_key = private_key[2:]
79-
private_key = bytes.fromhex(private_key)
80-
elif isinstance(private_key, list):
81-
raise ValueError("Ethereum keys cannot be a list of integers")
82-
83-
account_type(private_key)
84-
85-
elif account_type == SOLAccount:
86-
# Handle Solana private key validation
87-
if isinstance(private_key, bytes):
88-
return len(private_key) == 64
89-
elif isinstance(private_key, str):
90-
decoded_key = base58.b58decode(private_key)
91-
return len(decoded_key) == 64
92-
elif isinstance(private_key, list):
93-
return len(private_key) == 64 and all(
94-
isinstance(i, int) and 0 <= i <= 255 for i in private_key
95-
)
96-
97-
return True
98-
except Exception:
99-
return False
20+
def load_chain_account_type(chain: Chain) -> Type[AccountFromPrivateKey]:
21+
chain_account_map: Dict[Chain, Type[AccountFromPrivateKey]] = {
22+
Chain.ETH: ETHAccount,
23+
Chain.AVAX: ETHAccount,
24+
Chain.SOL: SOLAccount,
25+
Chain.BASE: ETHAccount,
26+
}
27+
return chain_account_map.get(chain) or ETHAccount
10028

10129

10230
def account_from_hex_string(private_key_str: str, account_type: Type[T]) -> T:
@@ -107,72 +35,42 @@ def account_from_hex_string(private_key_str: str, account_type: Type[T]) -> T:
10735

10836
def account_from_file(private_key_path: Path, account_type: Type[T]) -> T:
10937
private_key = private_key_path.read_bytes()
110-
if account_type == SOLAccount:
111-
private_key = parse_solana_private_key(
112-
solana_private_key_from_bytes(private_key)
113-
)
114-
11538
return account_type(private_key)
11639

11740

11841
def _load_account(
11942
private_key_str: Optional[str] = None,
12043
private_key_path: Optional[Path] = None,
121-
account_type: Type[AccountFromPrivateKey] = ETHAccount,
44+
account_type: Optional[Type[AccountFromPrivateKey]] = None,
12245
) -> AccountFromPrivateKey:
12346
"""Load private key from a string or a file. takes the string argument in priority"""
124-
125-
if private_key_str:
126-
# Check Account type based on private-key string format (base58 / uint for solana)
127-
private_key_chain = detect_chain_from_private_key(private_key=private_key_str)
128-
if private_key_chain == Chain.SOL:
129-
account_type = SOLAccount
130-
logger.debug("Solana private key is detected")
131-
parsed_key = parse_solana_private_key(private_key_str)
132-
return account_type(parsed_key)
133-
logger.debug("Using account from string")
134-
return account_from_hex_string(private_key_str, account_type)
135-
elif private_key_path and private_key_path.is_file():
136-
if private_key_path:
137-
account_type = ETHAccount # Default account type
138-
139-
try:
140-
account_data = load_account_key_context(settings.CONFIG_FILE)
141-
142-
if account_data:
143-
chain = Chain(account_data.chain)
144-
account_type = (
145-
CHAIN_TO_ACCOUNT_MAP.get(chain, ETHAccount) or ETHAccount
146-
)
147-
logger.debug(
148-
f"Detected {chain} account for path {private_key_path}"
149-
)
150-
else:
151-
logger.warning(
152-
f"No account data found in {private_key_path}, defaulting to {account_type.__name__}"
153-
)
154-
155-
except FileNotFoundError:
156-
logger.warning(
157-
f"{private_key_path} not found, using default account type {account_type.__name__}"
47+
if private_key_str or (private_key_path and private_key_path.is_file()):
48+
if account_type:
49+
if private_key_path and private_key_path.is_file():
50+
return account_from_file(private_key_path, account_type)
51+
elif private_key_str:
52+
return account_from_hex_string(private_key_str, account_type)
53+
else:
54+
raise ValueError("Any private key specified")
55+
else:
56+
main_configuration = load_main_configuration(settings.CONFIG_FILE)
57+
if main_configuration:
58+
account_type = load_chain_account_type(main_configuration.chain)
59+
logger.debug(
60+
f"Detected {main_configuration.chain} account for path {settings.CONFIG_FILE}"
15861
)
159-
except json.JSONDecodeError:
160-
logger.error(
161-
f"Invalid format in {private_key_path}, unable to load account info."
162-
)
163-
raise ValueError(f"Invalid format in {private_key_path}.")
164-
except KeyError as e:
165-
logger.error(f"Missing key in account config: {e}")
166-
raise ValueError(
167-
f"Invalid account data in {private_key_path}. Key {e} is missing."
168-
)
169-
except Exception as e:
170-
logger.error(f"Error loading account from {private_key_path}: {e}")
171-
raise ValueError(
172-
f"Could not load account data from {private_key_path}."
62+
else:
63+
account_type = ETHAccount # Defaults to ETHAccount
64+
logger.warning(
65+
f"No main configuration data found in {settings.CONFIG_FILE}, defaulting to {account_type.__name__}"
17366
)
67+
if private_key_path and private_key_path.is_file():
68+
return account_from_file(private_key_path, account_type)
69+
elif private_key_str:
70+
return account_from_hex_string(private_key_str, account_type)
71+
else:
72+
raise ValueError("Any private key specified")
17473

175-
return account_from_file(private_key_path, account_type)
17674
elif settings.REMOTE_CRYPTO_HOST:
17775
logger.debug("Using remote account")
17876
loop = asyncio.get_event_loop()
@@ -183,6 +81,7 @@ def _load_account(
18381
)
18482
)
18583
else:
84+
account_type = ETHAccount # Defaults to ETHAccount
18685
new_private_key = get_fallback_private_key()
18786
account = account_type(private_key=new_private_key)
18887
logger.info(

src/aleph/sdk/chains/common.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from pathlib import Path
44
from typing import Dict, Optional
55

6-
import nacl.signing
76
from coincurve.keys import PrivateKey
87
from typing_extensions import deprecated
98

@@ -150,20 +149,6 @@ def generate_key() -> bytes:
150149
return privkey.secret
151150

152151

153-
def generate_key_solana() -> bytes:
154-
"""
155-
Generate a new Solana private key (32 bytes) using Ed25519.
156-
157-
Returns:
158-
A bytes object representing the 32-byte Solana private key.
159-
"""
160-
# Generate a new signing key (this is the private key part)
161-
private_key = nacl.signing.SigningKey.generate()
162-
163-
# Return only the private key (32 bytes)
164-
return private_key.encode()
165-
166-
167152
def get_fallback_private_key(path: Optional[Path] = None) -> bytes:
168153
path = path or settings.PRIVATE_KEY_FILE
169154
private_key: bytes

src/aleph/sdk/chains/solana.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class SOLAccount(BaseAccount):
2222
_private_key: PrivateKey
2323

2424
def __init__(self, private_key: bytes):
25-
self.private_key = private_key
25+
self.private_key = parse_private_key(private_key_from_bytes(private_key))
2626
self._signing_key = SigningKey(self.private_key)
2727
self._private_key = self._signing_key.to_curve25519_private_key()
2828

@@ -93,7 +93,7 @@ def verify_signature(
9393
raise BadSignatureError from e
9494

9595

96-
def solana_private_key_from_bytes(
96+
def private_key_from_bytes(
9797
private_key_bytes: bytes, output_format: str = "base58"
9898
) -> Union[str, List[int], bytes]:
9999
"""
@@ -132,7 +132,7 @@ def solana_private_key_from_bytes(
132132
raise ValueError("Invalid output format. Choose 'base58', 'list', or 'bytes'.")
133133

134134

135-
def parse_solana_private_key(private_key: Union[str, List[int], bytes]) -> bytes:
135+
def parse_private_key(private_key: Union[str, List[int], bytes]) -> bytes:
136136
"""
137137
Parse the private key which could be either:
138138
- a base58-encoded string (which may contain both private and public key)

src/aleph/sdk/conf.py

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
import json
2+
import logging
23
import os
34
from pathlib import Path
45
from shutil import which
56
from typing import Dict, Optional, Union
67

78
from aleph_message.models import Chain
89
from aleph_message.models.execution.environment import HypervisorType
9-
from pydantic import BaseSettings, Field
10+
from pydantic import BaseModel, BaseSettings, Field
1011

1112
from aleph.sdk.types import ChainInfo
1213

14+
logger = logging.getLogger(__name__)
15+
1316

1417
class Settings(BaseSettings):
1518
CONFIG_HOME: Optional[str] = None
1619

1720
CONFIG_FILE: Path = Field(
18-
default=Path("chains_config.json"),
21+
default=Path("config.json"),
1922
description="Path to the JSON file containing chain account configurations",
2023
)
2124

@@ -145,6 +148,18 @@ class Config:
145148
env_file = ".env"
146149

147150

151+
class MainConfiguration(BaseModel):
152+
"""
153+
Intern Chain Management with Account.
154+
"""
155+
156+
path: Path
157+
chain: Chain
158+
159+
class Config:
160+
use_enum_values = True
161+
162+
148163
# Settings singleton
149164
settings = Settings()
150165

@@ -168,8 +183,8 @@ class Config:
168183
settings.PRIVATE_MNEMONIC_FILE = Path(
169184
settings.CONFIG_HOME, "private-keys", "substrate.mnemonic"
170185
)
171-
if str(settings.CONFIG_FILE) == "chains_config.json":
172-
settings.CONFIG_FILE = Path(settings.CONFIG_HOME, "chains_config.json")
186+
if str(settings.CONFIG_FILE) == "config.json":
187+
settings.CONFIG_FILE = Path(settings.CONFIG_HOME, "config.json")
173188
# If Config file exist and well filled we update the PRIVATE_KEY_FILE default
174189
if settings.CONFIG_FILE.exists():
175190
try:
@@ -191,3 +206,35 @@ class Config:
191206
field = field.lower()
192207
settings.CHAINS[chain].__dict__[field] = value
193208
settings.__delattr__(f"CHAINS_{fields}")
209+
210+
211+
def save_main_configuration(file_path: Path, data: MainConfiguration):
212+
"""
213+
Synchronously save a single ChainAccount object as JSON to a file.
214+
"""
215+
with file_path.open("w") as file:
216+
data_serializable = data.dict()
217+
data_serializable["path"] = str(data_serializable["path"])
218+
json.dump(data_serializable, file, indent=4)
219+
220+
221+
def load_main_configuration(file_path: Path) -> Optional[MainConfiguration]:
222+
"""
223+
Synchronously load the private key and chain type from a file.
224+
If the file does not exist or is empty, return None.
225+
"""
226+
if not file_path.exists() or file_path.stat().st_size == 0:
227+
logger.debug(f"File {file_path} does not exist or is empty. Returning None.")
228+
return None
229+
230+
try:
231+
with file_path.open("rb") as file:
232+
content = file.read()
233+
data = json.loads(content.decode("utf-8"))
234+
return MainConfiguration(**data)
235+
except UnicodeDecodeError as e:
236+
logger.error(f"Unable to decode {file_path} as UTF-8: {e}")
237+
except json.JSONDecodeError:
238+
logger.error(f"Invalid JSON format in {file_path}.")
239+
240+
return None

0 commit comments

Comments
 (0)