dazl

Source code for dazl.model.writing

# Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Write-Side types
----------------

The :mod:`dazl.model.writing` module contains the Python classes used on the write-side of the
Ledger API.

.. autoclass:: Command
   :members:

.. autoclass:: CreateCommand
   :members:

.. autoclass:: ExerciseCommand
   :members:
"""
import uuid
import warnings
from datetime import datetime, timedelta
from typing import Any, Collection, Dict, Generic, List, Mapping, Optional, Sequence, TypeVar, Union

from dataclasses import dataclass, fields

from .. import LOG
from .core import ContractId, Party
from .types import Type, TypeReference, UnresolvedTypeReference, TemplateChoice, \
    RecordType, UnsupportedType, VariantType, ContractIdType, ListType, OptionalType, TextMapType, \
    EnumType, scalar_type_dispatch_table, TypeEvaluationContext, type_evaluate_dispatch, \
    TemplateMeta, ChoiceMeta
from .types_store import PackageStore
from ..util.prim_types import DEFAULT_TYPE_CONVERTER
from ..util.typing import safe_cast, safe_optional_cast

TCommand = TypeVar('TCommand')
TValue = TypeVar('TValue')


CommandsOrCommandSequence = Union[None, 'Command', List[Optional['Command']]]
EventHandlerResponse = Union[CommandsOrCommandSequence, 'CommandBuilder', 'CommandPayload']


[docs]class Command: """ Base class for write-side commands. """
[docs]@dataclass(init=False, frozen=True) class CreateCommand(Command): """ A command that creates a contract without any predecessors. .. attribute:: CreateCommand.template Refers to the type of a template. This can be passed in as a ``str`` to the constructor, where it assumed to represent the ID or name of a template. .. attribute:: CreateCommand.arguments The arguments to the create (as a ``dict``). """ __slots__ = ('template', 'arguments') template: Type arguments: Dict[str, Any] def __init__(self, template: 'Union[str, Type]', arguments=None): object.__setattr__(self, 'template', template if isinstance(template, Type) else UnresolvedTypeReference(template)) object.__setattr__(self, 'arguments', arguments or dict())
[docs] def replace(self, template: Union[None, str, Type] = None, arguments=None): """ Create a new :class:`CreateCommand` with the same identifier as this command, but with new values for its parameters. :param template: The new value of the `template` field, or `None` to reuse the existing value. :param arguments: The new value of the `arguments` field, or `None` to reuse the existing value. """ if template is not None: template = template if isinstance(template, Type) \ else UnresolvedTypeReference(template) return CreateCommand( template if template is not None else self.template, arguments if arguments is not None else self.arguments)
def __repr__(self): return f'<create {self.template} {self.arguments}>'
[docs]@dataclass(init=False, frozen=True) class ExerciseCommand(Command): """ A command that exercises a choice on a pre-existing contract. .. attribute:: ExerciseCommand.contract The :class:`ContractId` on which a choice is being exercised. .. attribute:: ExerciseCommand.choice Refers to a choice (either a :class:`ChoiceRef` or a :class:`ChoiceMetadata`). This can be passed in as a ``str`` to the constructor, where it assumed to represent the name of a choice. .. attribute:: ExerciseCommand.arguments The arguments to the exercise choice (as a ``dict``). Note that when an ``ExerciseCommand`` is created, an additional ``template_id`` parameter can be supplied to the constructor to aid in disambiguation of the specific choice being invoked. In some situations involving composite commands, a ``template_id`` must eventually be supplied before a choice can be exercised. If this ``template_id`` is specified, the ``contract`` and ``choice`` are both tagged with this ID. Instance methods: .. automethod:: replace """ __slots__ = ('contract', 'choice', 'arguments') def __init__( self, contract: 'Union[str, ContractId]', choice: str, arguments=None, template_id=None): if isinstance(contract, str): warnings.warn('Untyped ContractId support will be removed with the removal of ' 'the deprecated REST API.', DeprecationWarning, stacklevel=2) with warnings.catch_warnings(): warnings.simplefilter('ignore') contract = ContractId(contract, template_id=template_id) elif template_id is not None: warnings.warn( 'Specifying template_id in an ExerciseChoice is no longer necessary. ' + 'Please avoid specifying it, as this parameter will be removed in the future.', DeprecationWarning, stacklevel=2) elif not isinstance(contract, ContractId): raise ValueError('ContractId expected here') object.__setattr__(self, 'contract', contract) object.__setattr__(self, 'choice', choice) object.__setattr__(self, 'arguments', dict(arguments) if arguments is not None else dict())
[docs] def replace(self, contract=None, choice=None, arguments=None, template_id=None): """ Create a new :class:`ExerciseCommand` with the same identifier as this command, but with new values for its parameters. :param contract: The new value of the `contract` field, or `None` to reuse the existing value. The same type coercion rules used in the constructor apply here. :param choice: The new value of the `choice` field, or `None` to reuse the existing value. The same type coercion rules used in the constructor apply here. :param arguments: The new value of the `choice` field, or `None` to reuse the existing value. :param template_id: The expected template type. """ return ExerciseCommand( contract if contract is not None else self.contract, choice if choice is not None else self.choice, arguments if arguments is not None else self.arguments, template_id)
def __repr__(self): return '<exercise \"{}\" with {} on {}>'.format( self.choice, self.arguments, self.contract.contract_id if hasattr(self.contract, 'contract_id') else self.contract)
@dataclass(frozen=True) class ExerciseByKeyCommand(Command): template: Type contract_key: Any choice: str choice_argument: Mapping[str, Any] @dataclass(frozen=True) class CreateAndExerciseCommand(Command): template: Type arguments: Mapping[str, Any] choice: str choice_argument: Mapping[str, Any] class CommandBuilder: """ Builder class for generating commands to be sent to the ledger. """ @classmethod def coerce(cls, obj, atomic_default=False) -> 'CommandBuilder': """ Create a :class:`CommandBuilder` from the objects that an event handler is allowed to return. :param obj: :param atomic_default: :return: """ if isinstance(obj, CommandBuilder): return obj builder = CommandBuilder(atomic_default=atomic_default) if obj is not None: builder.append(obj) return builder def __init__(self, atomic_default=False): self._atomic_default = atomic_default self._commands = [[]] # type: List[List[Command]] self._defaults = CommandDefaults() def defaults(self, party: Optional[Party] = None, ledger_id: Optional[str] = None, workflow_id: Optional[str] = None, application_id: Optional[str] = None, command_id: Optional[str] = None) -> None: if party is not None: self._defaults.default_party = party if ledger_id is not None: self._defaults.ledger_id = ledger_id if workflow_id is not None: self._defaults.default_workflow_id = workflow_id if application_id is not None: self._defaults.default_application_id = application_id if command_id is not None: self._defaults.default_command_id = command_id def create(self, template, arguments=None) -> 'CommandBuilder': return self.append(create(template, arguments=arguments)) def exercise(self, contract, choice, arguments=None) -> 'CommandBuilder': return self.append(exercise(contract, choice, arguments=arguments)) def create_and_exercise(self, template, create_arguments, choice_name, choice_arguments=None) \ -> 'CommandBuilder': return self.append(create_and_exercise( template, create_arguments, choice_name, choice_arguments)) def append(self, *commands: CommandsOrCommandSequence) -> 'CommandBuilder': """ Append one or more commands, or list of commands to the :class:`CommandBuilder` in flight. This method respects the value of ``atomic_default`` that this object was constructed with. In order to force commands to be submitted either atomically, use :meth:`append_atomically`. To allow these commands to be submitted in parallel use :meth:`append_nonatomically`. :param commands: One or more commands, or list of commands to be submitted to the ledger. :return: This object. """ if self._atomic_default: # a command builder that defaults to being atomic will put all commands in a single # transaction; build on the very first transaction self._commands[0].extend(flatten_command_sequence(commands)) return self else: return self.append_nonatomically(*commands) def append_atomically(self, *commands: Union[Command, Sequence[Command]]) -> 'CommandBuilder': self._commands.extend([flatten_command_sequence(commands)]) return self def append_nonatomically(self, *commands: Union[Command, Sequence[Command]]) -> \ 'CommandBuilder': self._commands.extend([[cmd] for cmd in flatten_command_sequence(commands)]) return self def build(self, defaults: 'Optional[CommandDefaults]' = None, now: Optional[datetime] = None) \ -> 'Collection[CommandPayload]': """ Return a collection of commands. """ if defaults is None: raise ValueError('defaults must currently be specified') command_id = defaults.default_command_id or self._defaults.default_command_id or \ uuid.uuid4().hex return [CommandPayload( party=defaults.default_party or self._defaults.default_party, ledger_id=defaults.default_ledger_id or self._defaults.default_ledger_id, workflow_id=defaults.default_workflow_id or self._defaults.default_workflow_id, application_id=defaults.default_application_id or self._defaults.default_application_id, command_id=command_id, ledger_effective_time=now, maximum_record_time=now + (defaults.default_ttl or self._defaults.default_ttl), commands=commands ) for i, commands in enumerate(self._commands) if commands] def __format__(self, format_spec): if format_spec == 'c': return str(self._commands) else: return repr(self) def __repr__(self): return f'CommandBuilder({self._commands})' def flatten_command_sequence(commands: Sequence[CommandsOrCommandSequence]) -> List[Command]: """ Convert a list of mixed commands, ``None``, and list of commands into an ordered sequence of non-``None`` commands. """ ret = [] # type: List[Command] errors = [] for i, obj in enumerate(commands): if obj is not None: if isinstance(obj, Command): ret.append(obj) else: try: cmd_iter = iter(obj) except TypeError: errors.append(((i,), obj)) continue for j, cmd in enumerate(cmd_iter): if isinstance(cmd, Command): ret.append(cmd) else: errors.append(((i, j), cmd)) if errors: raise ValueError(f'Failed to interpret some elements as Commands in the list: ' f'$[{index}] = {command}' for index, command in errors) return ret @dataclass class CommandDefaults: """ Values to use for a :class:`Command` when no value is specified with the creation of the command. """ default_party: Optional[Party] = None default_ledger_id: Optional[str] = None default_workflow_id: Optional[str] = None default_application_id: Optional[str] = None default_command_id: Optional[str] = None default_ttl: Optional[timedelta] = None @dataclass(frozen=True) class CommandPayload: """ A request to mutate active state of the ledger. .. attribute:: CommandPayload.party The party submitting the request. .. attribute:: CommandPayload.application_id: An optional application ID to accompany the request. .. attribute:: CommandPayload.command_id: A hash that represents the BIM commitment. .. attribute:: CommandPayload.ledger_effective_time: The effective time of this command. Should usually be set to ``datetime.now()``, but may have a different value when the server is operating in static time mode. .. attribute:: CommandPayload.maximum_record_time: The maximum time before the client should consider this command expired. .. attribute:: CommandPayload.commands A sequence of commands to submit to the ledger. These commands are submitted atomically (in other words, they all succeed or they all fail). """ party: Party ledger_id: str workflow_id: str application_id: str command_id: str ledger_effective_time: datetime maximum_record_time: datetime commands: Sequence[Command] def __post_init__(self): missing_fields = [field.name for field in fields(self) if getattr(self, field.name) is None] if missing_fields: raise ValueError(f'Some fields are set to None when they are required: ' f'{missing_fields}') def create(template, arguments=None): from .types_dynamic import NamedRecord, ProxyMeta template_type = type(template) if isinstance(template_type, TemplateMeta): # static codegen, instantiated type if arguments is not None: raise ValueError('arguments cannot be specified with an instantiated template') arguments = template._asdict() template = str(template_type) elif isinstance(template, NamedRecord): # dynamic "codegen", instantiated type if arguments is not None: raise ValueError('arguments cannot be specified with an instantiated template') template, arguments = template.name, template.arguments elif template_type == TemplateMeta: # static codegen, non-instantiated template = str(template) elif isinstance(template_type, ProxyMeta): # dynamic codegen, non-instantiated template = str(template_type) elif not isinstance(template, str): raise ValueError( 'template must be a string name, a template type, or an instantiated template') return CreateCommand(template, arguments) def exercise(contract, choice, arguments=None): from .types_dynamic import NamedRecord, ProxyMeta choice_type = type(choice) if isinstance(choice_type, ChoiceMeta): # static codegen, instantiated type if arguments is not None: raise ValueError('arguments cannot be specified with an instantiated template') arguments = choice._asdict() choice = str(choice_type) elif isinstance(choice, NamedRecord): # dynamic "codegen", instantiated type if arguments is not None: raise ValueError('arguments cannot be specified with an instantiated template') choice, arguments = choice.name, choice.arguments choice_start_idx = choice.rfind('.') if choice_start_idx >= 0: choice = choice[choice_start_idx + 1:] elif choice_type == ChoiceMeta: # static codegen, non-instantiated choice = str(choice) elif isinstance(choice_type, ProxyMeta): # dynamic codegen, non-instantiated choice = str(choice_type) choice_start_idx = choice.rfind('.') if choice_start_idx >= 0: choice = choice[choice_start_idx + 1:] elif not isinstance(choice, str): raise ValueError('choice must be a string name, a template type, ' 'or an instantiated template') return ExerciseCommand(contract, choice, arguments) def exercise_by_key(template, contract_key, choice_name, choice_argument): return ExerciseByKeyCommand(template, contract_key, choice_name, choice_argument) def create_and_exercise(template, create_arguments, choice_name, choice_argument): return CreateAndExerciseCommand(template, create_arguments, choice_name, choice_argument) #################################################################################################### # argument iteration support #################################################################################################### def arg_iter(value): """ Produce an iterator that walks over an argument tree and all of its values, recursing into lists and record/variant fields. """ if isinstance(value, str): # str is a common case, and it's also iterable (which we don't want to exploit here) yield value elif isinstance(value, dict): for sub_value in value.values(): yield sub_value elif hasattr(value, '__iter__'): for sub_value in value: yield sub_value else: yield value class Serializer(Generic[TCommand, TValue]): """ Serializer interface for objects on the write-side of the API. """ def serialize_value(self, type_token: Type, obj: Any) -> TValue: raise NotImplementedError('serialize_value requires an implementation') def serialize_command(self, command: Command) -> TCommand: raise NotImplementedError('serialize_command requires an implementation') class AbstractSerializer(Serializer[TCommand, TValue]): """ Implementation of :class:`Serializer` that helps enforce that all possible cases of type serialization have been implemented. """ def __init__(self, store: PackageStore, type_context: 'Optional[TypeEvaluationContext]' = None): self.store = safe_cast(PackageStore, store) self.type_context = safe_optional_cast(TypeEvaluationContext, type_context) or \ DEFAULT_TYPE_CONVERTER def serialize_value(self, tt: Type, obj: Any) -> TValue: context = TypeEvaluationContext.from_store(self.store) try: return self._serialize_dispatch(context, tt, obj) except: from ..util.fmt_py import python_example_object LOG.warning("Expected something like:") for line in str.splitlines(python_example_object(self.store, tt)): LOG.warning(' %s', line) LOG.warning("But got this instead:") LOG.warning(' %r', obj) raise def serialize_commands(self, commands: Sequence[Command]) -> Sequence[TCommand]: return [self.serialize_command(cmd) for cmd in commands] def serialize_command(self, command: Command) -> TCommand: if isinstance(command, CreateCommand): tt = _resolve_template_type(self.store, command.template) value = self.serialize_value(tt, command.arguments) return self.serialize_create_command(tt, value) elif isinstance(command, ExerciseCommand): template_type_ref = command.contract.template_id choice_name = command.choice choice_opts = self.store.resolve_choice(template_type_ref, choice_name) if len(choice_opts) == 0: msg = f'Could not resolve {template_type_ref} {choice_name} to any valid choices' LOG.error(msg) raise ValueError(msg) if len(choice_opts) > 1: msg = f'Could not uniquely resolve {template_type_ref} {choice_name} ' \ f'to a single valid choice' LOG.error(msg) raise ValueError(msg) tt, choice = next(iter(choice_opts.items())) args = self.serialize_value(choice.type, command.arguments) return self.serialize_exercise_command(command.contract, choice, args) elif isinstance(command, CreateAndExerciseCommand): tt = _resolve_template_type(self.store, command.template) create_value = self.serialize_value(tt, command.arguments) _, choice_info = next(iter(self.store.resolve_choice(tt, command.choice).items())) choice_args = self.serialize_value(choice_info.type, command.choice_argument) return self.serialize_create_and_exercise_command( tt, create_value, choice_info, choice_args) elif isinstance(command, ExerciseByKeyCommand): template, = self.store.resolve_template(command.template) key_value = self.serialize_value(template.key_type, command.contract_key) choices = self.store.resolve_choice(template, command.choice) _, choice_info = next(iter(choices.items())) choice_args = self.serialize_value(choice_info.type, command.choice_argument) return self.serialize_exercise_by_key_command( template.data_type.name, key_value, choice_info, choice_args) else: raise ValueError(f'unknown Command type: {command!r}') def serialize_create_command( self, template_type: RecordType, template_args: TValue) \ -> TCommand: raise NotImplementedError('serialize_create_command requires an implementation') def serialize_exercise_command( self, contract_id: ContractId, choice_info: TemplateChoice, choice_args: TValue) \ -> TCommand: raise NotImplementedError('serialize_exercise_command requires an implementation') def serialize_exercise_by_key_command( self, template_ref: TypeReference, key_arguments: Any, choice_info: TemplateChoice, choice_arguments: Any) -> TCommand: raise NotImplementedError( 'serialize_exercise_by_key_command requires an implementation') def serialize_create_and_exercise_command( self, template_type: RecordType, create_arguments: Any, choice_info: TemplateChoice, choice_arguments: Any) -> TCommand: raise NotImplementedError( 'serialize_create_and_exercise_command requires an implementation') def serialize_unit(self, context: TypeEvaluationContext, obj: Any) -> TValue: raise NotImplementedError('serialize_unit requires an implementation') def serialize_bool(self, context: TypeEvaluationContext, obj: Any) -> TValue: raise NotImplementedError('serialize_bool requires an implementation') def serialize_text(self, context: TypeEvaluationContext, obj: Any) -> TValue: raise NotImplementedError('serialize_bool requires an implementation') def serialize_int(self, context: TypeEvaluationContext, obj: Any) -> TValue: raise NotImplementedError('serialize_bool requires an implementation') def serialize_decimal(self, context: TypeEvaluationContext, obj: Any) -> TValue: raise NotImplementedError('serialize_bool requires an implementation') def serialize_party(self, context: TypeEvaluationContext, obj: Any) -> TValue: raise NotImplementedError('serialize_bool requires an implementation') def serialize_date(self, context: TypeEvaluationContext, obj: Any) -> TValue: raise NotImplementedError('serialize_bool requires an implementation') def serialize_datetime(self, context: TypeEvaluationContext, obj: Any) -> TValue: raise NotImplementedError('serialize_bool requires an implementation') def serialize_timedelta(self, context: TypeEvaluationContext, obj: Any) -> TValue: raise NotImplementedError('serialize_bool requires an implementation') def serialize_contract_id(self, context: TypeEvaluationContext, tt: ContractIdType, obj: Any) \ -> TValue: raise NotImplementedError('serialize_contract_id requires an implementation') def serialize_optional(self, context: TypeEvaluationContext, tt: OptionalType, obj: Any) \ -> TValue: raise NotImplementedError('serialize_optional requires an implementation') def serialize_list(self, context: TypeEvaluationContext, tt: ListType, obj: Any) -> TValue: raise NotImplementedError('serialize_list requires an implementation') def serialize_map(self, context: TypeEvaluationContext, tt: TextMapType, obj: Any) -> TValue: raise NotImplementedError('serialize_map requires an implementation') def serialize_record(self, context: TypeEvaluationContext, tt: RecordType, obj: Any) -> TValue: raise NotImplementedError('serialize_record requires an implementation') def serialize_variant(self, context: TypeEvaluationContext, tt: VariantType, obj: Any) \ -> TValue: raise NotImplementedError('serialize_variant requires an implementation') def serialize_enum(self, context: TypeEvaluationContext, tt: EnumType, obj: Any) -> TValue: raise NotImplementedError('serialize_enum requires an implementation') def serialize_unsupported(self, context: TypeEvaluationContext, tt: UnsupportedType, obj: Any) \ -> TValue: raise NotImplementedError('serialize_unsupported requires an implementation') def _serialize_dispatch(self, context: TypeEvaluationContext, tt: Type, obj: Any) -> TValue: eval_fn = type_evaluate_dispatch( lambda c, st: scalar_type_dispatch_table( lambda: self.serialize_unit(c, obj), lambda: self.serialize_bool(c, obj), lambda: self.serialize_text(c, obj), lambda: self.serialize_int(c, obj), lambda: self.serialize_decimal(c, obj), lambda: self.serialize_party(c, obj), lambda: self.serialize_date(c, obj), lambda: self.serialize_datetime(c, obj), lambda: self.serialize_timedelta(c, obj))(st), lambda c, ct: self.serialize_contract_id(c, ct, obj), lambda c, ot: self.serialize_optional(c, ot, obj), lambda c, lt: self.serialize_list(c, lt, obj), lambda c, mt: self.serialize_map(c, mt, obj), lambda c, rt: self.serialize_record(c, rt, obj), lambda c, vt: self.serialize_variant(c, vt, obj), lambda c, et: self.serialize_enum(c, et, obj), lambda c, ut: self.serialize_unsupported(c, ut, obj)) return eval_fn(context, tt) def _resolve_template_type(store: 'PackageStore', template) -> 'RecordType': candidates = store.resolve_template_type(template) if len(candidates) == 0: msg = f'Could not resolve {template} to any valid types' LOG.error(msg) raise ValueError(msg) elif len(candidates) > 1: msg = f'Could not uniquely resolve {template} to a single valid type' LOG.error(msg) raise ValueError(msg) tt, = candidates.values() if not isinstance(tt, RecordType): msg = f'CreateCommand requires a type that is a record (got {tt} instead)' LOG.error(msg) raise ValueError(msg) return tt