dazl

Source code for dazl.client.bots

# Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import inspect
import operator
from abc import abstractmethod
from asyncio import ensure_future, Future, gather, get_event_loop, InvalidStateError
from collections import defaultdict
from dataclasses import dataclass, field, replace
from datetime import timedelta
from enum import auto, Enum
from functools import wraps
from threading import RLock
from typing import overload, AbstractSet, Any, Awaitable, Callable, Collection, DefaultDict, List, \
    Optional, Sequence, TypeVar, Union, TYPE_CHECKING
from uuid import uuid4

from .. import LOG
from ..model.core import Party, SourceLocation
from ..model.reading import BaseEvent, EventKey
from ..model.writing import Command, CommandBuilder
from ..util.asyncio_util import LongRunningAwaitable, completed, propagate, failed, Signal

if TYPE_CHECKING:
    from .api import PartyClient

DEFAULT_BOT_STOP_TIMEOUT = timedelta(seconds=30)


E = TypeVar('E', bound=BaseEvent)
BotCallback = Callable[[E], Any]
BotFilter = Callable[[E], bool]


class _BotRunLevel(Enum):
    CONTINUE = auto()
    SUSPEND = auto()
    TERMINATE = auto()


[docs]class Bot: def __init__(self, party_client: 'Optional[PartyClient]', name: str): self._handlers: DefaultDict[str, List[BotEntry]] = defaultdict(list) self._party_client = party_client self._id = uuid4().hex self._name = name self._queue = [] self._signal = None # type: Optional[Signal] self._idle = True self._run_level = _BotRunLevel.CONTINUE
[docs] def event_keys(self) -> 'AbstractSet[str]': """ Return the set of keys that event handlers in this bot are configured to handle. """ return frozenset(self._handlers)
[docs] def wants_any_keys(self, keys: 'Collection[str]') -> bool: return bool(set(keys).intersection(self._handlers))
[docs] def add_event_handler( self, keys: 'Union[str, Collection[str]]', handler: 'BotCallback', filter_fn: 'Optional[BotFilter]' = None) -> None: """ Add a new event handler to this bot for the specified event. :param keys: The key(s) of the event (as returned by :meth:`EventKey.from_event`). :param filter_fn: An optional callback that returns `True` or `False` on whether the corresponding callback should be invoked. This cannot be a coroutine function. :param handler: An event handler to be invoked when an event with the specified key is raised. """ if isinstance(keys, str): keys = [keys] else: keys = tuple(keys) for key in keys: if not isinstance(key, str): raise ValueError('expected a string key') if not callable(handler): raise ValueError('handler must be callable') # noinspection PyBroadException try: source_file = inspect.getsourcefile(inspect.unwrap(handler)) lines, start_line = inspect.getsourcelines(handler) end_line = start_line + len(lines) source_loc = SourceLocation(source_file, start_line, end_line) except Exception: # noqa LOG.warning('Could not compute original source information for %r', handler, exc_info=True) source_loc = None if self._party_client is not None: # noinspection PyProtectedMember impl = self._party_client._impl handler = wrap_as_command_submission(impl.write_commands, handler, filter_fn) for key in keys: self._handlers[key].append(BotEntry(key, handler, filter_fn, source_loc))
[docs] def ledger_created(self, template: Any): def _register_created(fn): self.add_event_handler(EventKey.contract_created(True, template), fn) return _register_created
async def _main(self): """ The "main" coroutine of the bot. Invokes handlers on each event as they come in. """ # noinspection PyBroadException try: self._signal = Signal() while self._run_level != _BotRunLevel.TERMINATE: # the main queue contains either events we have not yet processed yet or ``None`` # markers that merely indicate running status should be "re-checked" self._idle = False while self._queue: invocation = self._queue.pop(0) # noinspection PyBroadException try: fut = ensure_future(self._handle_event(invocation.event)) propagate(fut, invocation.future) await invocation.future except Exception: # noqa LOG.exception(f'An event handler in a bot has thrown an exception! ' f'(offending event: {invocation.event})') self._idle = True await self._signal.wait() self._signal = None except Exception: # noqa LOG.exception('A bot thread died abnormally.') async def _handle_event(self, event: 'BaseEvent') -> None: """ Process an event, mostly by calling appropriate callbacks. :param event: The event to process. """ # The _PartyClientImpl and Bot classes are the ones raising events, but to the user, they # registered from the perspective of a client with a certain thread affinity. Replace the # "source" of the event with the client that the user originally used. if self._party_client is not None: new_event = replace(event, client=self._party_client) else: new_event = event for event_key in EventKey.from_event(new_event): LOG.debug('Party %s dispatching event %r to its bots...', self.party, event_key) handlers = self._handlers.get(event_key) if handlers is not None: for handler in handlers: # noinspection PyBroadException try: if handler.filter is None or handler.filter(new_event): await handler.callback(new_event) except Exception: # noqa LOG.exception('An event handler in a bot has thrown an exception!')
[docs] def notify(self, event: 'BaseEvent') -> 'Awaitable[None]': """ Notifies handler(s) associated with this bot that the given event has occurred. Note that this notification is asynchronous: in other words, event handlers will not have processed this event by the time this function returns. :param event: The event to raise. """ if not isinstance(event, BaseEvent): raise ValueError('expected a BaseEvent') if self._queue is None: raise InvalidStateError('Cannot notify on a bot whose main method is not running') bot_invocation = BotInvocation(event) self._queue.append(bot_invocation) if self._signal is not None: self._signal.notify_all() return bot_invocation.future
def _dispatch(self, event: 'BaseEvent') -> None: for event_key in EventKey.from_event(event): self._handlers.get(event_key) # <editor-fold desc="State control functions">
[docs] def pause(self) -> None: """ Immediately change the state of this bot to ``PAUSING``, and pause event handler invocations. The event handler currently running is allowed to complete. When that is completed, the state is changed to ``PAUSED``. """ LOG.info('Pausing the bot thread for party %r...', self.party) self._run_level = _BotRunLevel.SUSPEND if self._signal is not None: self._signal.notify_all()
[docs] def resume(self) -> None: """ Immediately change the state of this bot to ``RESUMING`` and process any events that have queued up while the bot was paused. When this queue is fully drained, the state is changed to ``RUNNING``. """ LOG.info('Resuming the bot thread for party %r...', self.party) self._run_level = _BotRunLevel.CONTINUE if self._signal is not None: self._signal.notify_all()
[docs] def stop(self): """ Permanently stop this bot. If you need to be able to "restart" a stopped bot, use :meth:`pause` and :meth:`resume` instead. """ LOG.info('Stopping the bot thread for party %r...', self.party) self._run_level = _BotRunLevel.TERMINATE if self._signal is not None: self._signal.notify_all()
# </editor-fold> # <editor-fold desc="State query properties/functions"> @property def id(self) -> str: """ The ID of this bot, generated at runtime. """ return self._id @property def name(self): """ The name of this bot. Defaults to the name of the original event handler if unspecified. """ return self._name @property def party(self) -> 'Party': """ Primary party that this bot receives events for (and potentially generates commands for). """ return self._party_client.party if self._party_client is not None else None
[docs] def entries(self) -> 'Sequence[BotEntry]': """ The collection of individual event handlers in a bot, in the order that they will be executed. """ return tuple(entry for collection in self._handlers.values() for entry in collection)
@property def state(self) -> 'BotState': """ Current running state of the bot. """ if self._run_level == _BotRunLevel.CONTINUE: return BotState.RUNNING if self._signal is not None else BotState.STARTING elif self._run_level == _BotRunLevel.TERMINATE: return BotState.STOPPED if self._idle else BotState.STOPPING elif self._run_level == _BotRunLevel.SUSPEND: return BotState.PAUSED if self._idle else BotState.PAUSING @property def running(self) -> bool: """ Return ``True`` if this bot is currently processing events. """ return self._signal is not None and \ (self._run_level == _BotRunLevel.CONTINUE or not self._idle)
# </editor-fold>
[docs]class BotCollection(Sequence[Bot]): """ A collection of bots for a party. This class is thread-safe except for :meth:`notify` and :meth:`_main` in order to support adding event handlers from any thread. The most common use of this is for ``SimplePartyClient`` instances, where event registration is done from the main thread (from the perspective of the caller) and event notifications are done on an asyncio event loop thread (hidden from the caller). """ def __init__(self, party: 'Optional[Party]'): self.party = party self._bots = [] # type: List[Bot] self._fut = None # type: Optional[LongRunningAwaitable] self._lock = RLock() def __len__(self) -> int: with self._lock: return len(self._bots) @overload @abstractmethod def __getitem__(self, i: int) -> Bot: ... @overload @abstractmethod def __getitem__(self, s: slice) -> Sequence[Bot]: ... def __getitem__(self, i: int) -> Bot: with self._lock: return operator.getitem(self._bots, i) def __iter__(self): with self._lock: bots = list(self._bots) return iter(bots)
[docs] def add_new(self, name: str, party_client: 'Optional[PartyClient]' = None) -> 'Bot': bot = Bot(party_client, name) with self._lock: self._bots.append(bot) if self._fut is not None: # the _main coroutine has already started, so just add this bot's main method to the # set of coroutines we track # noinspection PyProtectedMember self._fut.append(ensure_future(bot._main())) return bot
[docs] def add_single( self, keys: 'Union[str, Sequence[str]]', handler: 'BotCallback', filter_fn: 'Optional[BotFilter]' = None, name: 'Optional[str]' = None, party_client: 'Optional[PartyClient]' = None) -> 'Bot': """ Convenience method for creating a bot with a single event handler. """ if name is None: # by default, the name of a single-event-handler bot is simply the name of the passed # in function name = handler.__name__ if isinstance(keys, str): keys = [keys] bot = self.add_new(name, party_client) for key in keys: bot.add_event_handler(key, handler, filter_fn) return bot
[docs] def notify(self, event: 'BaseEvent'): futures = [] try: event_keys = EventKey.from_event(event) for bot in self._bots: if bot.wants_any_keys(event_keys): futures.append(ensure_future(bot.notify(event))) except: # noqa # This exception indicates a problem with the scheduling code and not user bot code. # Exceptions thrown by user code would be propagated through the Future LOG.exception('Failed to notify event for party %s: %r', self.party, event) raise if len(futures) == 0: return completed(None) elif len(futures) == 1: return futures[0] else: return gather(*futures, return_exceptions=True)
async def _main(self): if self.party is not None: LOG.info('Party %s bots coroutine started.', self.party) else: LOG.info('Network bots coroutine started.') with self._lock: self._fut = LongRunningAwaitable() # noinspection PyProtectedMember self._fut.extend(ensure_future(bot._main()) for bot in self._bots) await self._fut with self._lock: self._fut = None if self.party is not None: LOG.info('Party %s bots coroutine finished.', self.party) else: LOG.info('Network bots coroutine finished.')
[docs] def stop_all(self): with self._lock: # LongRunningAwaitable suspends itself until all futures are completed. It also keeps # itself open waiting for the very first future. In the case that no other futures have # been added, take this opportunity to feed LongRunningAwaitable a future that # immediately resolves, which will have the effect of resolving that overall future # if there are no other futures (or the other futures are resolved as well); otherwise # this has no effect. self._fut.append(completed(None)) for bot in list(self._bots): bot.stop() self._bots.clear()
[docs]@dataclass(frozen=True) class BotEntry: event_key: str callback: BotCallback filter: Optional[BotFilter] = None source_location: Optional[SourceLocation] = None
[docs]@dataclass(frozen=True) class BotInvocation: event: BaseEvent future: Future = field(default_factory=lambda: get_event_loop().create_future())
[docs]class BotState(Enum): """ Possible states of a :class:`Bot`. """ STARTING = 'STARTING' #: The bot is starting (has not yet received the "ready" event). PAUSING = 'PAUSING' #: This bot has been told to pause, but has not yet completed processing events in flight. PAUSED = 'PAUSED' #: RESUMING = 'RESUMING' RUNNING = 'RUNNING' STOPPING = 'STOPPING' STOPPED = 'STOPPED'
# noinspection PyShadowingBuiltins
[docs]def wrap_as_command_submission(submit_fn, callback, filter) \ -> Callable[[BaseEvent], Awaitable[Any]]: """ Normalize a callback to something that takes a single contract ID and contract data, and return an awaitable that is resolved when the underlying command has been fully submitted. """ import inspect @wraps(callback) def implementation(*args, **kwargs): if filter is not None and not filter(*args, **kwargs): return completed(None) try: ret = callback(*args, **kwargs) except BaseException as exception: LOG.exception('The callback %r threw an exception!', callback) return failed(exception) if ret is None: return completed(None) elif isinstance(ret, (CommandBuilder, Command, list, tuple)): try: ret_fut = submit_fn(ret) except BaseException as exception: LOG.exception('The callback %r returned commands that could not be submitted! (%s)', callback, ret) return failed(exception) return ret_fut elif inspect.isawaitable(ret): # the user-provided callback returned an Awaitable cmd_fut = ensure_future(ret) if cmd_fut.done(): if cmd_fut.cancelled() or cmd_fut.exception() is not None: # a cancelled or failed user-provided callback Future is the same as the # command submission itself failing return cmd_fut # functionally equivalent to the non-Awaitable case if the Awaitable has already # completed return submit_fn(cmd_fut.result()) else: # create `fut`, which we'll give to the user; wait for `cmd_fut` to finish, then # take the result of that awaitable and try to submit a command with that result fut = get_event_loop().create_future() def cmd_future_finished(_): ret = cmd_fut.result() if ret is None: fut.set_result(None) elif isinstance(ret, (CommandBuilder, Command, list, tuple)): propagate(ensure_future(submit_fn(ret)), fut) elif inspect.isawaitable(ret): LOG.error('A callback cannot return an Awaitable of an Awaitable') raise InvalidStateError( 'A callback cannot return an Awaitable of an Awaitable') cmd_fut.add_done_callback(cmd_future_finished) return fut else: LOG.error('the callback %r returned a value of an unexpected type: %s', callback, ret) raise ValueError('unexpected return type from a callback') return implementation