r"""ABI encoding module."""
import os
import re
import sys
import warnings
from abc import ABC, abstractmethod
from collections import namedtuple
from keyword import iskeyword
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generic,
Iterable,
Iterator,
List,
Mapping,
NamedTuple,
NoReturn,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
cast,
overload,
)
import eth_abi
import eth_utils
import solcx
import voluptuous
from voluptuous import Schema
from thor_devkit.cry import keccak256
from thor_devkit.cry.utils import _with_doc_mro, izip
from thor_devkit.deprecation import deprecated_to_property
if sys.version_info < (3, 8):
from typing_extensions import Final, Literal, TypedDict
else:
from typing import Final, Literal, TypedDict
if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias
if sys.version_info < (3, 11):
from typing_extensions import NotRequired
else:
from typing import NotRequired
# Names exported by ``from <module> import *``, grouped by role.
# NOTE: ``_ParameterT`` is listed here even though it is underscore-prefixed.
__all__ = [
    # Main
    "Function",
    "Constructor",
    "Event",
    "Coder",
    # Types
    "_ParameterT",
    "StateMutabilityT",
    "FuncParameterT",
    "FunctionT",
    "ConstructorT",
    "EventParameterT",
    "EventT",
    # Schemas
    "MUTABILITY",
    "FUNC_PARAMETER",
    "FUNCTION",
    "CONSTRUCTOR",
    "EVENT_PARAMETER",
    "EVENT",
    # Other
    "calc_event_topic",
    "calc_function_selector",
    "FunctionResult",
    "Encodable",
    "FunctionBase",
]
# Schema accepting exactly one of the four listed mutability strings.
# It is reused as the ``stateMutability`` validator in FUNCTION and CONSTRUCTOR.
MUTABILITY: Final = Schema(voluptuous.Any("pure", "view", "payable", "nonpayable"))
"""
Validation :external:class:`~voluptuous.schema_builder.Schema`
for ``stateMutability`` parameter.
Must be a string, one of: "pure", "view", "payable", "nonpayable".
:meta hide-value:
.. versionchanged:: 2.0.0
Removed unsupported "constant" option.
"""
# Static-typing counterpart of the MUTABILITY schema: same four literals.
StateMutabilityT: TypeAlias = Literal["pure", "view", "payable", "nonpayable"]
"""Literal type of ``stateMutability`` parameter.
Must be a string, one of: "pure", "view", "payable", "nonpayable".
.. versionadded:: 2.0.0
"""
class _ParameterT(TypedDict):
"""Base for parameter of function or event."""
name: str
"""Parameter name."""
type: str # noqa: A003
"""Parameter type."""
# Schema for a single function input/output entry.  ``required=True`` makes
# every key mandatory unless it is wrapped in ``voluptuous.Optional``.
FUNC_PARAMETER: Final = Schema(
    {
        "name": str,
        "type": str,
        voluptuous.Optional("internalType"): str,
        # if the "type" field is "tuple" or "type[]"
        # (``voluptuous.Self`` validates each component against this same schema)
        voluptuous.Optional("components"): [voluptuous.Self],
    },
    required=True,
)
"""
Validation :external:class:`~voluptuous.schema_builder.Schema` for function parameter.
:meta hide-value:
"""
@_with_doc_mro(_ParameterT)
class FuncParameterT(_ParameterT):
    """Type of ABI function parameter.

    Extends :class:`_ParameterT` with the optional keys accepted by
    :const:`FUNC_PARAMETER`.

    .. versionadded:: 2.0.0
    """

    internalType: NotRequired[str]  # noqa: N815
    """InternalType is used for struct name aliases, may be ignored."""

    # Recursive types aren't really supported, but do partially work
    # This will be expanded a few times and then replaced with Any (deeply nested)
    components: NotRequired[Sequence["FuncParameterT"]]  # type: ignore[misc]
    """Sequence of components, each must be :class:`FuncParameterT`."""
# Schema for an ABI function entry.  All five keys are mandatory
# (``required=True``); keys not listed here are dropped from the validated
# result instead of raising (``extra=voluptuous.REMOVE_EXTRA``).
FUNCTION: Final = Schema(
    {
        "type": "function",
        "name": str,
        "stateMutability": MUTABILITY,
        "inputs": [FUNC_PARAMETER],
        "outputs": [FUNC_PARAMETER],
    },
    required=True,
    extra=voluptuous.REMOVE_EXTRA,
)
"""Validation :external:class:`~voluptuous.schema_builder.Schema` for ABI function.
:meta hide-value:
.. versionchanged:: 2.0.0
Removed not required members which are not produced by solidity compiler
by default, namely ``constant`` and ``payable``.
All non-standard parameters are silently discarded now.
"""
class FunctionT(TypedDict):
    """Type of ABI function dictionary representation.

    Mirrors the keys validated by :const:`FUNCTION`.

    .. versionadded:: 2.0.0
    """

    type: Literal["function"]  # noqa: A003
    """Always ``function``."""

    name: str
    """Function name."""

    stateMutability: StateMutabilityT  # noqa: N815
    r"""Mutability (pure, view, payable or nonpayable)."""

    inputs: Sequence["FuncParameterT"]
    """Function parameters."""

    outputs: Sequence["FuncParameterT"]
    """Function returns."""
# Schema for an ABI constructor entry: same shape as FUNCTION but without
# ``name`` and ``outputs``.  Unknown keys are dropped (REMOVE_EXTRA).
CONSTRUCTOR: Final = Schema(
    {
        "type": "constructor",
        "stateMutability": MUTABILITY,
        "inputs": [FUNC_PARAMETER],
    },
    required=True,
    extra=voluptuous.REMOVE_EXTRA,
)
"""Validation :external:class:`~voluptuous.schema_builder.Schema` for ABI constructor.
Constructor is a special function case that doesn't produce outputs and is unnamed.
:meta hide-value:
.. versionadded:: 2.0.0
"""
class ConstructorT(TypedDict):
    """Type of ABI constructor dictionary representation.

    Mirrors the keys validated by :const:`CONSTRUCTOR`: a constructor has
    neither ``name`` nor ``outputs``.

    .. versionadded:: 2.0.0
    """

    type: Literal["constructor"]  # noqa: A003
    """Always ``constructor``."""

    stateMutability: StateMutabilityT  # noqa: N815
    r"""Mutability (pure, view, payable or nonpayable)."""

    inputs: Sequence["FuncParameterT"]
    """Constructor parameters."""
# Schema for a single event input.  ``indexed`` is mandatory here (it has no
# counterpart in FUNC_PARAMETER).
EVENT_PARAMETER: Final = Schema(
    {
        "name": str,
        "type": str,
        # Only checked to be a list; nested entries are not validated by this
        # schema (FUNC_PARAMETER, by contrast, recurses via ``voluptuous.Self``).
        voluptuous.Optional("components"): list,
        "indexed": bool,
        voluptuous.Optional("internalType"): str,  # since 0.5.11+
    },
    required=True,
)
"""Validation :external:class:`~voluptuous.schema_builder.Schema` for event parameter.
:meta hide-value:
"""
@_with_doc_mro(_ParameterT)
class EventParameterT(_ParameterT):
    """Type of ABI event parameter.

    Extends :class:`_ParameterT` with the keys accepted by
    :const:`EVENT_PARAMETER`.

    .. versionadded:: 2.0.0
    """

    indexed: bool
    """Whether parameter is indexed."""

    internalType: NotRequired[str]  # noqa: N815
    """InternalType is used for struct name aliases, may be ignored."""

    # Recursive types aren't really supported, but do partially work
    # This will be expanded a few times and then replaced with Any (deeply nested)
    components: NotRequired[Sequence["EventParameterT"]]  # type: ignore[misc]
    """Sequence of components, each must be :class:`EventParameterT`."""
# Schema for an ABI event entry.
# NOTE(review): unlike FUNCTION and CONSTRUCTOR, this schema passes neither
# ``required=True`` nor ``extra=voluptuous.REMOVE_EXTRA``, so voluptuous
# defaults apply to missing and unknown keys -- confirm the asymmetry is intended.
EVENT: Final = Schema(
    {
        "type": "event",
        "name": str,
        voluptuous.Optional("anonymous"): bool,
        "inputs": [EVENT_PARAMETER],
    }
)
"""Validation :external:class:`~voluptuous.schema_builder.Schema` for ABI event.
:meta hide-value:
"""
class EventT(TypedDict):
    """Type of ABI event dictionary representation.

    Mirrors the keys validated by :const:`EVENT`.

    .. versionadded:: 2.0.0
    """

    type: Literal["event"]  # noqa: A003
    """Always ``event``."""

    name: str
    """Event name."""

    inputs: Sequence["EventParameterT"]
    """Event inputs."""

    anonymous: NotRequired[bool]
    """Whether event is anonymous (does not include signature in ``topic``)."""
if TYPE_CHECKING:
    # Static checkers see a NamedTuple base, so the tuple API is known to them.
    base = NamedTuple("base", [])
else:
    # At runtime the class is a plain mixin.
    base = object


class FunctionResult(base):
    """Mixin for :class:`~typing.NamedTuple` with convenience methods.

    It is returned from :meth:`Event.decode` and :meth:`Function.decode`.

    When obtained from ``decode`` method of :class:`Function` or :class:`Event`,
    this class will contain decoded parameters. They can be obtained either by name
    or by numeric index as from plain tuples.

    .. versionadded:: 2.0.0

    Warning
    -------
    Names of result items can slightly differ from names in definition.
    See details below.

    See Also
    --------
    :meth:`FunctionResult.name_to_identifier`: Details of names changing.

    :meth:`Function.decode`: for examples of items access
    """

    def to_dict(self) -> Dict[str, Any]:
        """Return dictionary representation (recursively).

        Returns
        -------
        Dict[str, Any]
            Dictionary of form ``{name: value}``
            (all inner namedtuples are converted too, including those
            held in lists nested to any depth)

        Note
        ----
        This method reverts name changing, except empty strings.
        Unnamed parameters will be still represented as ``ret_{i}``,
        while python keywords are restored (so ``from_`` is again ``from`` key).
        """

        def convert(value: Any) -> Any:
            # Lists may hold results directly or further lists, so recurse
            # instead of assuming every item has ``to_dict``.
            if isinstance(value, FunctionResult):
                return value.to_dict()
            if isinstance(value, list):
                return [convert(item) for item in value]
            return value

        return {
            self.name_from_identifier(key): convert(value)
            for key, value in self._asdict().items()
        }

    def __getattr__(self, name: str) -> NoReturn:
        """Dot attribute access (if not found).

        This is needed to make mypy happy with mix of this and dynamic namedtuple.

        We could use a mypy plugin to resolve names dynamically, but it is too
        difficult with small benefits. Now any attribute access is allowed,
        but all types are Any. If type-checking is very important, make sure to
        `assert` proper types to narrow them.
        """
        raise AttributeError(f"{self!r} does not have attribute '{name}'.")

    @staticmethod
    def name_to_identifier(word: str, position: int = 0) -> str:
        """Convert given word to valid python identifier.

        It assumes that ``word`` is a valid ``solidity`` identifier or empty string.

        The following rules apply:

        - Empty string are converted to ``f"ret_{position}"``
        - Python keyword (maybe already with underscores at the end)
          gets underscore (``_``) appended
        - All other words are returned unchanged.

        Parameters
        ----------
        word: str
            Solidity identifier to make compatible.
        position: int
            Arbitrary integer, unique for your collection
            (different for different calls).

        Returns
        -------
        str
            Valid python identifier.

        Raises
        ------
        ValueError
            If given string is not a valid solidity identifier.

        Examples
        --------
        >>> FunctionResult.name_to_identifier('foo')
        'foo'

        >>> FunctionResult.name_to_identifier('')
        'ret_0'

        >>> FunctionResult.name_to_identifier('', 1)
        'ret_1'

        >>> FunctionResult.name_to_identifier('for')
        'for_'

        >>> FunctionResult.name_to_identifier('from_')
        'from__'

        >>> FunctionResult.name_to_identifier('1f')
        Traceback (most recent call last):
        ValueError: Invalid identifier given: '1f'
        """
        if not word:
            return f"ret_{position}"
        if not word.isidentifier():
            raise ValueError(f"Invalid identifier given: '{word}'")
        # ``from_`` must also be escaped, otherwise it could not be told apart
        # from an escaped ``from`` on the way back.
        if iskeyword(word.rstrip("_")):
            return f"{word}_"
        return word

    @staticmethod
    def name_from_identifier(word: str) -> str:
        r"""Reverse conversion to valid python identifier.

        It assumes that ``word`` was a result of
        :meth:`FunctionResult.name_to_identifier`.

        The following rules apply:

        - Word that are of form ``keyword(_)+`` (with at least one
          underscore ``_`` at the end) lose one underscore
        - All other words are returned unchanged.

        Parameters
        ----------
        word: str
            Identifier to reverse.

        Returns
        -------
        str
            Valid solidity identifier.

        Examples
        --------
        >>> FunctionResult.name_from_identifier('foo')
        'foo'

        >>> FunctionResult.name_from_identifier('ret_0')
        'ret_0'

        >>> FunctionResult.name_from_identifier('for_')
        'for'

        >>> FunctionResult.name_from_identifier('from__')
        'from_'
        """
        if word.endswith("_") and iskeyword(word.rstrip("_")):
            return word[:-1]
        return word
def calc_function_selector(abi_json: FunctionT) -> bytes:
    """Calculate the function selector (4 bytes) from the ABI json.

    Parameters
    ----------
    abi_json : FunctionT
        Function definition; it is validated with :const:`FUNCTION` first.

    Returns
    -------
    bytes
        Result of :func:`eth_utils.function_abi_to_4byte_selector` for the
        validated definition.
    """
    validated = FUNCTION(abi_json)
    return eth_utils.function_abi_to_4byte_selector(validated)
def calc_event_topic(abi_json: EventT) -> bytes:
    """Calculate the event log topic (32 bytes) from the ABI json.

    Parameters
    ----------
    abi_json : EventT
        Event definition; it is validated with :const:`EVENT` first.

    Returns
    -------
    bytes
        Result of :func:`eth_utils.event_abi_to_log_topic` for the
        validated definition.
    """
    validated = EVENT(abi_json)
    return eth_utils.event_abi_to_log_topic(validated)
class Coder:
    """Convenient wrapper to namespace encoding functions.

    All methods are static and delegate to :mod:`eth_abi`.
    """

    @staticmethod
    def encode_list(types: Sequence[str], values: Sequence[Any]) -> bytes:
        """Encode a sequence of values, into a single bytes."""
        return eth_abi.encode_abi(types, values)

    @staticmethod
    def decode_list(types: Sequence[str], data: bytes) -> List[Any]:
        """Decode the data, back to a ``(...)`` tuple."""
        return list(eth_abi.decode_abi(types, data))

    @staticmethod
    def encode_single(t: str, value: Any) -> bytes:
        """Encode value of type ``t`` into single bytes."""
        # A single value is encoded as a one-element list.
        return Coder.encode_list([t], [value])

    @staticmethod
    def decode_single(t: str, data: bytes) -> Any:
        """Decode data of type ``t`` back to a single object."""
        (decoded,) = Coder.decode_list([t], data)[:1] or (None,)
        return Coder.decode_list([t], data)[0] if decoded is None else decoded
# The first should be right, but results in a crash.
# See https://github.com/python/mypy/issues/8320
# _ParamT = TypeVar("_ParamT", EventParameterT, FuncParameterT)
_ParamT = TypeVar("_ParamT", bound=_ParameterT)
_BaseT = TypeVar("_BaseT")
_T = TypeVar("_T")
_Self = TypeVar("_Self", bound="Encodable[Any]")

# Path-like argument type accepted by ``from_solidity(file=...)``.
if sys.version_info >= (3, 9) or TYPE_CHECKING:
    _PathT = Union[str, os.PathLike[str]]
else:
    # The subscripted form is used only on 3.9+ or for type checkers.
    _PathT = Union[str, os.PathLike]
class _WithName:
    """Mixin exposing the ``name`` of the wrapped ABI definition.

    Expects the host class to store the definition in ``_definition``.
    """

    _definition: Union[EventT, FunctionT]

    @property
    def name(self) -> str:
        """Get name of object.

        .. versionadded:: 2.0.0
        """
        return self._definition["name"]

    @deprecated_to_property
    def get_name(self) -> str:
        """Get name of object.

        .. customtox-exclude::

        .. deprecated:: 2.0.0
            Use :attr:`name` property instead.
        """
        return self.name
class Encodable(Generic[_ParamT], ABC):
    """Base class for :class:`Function` and :class:`Event`.

    Provides the shared helpers for turning user input (sequences or
    mappings) into plain tuples and for wrapping decoded values into
    :class:`FunctionResult` namedtuples.

    .. versionadded:: 2.0.0
    """

    _definition: Union[FunctionT, ConstructorT, EventT]

    @abstractmethod
    def __init__(self, definition: Any) -> None:
        raise NotImplementedError()

    @abstractmethod
    def encode(
        self, __parameters: Sequence[Any]
    ) -> Union[bytes, str, List[Optional[bytes]]]:
        """Encode parameters into bytes."""
        raise NotImplementedError()

    @abstractmethod
    def decode(self, __data: bytes) -> FunctionResult:
        """Decode data from bytes to namedtuple."""
        raise NotImplementedError()

    @classmethod
    def make_proper_type(cls, elem: _ParamT) -> str:
        """Extract type string (inline tuples) from JSON."""
        return eth_utils.abi.collapse_if_tuple(dict(elem))

    @staticmethod
    def _make_output_namedtuple_type(
        name: str, types: Iterable[_ParamT]
    ) -> Type[FunctionResult]:
        """Create a namedtuple subclass mixed with :class:`FunctionResult`.

        Field names are the parameter names passed through
        :meth:`FunctionResult.name_to_identifier` (position is the index).
        """
        field_names = [
            FunctionResult.name_to_identifier(param["name"], index)
            for index, param in enumerate(types)
        ]
        return type(name, (namedtuple(name, field_names), FunctionResult), {})

    @classmethod
    def _demote_type(cls, typeinfo: _ParamT) -> Tuple[_ParamT, bool]:
        """Strip one trailing ``[]`` / ``[N]`` suffix from ``typeinfo["type"]``.

        Returns a copy of ``typeinfo`` (with the shortened type if a suffix
        was found) and a flag telling whether anything was stripped.
        """
        # We don't have to support nested stuff like (uint256, bool[4])[],
        # because type in JSON will be tuple[], uint256 and bool[4] in this case
        # without nesting in string
        original = typeinfo["type"]
        stripped = re.sub(r"(\[\d*\])$", r"", original)
        demoted_info = typeinfo.copy()
        if stripped == original:
            return demoted_info, False
        demoted_info["type"] = stripped
        return demoted_info, True

    @classmethod
    def apply_recursive_names(
        cls,
        value: Any,
        typeinfo: _ParamT,
        chain: Optional[Sequence[str]] = None,
    ) -> Union[FunctionResult, List[FunctionResult], Any]:
        """Build namedtuple from values.

        Values whose type does not start with ``tuple`` are returned as is;
        arrays of tuples become lists; tuples become :class:`FunctionResult`.

        .. customtox-exclude::
        """
        if not typeinfo["type"].startswith("tuple"):
            return value

        chain = [*(chain or []), typeinfo["name"].title() or "NoName"]
        element_type, demoted = cls._demote_type(typeinfo)
        if demoted:
            # The recursive call appends this name again, so pass the chain
            # without it to avoid duplicating the last segment.
            return [
                cls.apply_recursive_names(item, element_type, chain[:-1])
                for item in value
            ]

        components = cast(List[_ParamT], typeinfo.get("components", []))
        result_type = cls._make_output_namedtuple_type("_".join(chain), components)
        return result_type(
            *(
                cls.apply_recursive_names(item, component, chain)
                for component, item in izip(components, value)
            )
        )

    @classmethod
    def _normalize_values_dict(
        cls,
        values: Mapping[str, Any],
        expected: Union[_ParamT, Sequence[_ParamT]],
    ) -> Iterator[Any]:
        """Yield normalized mapping values in the order given by ``expected``.

        This is a generator, so the errors below surface on iteration.

        Raises
        ------
        ValueError
            If key count differs from the expected component count, if some
            expected component is unnamed, or if a key is missing.
        """
        assert isinstance(values, Mapping)
        if isinstance(expected, Mapping):
            components = expected.get("components", [])
        else:
            components = expected

        if len(values) != len(components):
            raise ValueError(
                f"Invalid keys count, expected {len(components)}, got {len(values)}"
            )

        for typeinfo in components:
            name = typeinfo.get("name")
            if not name:
                raise ValueError(
                    "Cannot serialize mapping when some types are unnamed."
                )
            try:
                val = values[name]
            except KeyError:
                # The message already names the key; hide the KeyError context.
                raise ValueError(f"Missing key for output: {name}.") from None
            yield cls._normalize_values(val, typeinfo)

    @overload
    @classmethod
    def _normalize_values(
        cls,
        values: Mapping[str, Any],
        expected: Union[_ParamT, Sequence[_ParamT]],
    ) -> Tuple[Any, ...]:
        ...

    @overload
    @classmethod
    def _normalize_values(
        cls,
        values: Sequence[Any],
        expected: Union[_ParamT, Sequence[_ParamT]],
    ) -> Sequence[Any]:
        ...

    @classmethod
    def _normalize_values(
        cls,
        values: object,
        expected: Union[_ParamT, Sequence[_ParamT]],
    ) -> object:
        """Convert nested mappings/sequences into nested tuples.

        Mappings are ordered according to ``expected``; non-sequence values
        and ``str``/``bytes``/``bytearray`` are returned unchanged.
        """
        if isinstance(values, Mapping):
            values = tuple(cls._normalize_values_dict(values, expected))

        is_container = isinstance(values, Sequence) and not isinstance(
            values, (str, bytes, bytearray)  # Primary types
        )
        if not is_container:
            return values

        if isinstance(expected, Sequence):
            return tuple(cls._normalize_values(v, t) for v, t in izip(values, expected))

        type_ = expected["type"]
        element_type, demoted = cls._demote_type(expected)
        if demoted:
            return tuple(cls._normalize_values(v, element_type) for v in values)
        if "tuple" in type_:
            components = cast(List[_ParamT], expected.get("components", []))
            assert components, "Missing components for tuple."
            return tuple(
                cls._normalize_values(v, t) for v, t in izip(values, components)
            )
        # Give up, maybe it is inline type like {'type': '(str,int)'}
        return tuple(values)

    @classmethod
    def _to_final_type(
        cls, name: str, values: Iterable[Any], types: Iterable[_ParamT]
    ) -> FunctionResult:
        """Wrap decoded ``values`` into a namedtuple described by ``types``."""
        # ``types`` is walked twice (field names, then values); materialize it
        # so that a one-shot iterator is not exhausted after the first pass.
        types = list(types)
        result_type = cls._make_output_namedtuple_type(name, types)
        return result_type(
            *(
                cls.apply_recursive_names(value, typeinfo)
                for typeinfo, value in izip(types, values)
            )
        )

    @classmethod
    def from_solidity(
        cls: Type[_Self],
        *,
        text: Optional[str] = None,
        file: Optional[_PathT] = None,
        name: Optional[str] = None,
        version: Optional[str] = None,
    ) -> _Self:
        """Instantiate :class:`Encodable` from solidity definition.

        .. versionadded:: 2.0.0

        Parameters
        ----------
        text: str or None (keyword-only)
            Program text.
        file: os.PathLike or Path or None (keyword-only)
            File with program source. Takes precedence over ``text``
            when both are given.
        name: str or None
            Name of encodable to extract.
        version: str or None (keyword-only)
            Solidity version (supported by :func:`~solcx.install_solc`)
            or ``None`` to use default.

        Raises
        ------
        ValueError
            If required type (event or function) cannot be uniquely extracted.
        :exc:`~solcx.exceptions.SolcError`
            If input is not a valid solidity code.

        See Also
        --------
        :external+solcx:doc:`index`: underlying library reference.
        """

        def compile_() -> Dict[str, Any]:
            if file is not None:
                return solcx.compile_files(
                    [file], output_values=["abi"], solc_version=version
                )
            elif text is not None:
                return solcx.compile_source(
                    text, output_values=["abi"], solc_version=version
                )
            else:  # pragma: no cover
                raise TypeError("Please specify either file or text.")

        try:
            result = compile_()
        except solcx.exceptions.SolcNotInstalled:
            # Install the requested (or latest) compiler once and retry.
            solcx.install_solc(version or "latest")
            result = compile_()

        # ABI entry type to select is derived from the class name
        # ("function", "event", "constructor").
        wanted_type = cls.__name__.lower()
        all_items = [e for g in result.values() for e in g["abi"]]  # Flatten
        given = [e for e in all_items if e["type"] == wanted_type]
        if name is not None:
            given = [e for e in given if e.get("name") == name]

        if not given:
            raise ValueError("Missing value of expected type.")
        elif len(given) > 1:
            raise ValueError(
                f"Ambiguous input: more than one {cls.__name__.lower()} given."
            )
        return cls(given[0])
class FunctionBase(Encodable[FuncParameterT]):
    """Base class for ABI functions (function itself and constructor).

    Subclasses supply :attr:`selector`, which is prepended to encoded
    parameters and stripped before decoding them.

    .. versionadded:: 2.0.0
    """

    _definition: Union[FunctionT, ConstructorT]

    def encode(self, parameters: Union[Sequence[Any], Mapping[str, Any]]) -> bytes:
        r"""Encode the parameters according to the function definition.

        Parameters
        ----------
        parameters : Sequence[Any] or Mapping[str, Any]
            A list of parameters waiting to be encoded,
            or a mapping from names to values.

        Returns
        -------
        bytes
            Encoded value
        """
        inputs = self._definition["inputs"]
        type_strings = [self.make_proper_type(param) for param in inputs]
        normalized = self._normalize_values(parameters, inputs)
        return self.selector + Coder.encode_list(type_strings, normalized)

    def decode_parameters(self, value: bytes) -> FunctionResult:
        """Decode parameters back to values.

        .. versionadded:: 2.0.0

        Parameters
        ----------
        value: bytes
            Data to decode.

        Returns
        -------
        FunctionResult
            Decoded values.
        """
        inputs = self._definition["inputs"]
        type_strings = [self.make_proper_type(param) for param in inputs]
        # Strip signature
        payload = value[len(self.selector) :]
        decoded = Coder.decode_list(type_strings, payload)
        return self._to_final_type("InType", decoded, inputs)

    @property
    @abstractmethod
    def selector(self) -> bytes:
        """Selector to prepend to encoded data."""
        raise NotImplementedError()
class Constructor(FunctionBase):
    """ABI constructor function.

    .. versionadded:: 2.0.0

    Examples
    --------
    >>> body = {
    ...     'type': 'constructor',
    ...     'inputs': [{'type': 'int', 'name': 'x'}],
    ...     'stateMutability': 'nonpayable',
    ... }
    >>> Constructor(body)  # doctest:+ELLIPSIS
    <thor_devkit.abi.Constructor object at ...>

    Or create from contract:

    >>> contract = r'contract A { constructor(int x) {} }'
    >>> Constructor.from_solidity(text=contract)  # doctest:+ELLIPSIS
    <thor_devkit.abi.Constructor object at ...>
    """

    def __init__(self, definition: ConstructorT) -> None:
        """Initialize a constructor by definition.

        Parameters
        ----------
        definition : ConstructorT
            A dict with style of :const:`CONSTRUCTOR`
        """
        # Store the validated definition rather than the caller's object.
        self._definition: ConstructorT = CONSTRUCTOR(definition)  # Protect.

    @property
    def selector(self) -> bytes:
        """Empty bytes, because constructor is unnamed."""
        return b""

    def decode(self, data: bytes) -> NoReturn:
        """Constructor does not have outputs, so nothing to decode.

        Raises
        ------
        AttributeError
            Always.
        """
        raise AttributeError("Constructor cannot have outputs!")
# Sentinel default for ``Function.encode(to_hex=...)``: lets the method tell
# "argument omitted" apart from an explicit ``False``.
_dummy = object()


class Function(_WithName, FunctionBase):
    """ABI Function."""

    def __init__(self, definition: FunctionT) -> None:
        """Initialize a function by definition.

        .. versionchanged:: 2.0.0
            Argument renamed from ``f_definition`` to ``definition``.

        Parameters
        ----------
        definition : FunctionT
            A dict with style of :const:`FUNCTION`
        """
        self._definition: FunctionT = FUNCTION(definition)  # Protect.
        # Computed once; exposed through the ``selector`` property.
        self._selector: bytes = calc_function_selector(self._definition)

    @property
    def selector(self) -> bytes:
        """First 4 bytes of function signature hash.

        .. versionadded:: 2.0.0
        """
        return self._selector

    @overload
    def encode(
        self, parameters: Union[Sequence[Any], Mapping[str, Any]], to_hex: Literal[True]
    ) -> str:
        ...

    @overload
    def encode(
        self,
        parameters: Union[Sequence[Any], Mapping[str, Any]],
        to_hex: Literal[False] = ...,
    ) -> bytes:
        ...

    def encode(
        self,
        parameters: Union[Sequence[Any], Mapping[str, Any]],
        to_hex: object = _dummy,
    ) -> Union[bytes, str]:
        r"""Encode the parameters according to the function definition.

        .. versionchanged:: 2.0.0
            parameter ``to_hex`` is deprecated, use ``"0x" + result.hex()``
            directly instead.

        Parameters
        ----------
        parameters : Sequence[Any] or Mapping[str, Any]
            A list of parameters waiting to be encoded,
            or a mapping from names to values.
        to_hex : bool, default: False
            If the return should be ``0x...`` hex string.
            Passing it at all (even ``False``) emits a
            :class:`DeprecationWarning`.

        Returns
        -------
        bytes
            By default or if ``to_hex=False`` was passed.
        str
            If ``to_hex=True`` was passed.

        Examples
        --------
        Encode sequence:

        >>> func = Function({
        ...     'inputs': [{'internalType': 'string', 'name': '', 'type': 'string'}],
        ...     'outputs': [],
        ...     'name': 'myFunction',
        ...     'stateMutability': 'pure',
        ...     'type': 'function',
        ... })
        >>> enc = func.encode(['foo'])
        >>> assert enc == (
        ...     func.selector
        ...     + b'\x20'.rjust(32, b'\x00')  # Address of argument
        ...     + b'\x03'.rjust(32, b'\x00')  # Length
        ...     + b'foo'.ljust(32, b'\x00')  # String itself
        ... )

        Encode mapping:

        >>> func = Function({
        ...     'inputs': [{'internalType': 'string', 'name': 'arg', 'type': 'string'}],
        ...     'outputs': [],
        ...     'name': 'myFunction',
        ...     'stateMutability': 'pure',
        ...     'type': 'function',
        ... })
        >>> enc = func.encode({'arg': 'foo'})
        >>> assert enc == (
        ...     func.selector
        ...     + b'\x20'.rjust(32, b'\x00')  # Address of argument
        ...     + b'\x03'.rjust(32, b'\x00')  # Length
        ...     + b'foo'.ljust(32, b'\x00')  # String itself
        ... )
        """
        my_bytes = super().encode(parameters)
        if to_hex is _dummy:
            return my_bytes

        # ``stacklevel=2`` attributes the warning to the caller's line.
        warnings.warn(
            DeprecationWarning(
                "to_hex parameter is deprecated. "
                "Use ``'0x' + output.hex()`` instead to replicate that behaviour"
            ),
            stacklevel=2,
        )
        if to_hex:
            return "0x" + my_bytes.hex()
        return my_bytes

    def decode(self, output_data: bytes) -> FunctionResult:
        """Decode function call output data back into human readable results.

        The result is a dynamic subclass of
        :class:`typing.NamedTuple` (:func:`collections.namedtuple` return type)
        and :class:`FunctionResult`

        .. versionchanged:: 2.0.0
            Return type is not a dict anymore.

        Parameters
        ----------
        output_data : bytes
            Data to decode.

        Returns
        -------
        FunctionResult
            Decoded data.

        Examples
        --------
        >>> data = {
        ...     "inputs": [],
        ...     "name": "getStr",
        ...     "outputs": [{"name": "memory", "type": "string"}],
        ...     "stateMutability": "pure",
        ...     "type": "function",
        ... }
        >>> func = Function(data)
        >>> memory = b"Hello world!"  # encoded string
        >>> binary = bytes.fromhex(
        ...     "20".rjust(64, "0")  # address of first argument
        ...     + hex(len(memory))[2:].rjust(64, "0")  # length of string
        ...     + memory.hex().ljust(64, "0")  # content
        ... )
        >>> result = func.decode(binary)
        >>> result.memory  # Access by name
        'Hello world!'
        >>> result[0]  # Access by index
        'Hello world!'
        >>> result.to_dict()  # Convert to dictionary
        {'memory': 'Hello world!'}

        With unnamed attributes:

        >>> data = {
        ...     "inputs": [],
        ...     "name": "getBool",
        ...     "outputs": [{"internalType": "bool", "name": "", "type": "bool"}],
        ...     "stateMutability": "pure",
        ...     "type": "function",
        ... }
        >>> func = Function(data)
        >>> result = func.decode(bytes.fromhex("1".rjust(64, "0")))
        >>> result.ret_0  # Access by name
        True
        >>> result[0]  # Access by index
        True
        >>> result.to_dict()  # Convert to dictionary
        {'ret_0': True}
        """
        outputs = self._definition["outputs"]
        type_strings = [self.make_proper_type(param) for param in outputs]
        decoded = Coder.decode_list(type_strings, output_data)
        return self._to_final_type("OutType", decoded, outputs)

    def encode_outputs(self, values: Union[Sequence[Any], Mapping[str, Any]]) -> bytes:
        """Encode the return values according to the function definition.

        .. versionadded:: 2.0.0

        Parameters
        ----------
        values : Sequence[Any] or Mapping[str, Any]
            A list of parameters waiting to be encoded,
            or a mapping from names to values.

        Returns
        -------
        bytes
            Encoded output values.

        Raises
        ------
        ValueError
            If mapping was given for unnamed parameters
            or mapping keys are not the same as output names.
        """
        outputs = self._definition["outputs"]
        type_strings = [self.make_proper_type(param) for param in outputs]
        normalized = self._normalize_values(values, outputs)
        return Coder.encode_list(type_strings, normalized)

    @classmethod
    def from_solidity(
        cls,
        *,
        text: Optional[str] = None,
        file: Optional[_PathT] = None,
        name: Optional[str] = None,
        version: Optional[str] = None,
    ) -> "Function":
        """Instantiate :class:`Function` from solidity definition.

        .. versionadded:: 2.0.0

        Parameters
        ----------
        text: str or None (keyword-only)
            Program text.
        file: os.PathLike or Path or None (keyword-only)
            File with program source.
        name: str or None
            Name of function to select. Do not filter by name if ``None``.
        version: str or None (keyword-only)
            Solidity version (supported by :func:`~solcx.install_solc`)
            or ``None`` to use default.

        Raises
        ------
        ValueError
            If required type (event or function) cannot be uniquely extracted.
        :exc:`~solcx.exceptions.SolcError`
            If input is not a valid solidity code.

        See Also
        --------
        :external+solcx:doc:`index`: underlying library reference.

        Examples
        --------
        >>> from pprint import pprint
        >>> contract = '''
        ...     contract A {
        ...         function f(uint x) public returns(bool) {}
        ...     }
        ... '''
        >>> func = Function.from_solidity(text=contract)
        >>> pprint(func._definition)
        {'inputs': [{'internalType': 'uint256', 'name': 'x', 'type': 'uint256'}],
         'name': 'f',
         'outputs': [{'internalType': 'bool', 'name': '', 'type': 'bool'}],
         'stateMutability': 'nonpayable',
         'type': 'function'}

        No matching function:

        >>> Function.from_solidity(text='contract A { event E(int x); }')
        Traceback (most recent call last):
        ValueError: Missing value of expected type.

        Many matching functions:

        >>> contract = '''
        ...     contract A {
        ...         function f1(int x) public {}
        ...         function f2() public {}
        ...     }
        ... '''
        >>> Function.from_solidity(text=contract)
        Traceback (most recent call last):
        ValueError: Ambiguous input: more than one function given.

        Many matching functions, select by name:

        >>> contract = '''
        ...     contract A {
        ...         function f1(int x) public {}
        ...         function f2() public {}
        ...     }
        ... '''
        >>> func = Function.from_solidity(text=contract, name='f2')
        >>> pprint(func._definition)
        {'inputs': [],
         'name': 'f2',
         'outputs': [],
         'stateMutability': 'nonpayable',
         'type': 'function'}

        Syntax error:

        >>> Function.from_solidity(
        ...     text='contract A { function x() {} }'
        ... )  # doctest:+IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        solcx.exceptions.SolcError: An error occurred during execution
        """
        # Overridden only to attach function-specific documentation.
        return super().from_solidity(text=text, file=file, name=name, version=version)

    @deprecated_to_property
    def get_selector(self) -> bytes:
        """First 4 bytes of function signature hash.

        .. customtox-exclude::

        .. deprecated:: 2.0.0
            Use :attr:`selector` property instead.
        """
        return self.selector
[docs]class Event(_WithName, Encodable[EventParameterT]):
"""ABI Event."""
def __init__(self, definition: EventT) -> None:
    """Initialize an Event with definition.

    .. versionchanged:: 2.0.0
        Argument renamed from ``e_definition`` to ``definition``.

    Parameters
    ----------
    definition : EventT
        A dict with style of :const:`EVENT`.

    Raises
    ------
    ValueError
        If number of indexed parameters exceeds the limit.
    Invalid
        If given definition is malformed.
    """
    self._definition: EventT = EVENT(definition)
    self._signature: bytes = calc_event_topic(self._definition)

    inputs = self._definition["inputs"]
    self.indexed_params: List[EventParameterT] = [
        param for param in inputs if param["indexed"]
    ]
    # At most 3 indexed parameters are accepted, or 4 for an anonymous event.
    if len(self.indexed_params) - int(self.is_anonymous) > 3:
        raise ValueError("Too much indexed parameters!")

    self.unindexed_params: List[EventParameterT] = [
        param for param in inputs if not param["indexed"]
    ]
@property
def is_anonymous(self) -> bool:
    """Whether this event is anonymous.

    Falls back to ``False`` when the definition has no ``anonymous`` key.

    .. versionadded:: 2.0.0
    """
    return self._definition.get("anonymous", False)
@property
def signature(self) -> bytes:
    """Event log topic (32 bytes) derived from the event signature.

    This is the value computed by :func:`calc_event_topic` at construction.

    .. versionadded:: 2.0.0
    """
    return self._signature
@classmethod
def is_dynamic_type(cls, t: str) -> bool:
    """Check if the input type requires hashing in indexed parameter.

    All bytes, strings and dynamic arrays are dynamic, plus all structs and
    fixed-size arrays are hashed (see `Specification`_).
    """  # Reference is defined in `abi.rst`
    if t in {"bytes", "string"}:
        return True
    # Any array (``[`` present) or tuple type counts as well.
    return "[" in t or t.startswith("tuple")
@staticmethod
def _strip_dynamic_part(type_: str) -> str:
return type_.split("[")[0]
@staticmethod
def pad(
    data: Union[Sequence[bytes], bytes],
    mod: int = 32,
    to: Literal["r", "l"] = "l",
) -> bytes:
    r"""Join sequence of bytes together and pad to multiple of ``mod``.

    .. versionadded:: 2.0.0

    Parameters
    ----------
    data: bytes or Sequence[bytes]
        Data to process.
    mod: int, default: 32
        Length unit (bytes are padded to multiple of this parameter)
    to: Literal["r", "l"]
        Side the data is aligned to: ``"l"`` keeps data on the left
        (zeros appended), any other value keeps it on the right
        (zeros prepended).

    Returns
    -------
    bytes
        Given sequence joined and padded to multiple of ``mod``.
        Data whose length is already a multiple of ``mod`` is returned
        without padding.

    Examples
    --------
    >>> Event.pad(b'foo', 32, 'l').hex()
    '666f6f0000000000000000000000000000000000000000000000000000000000'

    >>> Event.pad(b'\x07', 16, 'r').hex()
    '00000000000000000000000000000007'

    >>> Event.pad([b'foo', b'bar'], 32, 'l').hex()
    '666f6f6261720000000000000000000000000000000000000000000000000000'

    >>> Event.pad([b'\x07', b'\x04'], 16, 'r').hex()
    '00000000000000000000000000000704'
    """
    if not isinstance(data, (bytes, bytearray)):
        data = b"".join(data)

    payload = bytes(data)
    # Bytes missing up to the next multiple of ``mod`` (0 if already aligned).
    filler = b"\x00" * (-len(payload) % mod)
    if to == "l":
        return payload + filler
    return filler + payload
@classmethod
def dynamic_type_to_topic(cls, type_: EventParameterT, value: Any) -> List[bytes]:
    """Encode single value according to given ``type_``.

    Arrays and tuples are processed element by element (recursively), each
    element's encoding being joined and padded to 32 bytes via :meth:`pad`.

    Parameters
    ----------
    type_ : EventParameterT
        Type description of the value.
    value : Any
        Value to encode.

    Returns
    -------
    List[bytes]
        One chunk per array element / tuple component, or a single chunk
        for plain values.
    """
    t_type = type_["type"]

    element_type, demoted = cls._demote_type(type_)
    if demoted:
        # Array: encode each element with the type minus one ``[...]`` suffix.
        return [
            cls.pad(cls.dynamic_type_to_topic(element_type, item), 32, "l")
            for item in value
        ]

    if t_type.startswith("tuple"):
        return [
            cls.pad(cls.dynamic_type_to_topic(component, item), 32, "l")
            for component, item in izip(type_["components"], value)
        ]

    if t_type == "string":
        assert isinstance(value, str), 'Value of type "string" must be str'
        return [value.encode("utf-8")]

    if t_type == "bytes":
        assert isinstance(
            value, (bytes, bytearray)
        ), 'Value of type "bytes" must be bytes'
        return [value]

    return [Coder.encode_single(cls._strip_dynamic_part(t_type), value)]
def encode(
    self, parameters: Union[Mapping[str, Any], Sequence[Any]]
) -> List[Optional[bytes]]:
    r"""Assemble indexed keys into topics.

    Commonly used to filter out logs of concerned topics, e.g. to filter out
    `VIP180 <https://github.com/vechain/VIPs/blob/master/vips/VIP-180.md>`_
    transfer logs of a certain wallet, certain amount.

    Parameters
    ----------
    parameters : Mapping[str, Any] or Sequence[Any]
        A dict/list of indexed parameters of the given event.
        Fill in :class:`None` to occupy the position, if you aren't sure
        about the value.

    Returns
    -------
    List[bytes or None]
        Encoded parameters with :class:`None` preserved from input.
        For a non-anonymous event the first item is the event signature.

    Raises
    ------
    TypeError
        Unknown parameters type (neither mapping nor sequence)
    ValueError
        If there is unnamed parameter in definition and dict of parameters is given,
        or if parameters count doesn't match the definition.

    Examples
    --------
    Let's say we have

    .. code-block:: text

        MyEvent(address from indexed, address to indexed, uint256 value)

    Then corresponding event is

    >>> event = Event({
    ...     'inputs': [
    ...         {'name': 'from', 'indexed': True, 'type': 'address'},
    ...         {'name': 'to', 'indexed': True, 'type': 'address'},
    ...         {'name': 'value', 'indexed': False, 'type': 'uint256'},
    ...     ],
    ...     'name': 'MyEvent',
    ...     'type': 'event',
    ... })

    We can use it to encode all topics:

    >>> address_from = '0x' + 'f' * 40
    >>> address_to = '0x' + '9' * 40
    >>> enc = event.encode([address_from, address_to])
    >>> assert tuple(enc) == (
    ...     event.signature,
    ...     bytes.fromhex(hex(int(address_from, 16))[2:].rjust(64, '0')),
    ...     bytes.fromhex(hex(int(address_to, 16))[2:].rjust(64, '0')),
    ... )

    Note the interesting conversion here: ``address`` is equivalent to ``uint160``,
    so one would expect just ``bytes.fromhex(address_from[2:])``, right?

    Indexed event parameters are **always** padded to 32 bytes too, even if they
    are shorter. Numbers are padded to the right (or as two's complement,
    if negative), strings and bytes - to the left.

    Or we can convert only some of params:

    >>> enc = event.encode([address_from, None])
    >>> assert tuple(enc) == (
    ...     event.signature,
    ...     bytes.fromhex(hex(int(address_from, 16))[2:].rjust(64, '0')),
    ...     None,
    ... )

    Mapping is also accepted for named parameters:

    >>> enc = event.encode({'from': address_from, 'to': None})
    >>> assert tuple(enc) == (
    ...     event.signature,
    ...     bytes.fromhex(hex(int(address_from, 16))[2:].rjust(64, '0')),
    ...     None,
    ... )
    """
    parameters = self._normalize_values(parameters, self.indexed_params)

    # Signature goes first unless the event is anonymous.
    topics: List[Optional[bytes]] = [] if self.is_anonymous else [self.signature]

    def to_topic(value: Any, definition: EventParameterT) -> bytes:
        """Encode one non-``None`` indexed value into its topic."""
        if not self.is_dynamic_type(definition["type"]):
            return Coder.encode_single(self.make_proper_type(definition), value)
        # Dynamic types are represented by the hash of their encoding.
        return keccak256(self.dynamic_type_to_topic(definition, value))[0]

    # bytes are Sequence too, so they have to be rejected explicitly.
    if isinstance(parameters, (bytes, bytearray)) or not isinstance(
        parameters, Sequence
    ):
        raise TypeError(
            f"Expected sequence or mapping of parameters, got: {type(parameters)}"
        )

    topics.extend(
        None if value is None else to_topic(value, definition)
        for value, definition in izip(parameters, self.indexed_params)
    )
    return topics
def encode_data(self, parameters: Union[Mapping[str, Any], Sequence[Any]]) -> bytes:
    """Encode unindexed parameters into bytes.

    .. versionadded:: 2.0.0

    Parameters
    ----------
    parameters: Mapping[str, Any] or Sequence[Any]
        A dict/list of unindexed parameters of the given event.

    Returns
    -------
    bytes
        Encoded result.

    Examples
    --------
    >>> event = Event({
    ...     'inputs': [
    ...         {'name': 'from', 'indexed': True, 'type': 'address'},
    ...         {'name': 'value', 'indexed': False, 'type': 'uint256'},
    ...         {'name': 'to', 'indexed': True, 'type': 'address'},
    ...         {'name': 'value2', 'indexed': False, 'type': 'uint64'},
    ...     ],
    ...     'name': 'MyEvent',
    ...     'type': 'event',
    ... })

    We can use it to encode values as a sequence:

    >>> enc = event.encode_data([256, 129])  # 256 == 0x100, 129 == 0x81
    >>> assert enc.hex() == '100'.rjust(64, '0') + '81'.rjust(64, '0')

    Or as a mapping:

    >>> enc = event.encode_data({'value': 256, 'value2': 129})
    >>> assert enc.hex() == '100'.rjust(64, '0') + '81'.rjust(64, '0')
    """
    values = self._normalize_values(parameters, self.unindexed_params)
    abi_types = [
        self.make_proper_type(definition) for definition in self.unindexed_params
    ]
    return Coder.encode_list(abi_types, values)
def encode_full(
    self, parameters: Union[Mapping[str, Any], Sequence[Any]]
) -> Tuple[List[Optional[bytes]], bytes]:
    r"""Encode both indexed and unindexed parameters.

    .. versionadded:: 2.0.0

    Parameters
    ----------
    parameters: Mapping[str, Any] or Sequence[Any]
        A dict/list of all parameters of the given event.
        A sequence must follow the order of inputs in the event definition.

    Returns
    -------
    Tuple[List[bytes or None], bytes]
        Tuple
        with first item being :meth:`Event.encode` result
        and second item being :meth:`Event.encode_data` result.

    Raises
    ------
    ValueError
        If some required parameters were missing,
        or some extra parameters were given.
    TypeError
        If given parameters are neither sequence nor mapping.

    Examples
    --------
    >>> event = Event({
    ...     'inputs': [
    ...         {'name': 'from', 'indexed': True, 'type': 'address'},
    ...         {'name': 'value', 'indexed': False, 'type': 'uint256'},
    ...         {'name': 'to', 'indexed': True, 'type': 'address'},
    ...         {'name': 'value2', 'indexed': False, 'type': 'uint64'},
    ...     ],
    ...     'name': 'MyEvent',
    ...     'type': 'event',
    ... })
    >>> address_from = '0x' + 'f' * 40
    >>> address_to = '0x' + '9' * 40

    Expected values:

    >>> topics_enc = event.encode([address_from, address_to])
    >>> data_enc = event.encode_data([256, 127])

    Now with :meth:`Event.encode_full`:

    >>> topics, data = event.encode_full([address_from, 256, address_to, 127])
    >>> assert topics == topics_enc
    >>> assert data == data_enc

    Or in mapping form (note that order doesn't matter):

    >>> topics, data = event.encode_full({
    ...     'to': address_to,
    ...     'value': 256,
    ...     'value2': 127,
    ...     'from': address_from,
    ... })
    >>> assert topics == topics_enc
    >>> assert data == data_enc
    """
    unindexed: Union[List[Any], Dict[str, Any]]
    indexed: Union[List[Any], Dict[str, Any]]

    if isinstance(parameters, Mapping):
        try:
            unindexed = {
                p["name"]: parameters[p["name"]] for p in self.unindexed_params
            }
            indexed = {
                p["name"]: parameters[p["name"]] for p in self.indexed_params
            }
        except KeyError as e:
            # Keep the original lookup failure attached as the explicit cause.
            raise ValueError(f"Key '{e.args[0]}' is missing.") from e

        # Every expected key was found, so a size mismatch means extra keys.
        if len(indexed) + len(unindexed) != len(parameters):
            raise ValueError("Invalid keys count.")
    elif isinstance(parameters, Sequence):
        indexed = []
        unindexed = []
        # Single pass: route every value by the "indexed" flag of its input.
        for value, definition in izip(parameters, self._definition["inputs"]):
            (indexed if definition["indexed"] else unindexed).append(value)
    else:
        raise TypeError("Sequence or mapping of parameters expected.")

    return (self.encode(indexed), self.encode_data(unindexed))
def decode(
    self,
    data: bytes,
    topics: Optional[Sequence[Optional[bytes]]] = None,
) -> FunctionResult:
    r"""Decode "data" according to the "topic"s.

    One output can contain an array of logs.

    .. versionchanged:: 2.0.0
        Return type is not a dict anymore.

    Parameters
    ----------
    data : bytes
        Data to decode.
        It should be ``b'\x00'`` for event without unindexed parameters.
    topics : Sequence[bytes or None], optional
        Sequence of topics.
        Fill unknown or not important positions with :class:`None`,
        it will be preserved.

        :class:`None` is interpreted like list of proper length where
        all items (except signature, if needed) are :class:`None`.

    Returns
    -------
    FunctionResult
        Decoded data.

    Raises
    ------
    ValueError
        If topics count does not match the number of indexed parameters,
        or if the first topic of a non-anonymous event is neither its
        signature nor :class:`None`.

    Notes
    -----
    One log contains mainly 3 entries:

    - For a non-indexed parameters event::

        "address": "The emitting contract address",
        "topics": [
            "signature of event"
        ],
        "data": "0x..."  # contains parameters values

    - For an indexed parameters event::

        "address": "The emitting contract address",
        "topics": [
            "signature of event",
            "indexed param 1",
            "indexed param 2",
            # ...
            # --> max 3 entries of indexed params.
        ],
        "data": "0x..."  # remaining unindexed parameters values

    If the event is "anonymous" then the signature is not inserted into
    the "topics" list, hence ``topics[0]`` is not the signature.

    Examples
    --------
    Decode indexed topic that is not hashed:

    >>> event = Event({
    ...     'inputs': [
    ...         {'indexed': True, 'name': 'a1', 'type': 'bool'},
    ...     ],
    ...     'name': 'MyEvent',
    ...     'type': 'event',
    ... })
    >>> topics = [
    ...     event.signature,  # Not anonymous
    ...     b'\x01'.rjust(32, b'\x00'),  # True as 32-byte integer
    ... ]
    >>> data = b'\x00'  # No unindexed topics
    >>> event.decode(data, topics).to_dict()
    {'a1': True}

    Decode mix of indexed and unindexed parameters:

    >>> event = Event({
    ...     'inputs': [
    ...         {'indexed': True, 'name': 't1', 'type': 'bool'},
    ...         {'indexed': True, 'name': 't2', 'type': 'bool'},
    ...         {'indexed': False, 'name': 'u1', 'type': 'string'},
    ...     ],
    ...     'name': 'MyEvent',
    ...     'type': 'event',
    ...     'anonymous': True,
    ... })
    >>> topics = [
    ...     b'\x01'.rjust(32, b'\x00'),  # True as 32-byte integer
    ...     b'\x00'.rjust(32, b'\x00'),  # False as 32-byte integer
    ... ]
    >>> data = (
    ...     b''
    ...     + b'\x20'.rjust(32, b'\x00')  # address of first argument
    ...     + b'\x03'.rjust(32, b'\x00')  # length of b'foo'
    ...     + b'foo'.ljust(32, b'\x00')  # b'foo'
    ... )  # string 'foo' encoded
    >>> event.decode(data, topics).to_dict()
    {'t1': True, 't2': False, 'u1': 'foo'}

    "Decode" hashed topic:

    >>> from thor_devkit.cry import keccak256
    >>> event = Event({
    ...     'inputs': [
    ...         {'indexed': True, 'name': 't1', 'type': 'string'},
    ...     ],
    ...     'name': 'MyEvent',
    ...     'type': 'event',
    ...     'anonymous': True,
    ... })
    >>> encoded_topic = b'foo'.ljust(32, b'\x00')
    >>> topic = keccak256([encoded_topic])[0]
    >>> assert event.decode(b'\x00', [topic]).t1 == topic

    Note that we don't get a string as output due to the nature of
    indexed parameters.

    See Also
    --------
    :meth:`Function.decode`: for examples of result usage.
    """
    indexed_count = len(self.indexed_params)

    if topics is None:
        # Nothing known about topics: one placeholder per indexed parameter.
        topics = [None] * indexed_count
    elif not self.is_anonymous:
        # topics[0] of a non-anonymous event is its signature; it is dropped
        # because self.signature already holds it.
        if not topics or topics[0] not in {self.signature, None}:
            raise ValueError(
                "First topic of non-anonymous event must be its signature"
            )
        _, *topics = topics

    # Check topics count
    topics_count = len(topics)
    if topics_count != indexed_count:
        raise ValueError(
            f"Invalid topics count: expected {indexed_count}, got {topics_count}."
        )

    abi_types = [
        self.make_proper_type(definition) for definition in self.unindexed_params
    ]
    decoded_data = Coder.decode_list(abi_types, data)
    # Lazily consumed below, in definition order.
    named_values = (
        self.apply_recursive_names(value, typeinfo)
        for typeinfo, value in izip(self.unindexed_params, decoded_data)
    )

    inputs = self._definition["inputs"]
    remaining_topics = iter(topics)
    values: List[Any] = []

    for definition in inputs:
        if not definition["indexed"]:
            values.append(next(named_values))
            continue

        topic = next(remaining_topics)
        if self.is_dynamic_type(definition["type"]) or topic is None:
            # Hashed and unknown topics are passed through untouched.
            values.append(topic)
        else:
            values.append(Coder.decode_single(definition["type"], topic))

    # Every decoded unindexed value must have been consumed by now.
    exhausted = object()
    if next(named_values, exhausted) is not exhausted:  # pragma: no cover
        raise ValueError("Wrong unindexed parameters count, internal error.")

    result_type = self._make_output_namedtuple_type("OutType", inputs)
    return result_type(*values)
@classmethod
def from_solidity(
    cls,
    *,
    text: Optional[str] = None,
    file: Optional[_PathT] = None,
    name: Optional[str] = None,
    version: Optional[str] = None,
) -> "Event":
    """Instantiate :class:`Event` from solidity definition.

    All arguments are forwarded unchanged to the parent implementation.

    .. versionadded:: 2.0.0

    Parameters
    ----------
    text: str or None (keyword-only)
        Program text.
    file: os.PathLike or Path or None (keyword-only)
        File with program source.
    name: str or None (keyword-only)
        Name of event to select. Do not filter by name if ``None``.
    version: str or None (keyword-only)
        Solidity version (supported by :func:`~solcx.install_solc`)
        or ``None`` to use default.

    Raises
    ------
    ValueError
        If required type (event or function) cannot be uniquely extracted.
    :exc:`~solcx.exceptions.SolcError`
        If input is not a valid solidity code.

    See Also
    --------
    :external+solcx:doc:`index`: underlying library reference.

    Examples
    --------
    >>> from pprint import pprint
    >>> contract = '''
    ...     contract A {
    ...         event E(uint x) anonymous;
    ...     }
    ... '''
    >>> ev = Event.from_solidity(text=contract)
    >>> pprint(ev._definition)
    {'anonymous': True,
     'inputs': [{'indexed': False,
                 'internalType': 'uint256',
                 'name': 'x',
                 'type': 'uint256'}],
     'name': 'E',
     'type': 'event'}

    No matching events:

    >>> Event.from_solidity(text='contract A { function f(int x) public {} }')
    Traceback (most recent call last):
    ValueError: Missing value of expected type.

    Many matching events:

    >>> contract = '''
    ...     contract A {
    ...         event E1(int x) anonymous;
    ...         event E2() ;
    ...     }
    ... '''
    >>> Event.from_solidity(text=contract)
    Traceback (most recent call last):
    ValueError: Ambiguous input: more than one event given.

    Many matching events, use name:

    >>> ev = Event.from_solidity(text=contract, name='E2')
    >>> pprint(ev._definition)
    {'anonymous': False, 'inputs': [], 'name': 'E2', 'type': 'event'}

    Syntax error:

    >>> Event.from_solidity(
    ...     text='contract A { event E() {} }'
    ... )  # doctest:+IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    solcx.exceptions.SolcError: An error occurred during execution
    """
    parent = super()
    return parent.from_solidity(
        text=text,
        file=file,
        name=name,
        version=version,
    )
@deprecated_to_property
def get_signature(self) -> bytes:
    """Return the event signature (deprecated alias).

    Returns exactly what the :attr:`signature` property returns.

    .. customtox-exclude::

    .. deprecated:: 2.0.0
        Use :attr:`signature` property instead
    """
    current_signature = self.signature
    return current_signature