Source code for thor_devkit.rlp

r"""RLP Encoding/Decoding layer for "real-world" objects."""
import sys
import warnings
from abc import ABC, abstractmethod
from itertools import dropwhile
from typing import (
    Any,
    Dict,
    Generic,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
)

from rlp import decode as rlp_decode
from rlp import encode as rlp_encode
from rlp.sedes import BigEndianInt
from voluptuous.error import Invalid

from thor_devkit import validation
from thor_devkit.cry.utils import izip
from thor_devkit.deprecation import deprecated, renamed_class
from thor_devkit.exceptions import DeserializationError, SerializationError

if sys.version_info < (3, 10):
    from typing_extensions import TypeGuard
else:
    from typing import TypeGuard


__all__ = [
    # Main
    "ComplexCodec",
    # Scalar
    "BytesKind",
    "NumericKind",
    "BlobKind",
    "FixedBlobKind",
    "OptionalFixedBlobKind",
    "CompactFixedBlobKind",
    # Wrappers
    "DictWrapper",
    "ListWrapper",
    "HomoListWrapper",
    # Abstract
    "AbstractSerializer",
    "ScalarKind",
    "BaseWrapper",
]

# We lack recursive types with mypy
_PackedSequenceT = Sequence[
    Union[bytes, Sequence[Union[bytes, Sequence[Union[bytes, Sequence[Any]]]]]]
]
_T = TypeVar("_T")


[docs]class AbstractSerializer(Generic[_T], ABC): """Abstract class for all serializers. .. versionadded:: 2.0.0 """
[docs] @abstractmethod def serialize(self, __obj: _T) -> Union[bytes, _PackedSequenceT]: """Serialize the object into a RLP encodable "item".""" raise NotImplementedError
[docs] @abstractmethod def deserialize(self, __serial: Any) -> _T: """Deserialize given bytes into higher-level object.""" raise NotImplementedError
[docs]class ScalarKind(AbstractSerializer[_T]): """Abstract class for all scalar serializers (they accept "basic" values)."""
[docs] @abstractmethod def serialize(self, __obj: _T) -> bytes: """Serialize the object into a RLP encodable "item".""" raise NotImplementedError
[docs] @abstractmethod def deserialize(self, __serial: bytes) -> _T: """Deserialize given bytes into higher-level object.""" raise NotImplementedError
[docs]class BytesKind(ScalarKind[bytes]): """Convert bytes type of Python object to RLP "item"."""
[docs] @classmethod def is_valid_type(cls, obj: object) -> TypeGuard[bytes]: """Confirm that ``obj`` is :class:`bytes` or :class:`bytearray`.""" return isinstance(obj, (bytes, bytearray))
[docs] def serialize(self, obj: bytes) -> bytes: """Serialize the object into a RLP encodable "item". Parameters ---------- obj : bytes The input. Returns ------- bytes The "item" in bytes. Raises ------ TypeError If input is not bytes. """ if not self.is_valid_type(obj): raise TypeError( f'Expected parameter of type "bytes", got: {type(obj)}', obj ) return obj
[docs] def deserialize(self, serial: bytes) -> bytes: """Deserialize a RLP "item" back to bytes. Parameters ---------- serial : bytes The input. Returns ------- bytes Original bytes. Raises ------ TypeError If input is not bytes. """ if not self.is_valid_type(serial): raise TypeError( f'Expected parameter of type "bytes", got: {type(serial)}', serial ) return serial
[docs]class NumericKind(BigEndianInt, ScalarKind[int]): """Serializer for number-like objects. Good examples are:: '0x0', '0x123', '0', '100', 0, 0x123, True Bad examples are:: '0x123z', {}, '0x', -1, '0x12345678123456780' .. versionchanged:: 2.0.0 Allowed :class:`bool` values :class:`True` and :class:`False`. """ max_bytes: Optional[int] """Maximal allowed size of number, in bytes.""" def __init__(self, max_bytes: Optional[int] = None) -> None: """Initialize a NumericKind. Parameters ---------- max_bytes : Optional[int], optional Max bytes in the encoded result (prepend 0 if there's not enough) """ self.max_bytes = max_bytes super().__init__(l=max_bytes)
[docs] def serialize(self, obj: Union[str, int]) -> bytes: """Serialize the object into a RLP encodable "item". Parameters ---------- obj : str or int obj is either int or string representation of int parseable by :func:`int`. Returns ------- bytes Serialized data Raises ------ SerializationError If input data is malformed TypeError If input is neither int nor string representation of int """ if isinstance(obj, str): try: number = int(obj, 0) except ValueError: raise SerializationError( "The input string does not represent a number.", obj ) elif isinstance(obj, int): number = int(obj) else: raise TypeError(f"expected str or int, got: {type(obj)}") result_bytes = super().serialize(number) # remove leading 0 from bytes sequence. return bytes(dropwhile(lambda x: not x, result_bytes))
[docs] def deserialize(self, serial: bytes) -> int: """Deserialize bytes to int. Parameters ---------- serial : bytes bytes Returns ------- int Deserialized number. Raises ------ DeserializationError If bytes contain leading 0. """ if serial and not serial[0]: raise DeserializationError( "byte string must not have leading zeroes", serial ) # add leading 0 to bytes sequence if width is set. if self.max_bytes: serial = serial.rjust(self.max_bytes, b"\x00") return super().deserialize(serial)
[docs]class BlobKind(ScalarKind[str]): """Serializer for ``0x....`` hex strings. Used for strings that shouldn't be interpreted as a number, usually an identifier. Examples: address, block_ref, data to smart contract. """
[docs] def serialize(self, obj: str) -> bytes: """Serialize a ``0x...`` string to bytes. Parameters ---------- obj : str ``0x...`` style string. Returns ------- bytes Encoded string. Raises ------ SerializationError If input data is malformed. """ try: return validation.hex_string(allow_prefix=True, to_bytes=True)(obj) except Invalid as e: raise SerializationError(str(e), obj)
[docs] def deserialize(self, serial: bytes) -> str: """Deserialize bytes to ``0x...`` string. Parameters ---------- serial : bytes Encoded string. Returns ------- str string of style ``0x...`` Raises ------ TypeError If input is not ``bytes`` nor ``bytearray`` """ if not isinstance(serial, (bytes, bytearray)): raise TypeError(f"expected bytes, got: {type(serial)}") return "0x" + serial.hex()
[docs]class FixedBlobKind(BlobKind): """Serializer for ``0x....`` **fixed-length** hex strings. Used for strings that shouldn't be interpreted as a number, usually an identifier. Examples: address, block_ref, data to smart contract. Note ---- This kind has a fixed length of bytes. (also means the input hex is fixed length) """ byte_length: int """Length of blob, in bytes.""" def __init__(self, byte_length: int) -> None: self.byte_length = byte_length
[docs] def serialize(self, obj: str) -> bytes: """Serialize a ``0x...`` string to bytes. Parameters ---------- obj : str ``0x...`` style string. Returns ------- bytes Encoded string. Raises ------ SerializationError If input data is malformed (e.g. wrong length) """ try: validation.hex_string(self.byte_length * 2, allow_prefix=True)(obj) except Invalid as e: raise SerializationError(str(e), obj) from e return super().serialize(obj)
[docs] def deserialize(self, serial: bytes) -> str: """Deserialize bytes to ``0x...`` string. Parameters ---------- serial : bytes Encoded string. Returns ------- str String of style ``0x...'`` Raises ------ DeserializationError If input is malformed (e.g. wrong length) """ if len(serial) != self.byte_length: raise DeserializationError( f"Bytes should be of length {self.byte_length}", serial ) return super().deserialize(serial)
[docs]class OptionalFixedBlobKind(FixedBlobKind): """Serializer for ``0x....`` fixed-length hex strings that may be :class:`None`. Used for strings that shouldn't be interpreted as a number, usually an identifier. Examples: address, block_ref, data to smart contract. Note ---- This kind has a fixed length of bytes. (also means the input hex is fixed length) For this kind, input can be None. Then decoded is also None. """
[docs] def serialize(self, obj: Optional[str] = None) -> bytes: """Serialize a ``0x...`` string or :class:`None` to bytes. Parameters ---------- obj : Optional[str], default: None ``0x...`` style string. Returns ------- bytes Encoded string. """ if obj is None: return bytes(0) return super().serialize(obj)
# Unsafe override
[docs] def deserialize(self, serial: bytes) -> Optional[str]: # type: ignore[override] """Deserialize bytes to ``0x...`` string or :class:`None`. Parameters ---------- serial : bytes Serialized data. Returns ------- Optional[str] String of style ``0x...`` or :class:`None` """ if not serial: return None return super().deserialize(serial)
[docs]@renamed_class("NoneableFixedBlobKind") class NoneableFixedBlobKind(OptionalFixedBlobKind): """Deprecated alias for :class:`OptionalFixedBlobKind`. .. deprecated:: 2.0.0 Use :class:`OptionalFixedBlobKind` instead. .. customtox-exclude:: """
[docs]class CompactFixedBlobKind(FixedBlobKind): """Serializer for ``0x....`` fixed-length hex strings that may start with zeros. Used for strings that shouldn't be interpreted as a number, usually an identifier. Examples: address, block_ref, data to smart contract. Note ---- When encode, the result fixed length bytes will be removed of leading zeros. i.e. ``000123 -> 123`` When decode, it expects the input bytes length <= fixed_length. and it pads the leading zeros back. Output ``'0x{"0" * n}xxx...'`` """
[docs] def serialize(self, obj: str) -> bytes: """Serialize a ``0x...`` string to bytes, stripping leading zeroes. Parameters ---------- obj : str ``0x...`` style string. Returns ------- bytes Encoded string with leading zeroes removed. """ b = super().serialize(obj) return bytes(dropwhile(lambda x: not x, b))
[docs] def deserialize(self, serial: bytes) -> str: """Deserialize bytes to ``0x...`` string. Parameters ---------- serial : bytes Encoded data. Returns ------- str String of style ``0x...`` of fixed length Raises ------ DeserializationError If input is malformed. """ if len(serial) > self.byte_length: raise DeserializationError( "Bytes too long, only need {}".format(self.byte_length), serial ) if serial and not serial[0]: raise DeserializationError( "Byte sequence must have no leading zeroes", serial ) padded = bytes(serial).rjust(self.byte_length, b"\x00") return super().deserialize(padded)
[docs]class BaseWrapper(AbstractSerializer[_T]): """Abstract serializer for complex types."""
[docs] @abstractmethod def serialize(self, __obj: _T) -> _PackedSequenceT: """Serialize the object into a RLP encodable "item". .. versionadded:: 2.0.0 """ raise NotImplementedError
[docs] @abstractmethod def deserialize(self, __serial: _PackedSequenceT) -> _T: """Deserialize given bytes into higher-level object. .. versionadded:: 2.0.0 """ raise NotImplementedError
[docs]class DictWrapper(BaseWrapper[Mapping[str, Any]]): """A container for working with dict-like objects.""" keys: Sequence[str] """Field names.""" codecs: Sequence[AbstractSerializer[Any]] """Codecs to use for each field.""" def __init__( self, codecs: Union[ Sequence[Tuple[str, AbstractSerializer[Any]]], Mapping[str, AbstractSerializer[Any]], ], ) -> None: """Create wrapper from items. Parameters ---------- codecs : Mapping[str, BaseWrapper or ScalarKind] or its ``.values()``-like list Codecs to use. Possible values (codec is any BaseWrapper or ScalarKind): - Any mapping from str to codec, e.g. ``{'foo': NumericKind()}`` - Any sequence of tuples ``(name, codec)``, e.g. ``[('foo', NumericKind())]`` """ if isinstance(codecs, Mapping): self.keys, self.codecs = izip(*codecs.items()) else: self.keys, self.codecs = izip(*codecs) def __len__(self) -> int: """Count of serializable objects.""" return len(self.codecs)
[docs] def serialize(self, obj: Mapping[str, Any]) -> _PackedSequenceT: """Serialize dictionary to sequence of serialized values. .. versionadded:: 2.0.0 Parameters ---------- obj: Mapping[str, Any] Dictionary to serialize. Returns ------- Sequence[bytes or Sequence[...]] (recursive) Sequence of serialized values. Raises ------ SerializationError If input is malformed. """ try: return [ codec.serialize(obj[key]) for (key, codec, _) in izip(self.keys, self.codecs, obj) ] except KeyError as e: raise SerializationError(f"Missing key: '{e.args[0]}'", obj) except ValueError as e: raise SerializationError( f"Keys count differs: expected {len(obj)}, got {len(self)}", obj ) from e
[docs] def deserialize(self, serial: _PackedSequenceT) -> Dict[str, Any]: """Deserialize sequence of encoded values to dictionary with serialized values. .. versionadded:: 2.0.0 Parameters ---------- obj: Sequence[bytes or Sequence[...]] (recursive) Sequence of values to deserialize. Returns ------- Mapping[str, Any] Deserialized values, mapping field names to decoded values. Raises ------ DeserializationError If input is malformed. """ try: return { key: codec.deserialize(blob) for (blob, key, codec) in izip(serial, self.keys, self.codecs) } except ValueError as e: raise DeserializationError( f"Keys count differs: expected {len(serial)}, got {len(self)}", serial, ) from e
[docs]class ListWrapper(BaseWrapper[Sequence[Any]]): """Container for parsing a heterogeneous list. The items in the list can be of different types. """ codecs: Sequence[AbstractSerializer[Any]] """Codecs to use for each element of sequence.""" def __init__(self, codecs: Sequence[AbstractSerializer[Any]]) -> None: """Create wrapper from items. Parameters ---------- codecs : Sequence[AbstractSerializer] A list of codecs. eg. [codec, codec, codec...] codec is either a BaseWrapper, or a ScalarKind. """ self.codecs = list(codecs) def __len__(self) -> int: """Count of serializable objects.""" return len(self.codecs)
[docs] def serialize(self, obj: Sequence[Any]) -> _PackedSequenceT: """Serialize sequence (list) of values to sequence of serialized values. .. versionadded:: 2.0.0 Parameters ---------- obj: Sequence[Any] Sequence of values to serialize. Returns ------- Sequence[bytes or Sequence[...]] (recursive) Sequence of serialized values. Raises ------ SerializationError If input is malformed. """ try: return [codec.serialize(item) for (item, codec) in izip(obj, self.codecs)] except ValueError as e: raise SerializationError( f"Items count differs: expected {len(obj)}, got {len(self)}", obj ) from e
[docs] def deserialize(self, serial: _PackedSequenceT) -> Sequence[Any]: """Deserialize sequence of encoded values to sequence. .. versionadded:: 2.0.0 Parameters ---------- obj: Sequence[bytes or Sequence[...]] (recursive) Sequence of values to deserialize. Returns ------- Sequence[Any] Deserialized values. Raises ------ DeserializationError If input is malformed. """ try: return [ codec.deserialize(blob) for (blob, codec) in izip(serial, self.codecs) ] except ValueError as e: raise DeserializationError( f"Items count differs: expected {len(serial)}, got {len(self)}", serial, ) from e
[docs]class HomoListWrapper(BaseWrapper[Sequence[Any]]): """Container for parsing a homogeneous list. Used when the items in the list are of the same type. """ codec: AbstractSerializer[Any] """Codec to use for each element of array.""" def __init__(self, codec: AbstractSerializer[Any]) -> None: """Create wrapper from items. Parameters ---------- codec : AbstractSerializer codec is either a BaseWrapper, or a ScalarKind. """ self.codec = codec
[docs] def serialize(self, obj: Sequence[Any]) -> _PackedSequenceT: """Serialize sequence (list) of values to sequence of serialized values. .. versionadded:: 2.0.0 Parameters ---------- obj: Sequence[Any] Sequence of values to serialize. Returns ------- Sequence[bytes or Sequence[...]] (recursive) Sequence of serialized values. Raises ------ SerializationError If input is malformed. """ return [self.codec.serialize(item) for item in obj]
[docs] def deserialize(self, serial: _PackedSequenceT) -> Sequence[Any]: """Deserialize sequence of encoded values to sequence. .. versionadded:: 2.0.0 Parameters ---------- obj: Sequence[bytes or Sequence[...]] (recursive) Sequence of values to deserialize. Returns ------- Sequence[Any] Deserialized values. Raises ------ DeserializationError If input is malformed. """ return [self.codec.deserialize(blob) for blob in serial]
[docs]@deprecated def pack(obj: Any, wrapper: AbstractSerializer[Any]) -> Union[bytes, _PackedSequenceT]: """Pack a Python object according to wrapper. .. deprecated:: 2.0.0 Use ``<wrapper>.serialize`` directly instead. .. customtox-exclude:: Parameters ---------- obj : Any A dict, a list, or a string/int/any... wrapper : AbstractSerializer[Any] A Wrapper. Returns ------- bytes If obj is a basic type. List of packed items If obj is dict/list. Raises ------ SerializationError If data cannot be serialized using specified codec. TypeError If wrapper type is unknown. """ warnings.warn("Function 'pack' is deprecated. Use '<wrapper>.serialize' instead.") if not isinstance(wrapper, AbstractSerializer): raise TypeError(f"Wrapper type is unknown: {type(wrapper)}") return wrapper.serialize(obj)
[docs]@deprecated def unpack( packed: Union[bytes, _PackedSequenceT], wrapper: AbstractSerializer[Any], ) -> Union[Dict[str, Any], List[Any], Any]: """Unpack a serialized thing back into a dict/list or a Python basic type. .. deprecated:: 2.0.0 Use ``<wrapper>.deserialize`` directly instead. .. customtox-exclude:: Parameters ---------- packed : bytes or sequence of them A list of RLP encoded or pure bytes (may be nested). wrapper : AbstractSerializer[Any] The Wrapper. Returns ------- Dict[str, Any] or List[Any] or Any dict/list if the wrapper instruction is dict/list, Python basic type if input is bytes. Raises ------ DeserializationError If data cannot be deserialized using specified codec. TypeError If wrapper type is unknown. """ warnings.warn( "Function 'unpack' is deprecated. Use '<wrapper>.deserialize' instead." ) if not isinstance(wrapper, AbstractSerializer): raise TypeError("Wrapper type is unknown.") return wrapper.deserialize(packed)
def pretty_print( packed: Union[bytes, _PackedSequenceT], indent: int = 0 ) -> None: # pragma: no cover """Pretty print the bytes into hex, indenting nested structures. Parameters ---------- packed : bytes or sequence of them Data to print (may be nested). indent : int, default: 0 Indent of topmost object, in spaces. Returns ------- None """ # indent of items internal_indent = 2 # bytes? Direct print it. if isinstance(packed, (bytes, bytearray)): print(" " * (indent) + (packed.hex() or "(empty byte[])")) return # list? elif isinstance(packed, Iterable): # mypy isn't smart enough to deduce this from first `if`-branch assert not isinstance(packed, (bytes, bytearray)) print(" " * (indent) + "[") for each in packed: pretty_print(each, indent + internal_indent) print(" " * (indent) + "]")
[docs]class ComplexCodec: """Wrapper around :class:`BaseWrapper` that implements RLP encoding. Abstract layer to join serialization and encoding (and reverse operations) together. """ wrapper: AbstractSerializer[Any] """:class:`BaseWrapper` or :class:`ScalarKind` to use for serialization.""" def __init__(self, wrapper: AbstractSerializer[Any]) -> None: self.wrapper = wrapper
[docs] def encode(self, data: Any) -> bytes: """Serialize and RLP-encode given high-level data to bytes.""" packed = self.wrapper.serialize(data) return rlp_encode(packed)
[docs] def decode(self, data: bytes) -> Any: """RLP-decode and deserialize given bytes into higher-level structure.""" to_be_unpacked = rlp_decode(data) return self.wrapper.deserialize(to_be_unpacked)