dazl

Source code for dazl.model.reading

# Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# TODO: `automodule reading` directive doesn't appear to work here, have to list each class individually.
"""
Read-Side Types
---------------

This module contains models used on the read-side of the Ledger API.

.. autoclass:: InitEvent
  :members:

.. autoclass::  InitEvent
  :members:

.. autoclass::  OffsetEvent
  :members:

.. autoclass::  ReadyEvent
  :members:

.. autoclass::  ActiveContractSetEvent
  :members:

.. autoclass::  BaseTransactionEvent
  :members:

.. autoclass::  TransactionStartEvent
  :members:

.. autoclass::  TransactionEndEvent
  :members:

.. autoclass::  ContractEvent
  :members:

.. autoclass::  ContractCreateEvent
  :members:

.. autoclass::  ContractExercisedEvent
  :members:

.. autoclass::  ContractArchiveEvent
  :members:

.. autoclass::  PackagesAddedEvent
  :members:

.. autoclass::  TransactionFilter
  :members:

.. autoclass::  EventKey
  :members:

"""

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Collection, Optional, Sequence, TypeVar, Union, Tuple

from .core import ContractId, ContractData, ContractContextualData, Party
from .lookup import template_reverse_globs, validate_template
from .types import Type, TypeReference
from .types_store import PackageStore


T = TypeVar('T')


@dataclass(frozen=True)
class BaseEvent:
    """
    Superclass of all dazl events.
    """

    client: 'Any'
    party: Optional[Party]
    time: Optional[datetime]
    ledger_id: str
    package_store: PackageStore

    def acs_find_active(self, template: Union[TypeReference, str], match=None):
        return self.client.find_active(template, match)

    def acs_find_by_id(self, cid: Union[str, ContractId]) -> Optional[ContractContextualData]:
        return self.client.find_by_id(cid)

    def acs_find_one(self, template: Union[TypeReference, str], match=None):
        return self.client.find_one(template, match=match)

    def acs_find_historical(self, template: Union[TypeReference, str], match=None):
        return self.client.find_historical(template, match)

    def acs_find_nonempty(self, template: Union[TypeReference, str], match=None):
        return self.client.find_nonempty(template, match=match)

    def __repr__(self):
        fields = ', '.join(f"{k}={v!r}" for k, v in self.__dict__.items()
                           if not k.startswith('_') and k != 'client' and k != 'package_store' and
                           k != 'ledger_id')
        return f'{self.__class__.__name__}({fields})'


[docs]@dataclass(frozen=True) class InitEvent(BaseEvent): """ Event raised when dazl is initialized, but before it has begun reading from the Active Contract Set (ACS). """
[docs]@dataclass(frozen=True) class OffsetEvent(BaseEvent): """ Event raised when dazl is ready to begin processing new events. At this point, the Active Contract Set (ACS) is populated with the current state of the ledger. """ offset: str
[docs]@dataclass(frozen=True) class ReadyEvent(OffsetEvent): """ Event raised when dazl is ready to begin processing new events. At this point, the Active Contract Set (ACS) is populated with the current state of the ledger. """
[docs]@dataclass(frozen=True) class ActiveContractSetEvent(OffsetEvent): """ Event raised on initial read of the active contract set. """ contract_events: 'Sequence[ContractCreateEvent]'
[docs]@dataclass(frozen=True) class BaseTransactionEvent(OffsetEvent): """ Event raised when dazl encounters a new transaction. This is raised before any corresponding :class:`ContractCreateEvent` or :class:`ContractArchiveEvent`. """ command_id: str workflow_id: str
[docs]@dataclass(frozen=True) class TransactionStartEvent(BaseTransactionEvent): """ Event raised when dazl encounters a new transaction. This is raised before any corresponding :class:`ContractCreateEvent` or :class:`ContractArchiveEvent`. """ contract_events: 'Sequence[ContractEvent]'
[docs]@dataclass(frozen=True) class TransactionEndEvent(BaseTransactionEvent): """ Event raised when dazl encounters the end of a transaction. This is raised after any corresponding :class:`ContractCreateEvent` or :class:`ContractArchiveEvent`. """ contract_events: 'Sequence[ContractEvent]'
[docs]@dataclass(frozen=True) class ContractEvent(BaseTransactionEvent): """ Event raised when dazl automation detects a new create or an archive. The Active Contract Set (ACS) reflects this event, as well as all other events that occurred in the same transaction. """ cid: ContractId cdata: ContractData command_id: str workflow_id: str event_id: str witness_parties: Sequence[str]
[docs]@dataclass(frozen=True) class ContractCreateEvent(ContractEvent): """ Event raised when dazl automation detects a contract create. The Active Contract Set (ACS) reflects this event, as well as all other events that occurred in the same transaction. """
[docs]@dataclass(frozen=True) class ContractExercisedEvent(ContractEvent): """ Event raised when dazl automation detects a contract exercised. """ contract_creating_event_id: str choice: str choice_args: Any acting_parties: Sequence[str] consuming: True child_event_ids: Sequence[str]
[docs]@dataclass(frozen=True) class ContractArchiveEvent(ContractEvent): """ Event raised when dazl automation detects a contract archive. The Active Contract Set (ACS) reflects this event, as well as all other events that occurred in the same transaction. """
[docs]@dataclass(frozen=True) class PackagesAddedEvent(BaseEvent): """ Event raised when new packages have been detected. """ initial: bool
def create_dispatch( on_init: Callable[[InitEvent], T], on_ready: Callable[[ReadyEvent], T], on_offset: Callable[[OffsetEvent], T], on_transaction_start: Callable[[TransactionStartEvent], T], on_transaction_end: Callable[[TransactionEndEvent], T], on_contract_created: Callable[[ContractCreateEvent], T], on_contract_exercised: Callable[[ContractExercisedEvent], T], on_contract_archived: Callable[[ContractArchiveEvent], T], on_packages_added: Callable[[PackagesAddedEvent], T]) \ -> Callable[[BaseEvent], T]: def handle(event: BaseEvent) -> T: if isinstance(event, ContractCreateEvent): return on_contract_created(event) elif isinstance(event, ContractExercisedEvent): return on_contract_exercised(event) elif isinstance(event, ContractArchiveEvent): return on_contract_archived(event) elif isinstance(event, TransactionStartEvent): return on_transaction_start(event) elif isinstance(event, TransactionEndEvent): return on_transaction_end(event) elif isinstance(event, ReadyEvent): return on_ready(event) elif isinstance(event, InitEvent): return on_init(event) elif isinstance(event, OffsetEvent): return on_offset(event) elif isinstance(event, PackagesAddedEvent): return on_packages_added(event) else: raise ValueError(f'unknown subclass of BaseEvent: {event!r}') return handle
[docs]@dataclass(frozen=True) class TransactionFilter: ledger_id: str current_offset: Optional[str] destination_offset: Optional[str] templates: Optional[Collection[Type]] max_blocks: Optional[int] party_groups: Optional[Collection[str]]
[docs]class EventKey: from_event = create_dispatch( on_init=lambda _: EventKey.init(), on_ready=lambda _: EventKey.ready(), on_offset=lambda _: EventKey.offset(), on_transaction_start=lambda _: EventKey.transaction_start(), on_transaction_end=lambda _: EventKey.transaction_end(), on_contract_created=lambda event: EventKey.contract_created(False, event.cid.template_id), on_contract_exercised=lambda event: EventKey.contract_exercised( False, event.cid.template_id, event.choice), on_contract_archived=lambda event: EventKey.contract_archived(False, event.cid.template_id), on_packages_added=lambda event: EventKey.packages_added( initial=event.initial, changed=not event.initial))
[docs] @staticmethod def init() -> Collection[str]: """ Return the names of events that get raised in response to an :class:`InitEvent`. This is currently only ``'init'``. """ return 'init',
[docs] @staticmethod def ready() -> Collection[str]: """ Return the names of events that get raised in response to a :class:`ReadyEvent`. This is currently only ``'ready'``. """ return 'ready',
[docs] @staticmethod def offset() -> Collection[str]: """ Return the names of events that get raised in response to a :class:`OffsetEvent`. This is currently only ``'offset'``. """ return 'offset',
[docs] @staticmethod def transaction_start() -> Collection[str]: """ Return the names of events that get raised in response to a :class:`TransactionStartEvent`. This is currently only ``'transaction-start'``. """ return 'transaction-start',
[docs] @staticmethod def transaction_end() -> Collection[str]: """ Return the names of events that get raised in response to a :class:`TransactionEndEvent`. This is currently only ``'transaction-end'``. """ return 'transaction-end',
[docs] @staticmethod def contract_created(primary_only: bool, template: Any) -> Collection[str]: """ Return the names of events that get raised in response to a :class:`ContractCreateEvent` of the specified template type. """ return EventKey._contract(primary_only, 'create', template)
[docs] @staticmethod def contract_exercised(primary_only: bool, template: Any, choice: Any) -> Collection[str]: """ Return the names of events that get raised in response to a :class:`ContractExercisedEvent` of the specified choice. """ return [f'{key}/{choice}' for key in EventKey._contract(primary_only, 'exercised', template)]
[docs] @staticmethod def contract_archived(primary_only: bool, template: Any) -> Collection[str]: """ Return the names of events that get raised in response to a :class:`ContractCreateEvent` of the specified template type. """ return EventKey._contract(primary_only, 'archive', template)
@staticmethod def packages_added(initial: bool, changed: bool) -> 'Collection[str]': keys = [] if initial: keys.append('packages-added/initial') if changed: keys.append('packages-added/changed') return tuple(keys) @staticmethod def _contract(primary_only: bool, prefix: str, template: Any) -> Collection[str]: m, t = validate_template(template) return tuple(f'{prefix}/{g}' for g in template_reverse_globs(primary_only, m, t))
def max_offset(offsets: 'Collection[str]') -> 'Optional[str]': """ Return the most "recent" offset from a collection of offsets. :param offsets: A collection of offsets to examine. :return: The largest offset, or ``None`` if unknown. """ return max(offsets, key=sortable_offset_height) if offsets else None def sortable_offset_height(value: str) -> int: if value: components = value.split('-', 3) if len(components) == 1: return int(value) elif len(components) >= 1: return int(components[1]) return 0