Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 123 additions & 55 deletions scalecodec/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

import re
import warnings
import threading
import weakref
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import Optional, TYPE_CHECKING, Union
from typing import Optional, TYPE_CHECKING, Union, Any, Tuple

from scalecodec.constants import TYPE_DECOMP_MAX_RECURSIVE
from scalecodec.exceptions import RemainingScaleBytesNotEmptyException, InvalidScaleTypeValueException
Expand All @@ -27,6 +29,40 @@
from scalecodec.types import GenericMetadataVersioned, GenericRegistryType


# ---------------------------------------------------------------------------
# Dynamic class cache to avoid repeated ABC subclass allocations
# ---------------------------------------------------------------------------

# Weak cache so entries disappear once the classes are genuinely unused.
# Key format: (base_class, name, frozen_attrs)
_DYNAMIC_CLASS_CACHE: "weakref.WeakValueDictionary[Tuple[Any, str, Tuple], type]" = weakref.WeakValueDictionary()
_DYNAMIC_CACHE_LOCK = threading.RLock()


def _freeze_attr(v: Any) -> Any:
"""Convert common containers to hashable forms for the cache key."""
if isinstance(v, dict):
return ("dict", tuple(sorted((k, _freeze_attr(vv)) for k, vv in v.items())))
if isinstance(v, (list, tuple)):
return ("seq", tuple(_freeze_attr(x) for x in v))
return v


def _make_dynamic_class(base_class: type, name: str, attrs: dict) -> type:
"""
Cached dynamic class constructor. Preserves the given class `name`
(including characters like '<' and '::') so existing reset logic keeps working.
"""
key = (base_class, name, tuple(sorted((k, _freeze_attr(v)) for k, v in attrs.items())))
with _DYNAMIC_CACHE_LOCK:
cls = _DYNAMIC_CLASS_CACHE.get(key)
if cls is not None:
return cls
cls = type(name, (base_class,), dict(attrs))
_DYNAMIC_CLASS_CACHE[key] = cls
return cls


class Singleton(type):
_instances = {}

Expand Down Expand Up @@ -114,9 +150,11 @@ def get_decoder_class(self, type_string: Union[str, dict]):

if type(type_string) is dict:
# Inner struct
decoder_class = type('InnerStruct', (self.get_decoder_class('Struct'),), {
'type_mapping': tuple(type_string.items())
})
decoder_class = _make_dynamic_class(
self.get_decoder_class('Struct'),
'InnerStruct',
{'type_mapping': tuple(type_string.items())}
)
decoder_class.runtime_config = self
return decoder_class

Expand All @@ -130,7 +168,7 @@ def get_decoder_class(self, type_string: Union[str, dict]):

if not decoder_class:

# Type string containg subtype
# Type string containing subtype
if type_string[-1:] == '>':

# Extract sub types
Expand All @@ -143,14 +181,20 @@ def get_decoder_class(self, type_string: Union[str, dict]):
# Create dynamic class for Part1<Part2> based on Part1 and set class variable Part2 as sub_type
base_class = self.type_registry.get('types', {}).get(type_parts[0].lower(), None)
if base_class:
decoder_class = type(type_string, (base_class,), {'sub_type': type_parts[1]})
decoder_class = _make_dynamic_class(
base_class,
type_string,
{'sub_type': type_parts[1]}
)

# Custom tuples
elif type_string != '()' and type_string[0] == '(' and type_string[-1] == ')':

decoder_class = type(type_string, (self.get_decoder_class('tuple'),), {
'type_string': type_string
})
decoder_class = _make_dynamic_class(
self.get_decoder_class('tuple'),
type_string,
{'type_string': type_string}
)

decoder_class.build_type_mapping()

Expand All @@ -162,10 +206,14 @@ def get_decoder_class(self, type_string: Union[str, dict]):

if type_parts:
# Create dynamic class for e.g. [u8; 4] resulting in array of u8 with 4 elements
decoder_class = type(type_string, (self.get_decoder_class('FixedLengthArray'),), {
'sub_type': type_parts[0],
'element_count': int(type_parts[1])
})
decoder_class = _make_dynamic_class(
self.get_decoder_class('FixedLengthArray'),
type_string,
{
'sub_type': type_parts[0],
'element_count': int(type_parts[1])
}
)

if decoder_class:
# Attach RuntimeConfigurationObject to new class
Expand Down Expand Up @@ -236,18 +284,22 @@ def update_type_registry_types(self, types_dict):
if base_cls is None:
base_cls = Struct

decoder_class = type(type_string, (base_cls,), {
'type_mapping': decoder_class_data.get('type_mapping')
})
decoder_class = _make_dynamic_class(
base_cls,
type_string,
{'type_mapping': decoder_class_data.get('type_mapping')}
)

elif decoder_class_data['type'] == 'tuple':

if base_cls is None:
base_cls = Tuple

decoder_class = type(type_string, (base_cls,), {
'type_mapping': decoder_class_data.get('type_mapping')
})
decoder_class = _make_dynamic_class(
base_cls,
type_string,
{'type_mapping': decoder_class_data.get('type_mapping')}
)

elif decoder_class_data['type'] == 'enum':

Expand All @@ -260,20 +312,28 @@ def update_type_registry_types(self, types_dict):
# Transform value_list with explicitly specified index numbers
value_list = {i: v for v, i in value_list.items()}

decoder_class = type(type_string, (base_cls,), {
'value_list': value_list,
'type_mapping': decoder_class_data.get('type_mapping')
})
decoder_class = _make_dynamic_class(
base_cls,
type_string,
{
'value_list': value_list,
'type_mapping': decoder_class_data.get('type_mapping')
}
)

elif decoder_class_data['type'] == 'set':

if base_cls is None:
base_cls = Set

decoder_class = type(type_string, (base_cls,), {
'value_list': decoder_class_data.get('value_list'),
'value_type': decoder_class_data.get('value_type', 'u64')
})
decoder_class = _make_dynamic_class(
base_cls,
type_string,
{
'value_list': decoder_class_data.get('value_list'),
'value_type': decoder_class_data.get('value_type', 'u64')
}
)

else:
raise NotImplementedError("Dynamic decoding type '{}' not supported".format(
Expand Down Expand Up @@ -378,7 +438,7 @@ def get_decoder_class_for_scale_info_definition(

if base_decoder_class and hasattr(base_decoder_class, 'process_scale_info_definition'):
# if process_scale_info_definition is implemented result is final
decoder_class = type(type_string, (base_decoder_class,), {})
decoder_class = _make_dynamic_class(base_decoder_class, type_string, {})
decoder_class.process_scale_info_definition(scale_info_type, prefix)

# Link ScaleInfo RegistryType to decoder class
Expand All @@ -394,10 +454,14 @@ def get_decoder_class_for_scale_info_definition(
if base_decoder_class is None:
base_decoder_class = self.get_decoder_class('FixedLengthArray')

decoder_class = type(type_string, (base_decoder_class,), {
'sub_type': f"{prefix}::{scale_info_type.value['def']['array']['type']}",
'element_count': scale_info_type.value['def']['array']['len']
})
decoder_class = _make_dynamic_class(
base_decoder_class,
type_string,
{
'sub_type': f"{prefix}::{scale_info_type.value['def']['array']['type']}",
'element_count': scale_info_type.value['def']['array']['len']
}
)

elif 'composite' in scale_info_type.value['def']:

Expand All @@ -420,15 +484,19 @@ def get_decoder_class_for_scale_info_definition(
if base_decoder_class is None:
base_decoder_class = self.get_decoder_class(base_type_string)

decoder_class = type(type_string, (base_decoder_class,), {
'type_mapping': type_mapping
})
decoder_class = _make_dynamic_class(
base_decoder_class,
type_string,
{'type_mapping': type_mapping}
)

elif 'sequence' in scale_info_type.value['def']:
# Vec
decoder_class = type(type_string, (self.get_decoder_class('Vec'),), {
'sub_type': f"{prefix}::{scale_info_type.value['def']['sequence']['type']}"
})
decoder_class = _make_dynamic_class(
self.get_decoder_class('Vec'),
type_string,
{'sub_type': f"{prefix}::{scale_info_type.value['def']['sequence']['type']}"}
)

elif 'variant' in scale_info_type.value['def']:
# Enum
Expand Down Expand Up @@ -464,38 +532,40 @@ def get_decoder_class_for_scale_info_definition(
if base_decoder_class is None:
base_decoder_class = self.get_decoder_class("Enum")

decoder_class = type(type_string, (base_decoder_class,), {
'type_mapping': type_mapping
})
decoder_class = _make_dynamic_class(
base_decoder_class,
type_string,
{'type_mapping': type_mapping}
)

elif 'tuple' in scale_info_type.value['def']:

type_mapping = [f"{prefix}::{f}" for f in scale_info_type.value['def']['tuple']]

decoder_class = type(type_string, (self.get_decoder_class('Tuple'),), {
'type_mapping': type_mapping
})
decoder_class = _make_dynamic_class(
self.get_decoder_class('Tuple'),
type_string,
{'type_mapping': type_mapping}
)

elif 'compact' in scale_info_type.value['def']:
# Compact
decoder_class = type(type_string, (self.get_decoder_class('Compact'),), {
'sub_type': f"{prefix}::{scale_info_type.value['def']['compact']['type']}"
})
decoder_class = _make_dynamic_class(
self.get_decoder_class('Compact'),
type_string,
{'sub_type': f"{prefix}::{scale_info_type.value['def']['compact']['type']}"}
)

elif 'phantom' in scale_info_type.value['def']:
decoder_class = type(type_string, (self.get_decoder_class('Null'),), {})
decoder_class = _make_dynamic_class(self.get_decoder_class('Null'), type_string, {})

elif 'bitsequence' in scale_info_type.value['def']:
decoder_class = type(type_string, (self.get_decoder_class('BitVec'),), {})
decoder_class = _make_dynamic_class(self.get_decoder_class('BitVec'), type_string, {})

else:
raise NotImplementedError(f"RegistryTypeDef {scale_info_type.value['def']} not implemented")

# if 'path' in scale_info_type.value:
# decoder_class.type_string = '::'.join(scale_info_type.value['path'])

# Link ScaleInfo RegistryType to decoder class

decoder_class.scale_info_type = scale_info_type

return decoder_class
Expand Down Expand Up @@ -1078,5 +1148,3 @@ class ScalePrimitive(ScaleType, ABC):
@classmethod
def generate_type_decomposition(cls, _recursion_level: int = 0, max_recursion: int = TYPE_DECOMP_MAX_RECURSIVE):
return cls.__name__.lower()