dazl

Message Ingester

This example sets up a workflow involving two parties, Alice and Bob. When writing a DAML application, we recommend the following steps:

  1. Describe your workflow as a series of contracts in DAML.

  2. Write one or more DAML test scenarios that walk through each step of the workflow, starting from creation of the genesis contract, through a sequence of exercise commands, to the final state of the workflow. (Note: The DAML test scenarios verify the steps of the workflow sequentially. However, when the workflow is deployed to a live platform, the events that prompt steps in the workflow to move forward cannot be assumed to occur sequentially.)

  3. Write your application.

  4. Test your application on the sandbox. The ledger server implements the same API, so performance testing can be done with the same application.

The Message Ingester workflow is as follows:

  1. The genesis contract, OperatorRole, is created. The OperatorRole contract is a contract that describes the operation(s) that the operator of this workflow can perform. In this example, we have assigned the party name Alice to the OperatorRole.

  2. Alice ingests an input message, which causes the generation of a TradeRequest contract.

  3. Bob is the requestProcessingParty on the TradeRequest contract, and exercises the AcceptMessage choice on the TradeRequest. This causes the generation of a TradeResponse contract.

  4. Alice exercises the Acknowledge choice on the TradeResponse contract. This causes the generation of a WorkflowCompleted contract.
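
The four steps above can be read as a chain of (party, choice, created contract) transitions. The following is a minimal plain-Python sketch of that chain; it does not use dazl or a ledger, and the names WORKFLOW and walk_workflow are hypothetical helpers introduced only for illustration.

```python
# Hypothetical transition table for the Message Ingester workflow:
# template -> (acting party, choice exercised, template created by the choice)
WORKFLOW = {
    "OperatorRole": ("Alice", "IngestMessage", "TradeRequest"),
    "TradeRequest": ("Bob", "AcceptMessage", "TradeResponse"),
    "TradeResponse": ("Alice", "Acknowledge", "WorkflowCompleted"),
}


def walk_workflow(start="OperatorRole"):
    """Follow the transitions from the genesis template to the final one."""
    steps = []
    template = start
    while template in WORKFLOW:
        party, choice, created = WORKFLOW[template]
        steps.append((party, choice, created))
        template = created
    return steps


if __name__ == "__main__":
    for party, choice, created in walk_workflow():
        print(f"{party} exercises {choice}, creating {created}")
```

The walk stops at WorkflowCompleted because no step of the workflow described above starts from that contract.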

DAML Model

This example assumes the following DAML:

daml 1.0
module MessageIngester where

template OperatorRole
    with
        operator: Party
    where
        signatory operator
        controller operator can
            IngestMessage with requestProcessingParty: Party
                returning ContractId TradeRequest
                to do
                    tchoose <- getTime
                    create TradeRequest with messageIngester=operator; requestProcessingParty; originalMessageIngestedTime=tchoose;

template TradeRequest
    with
        messageIngester: Party
        requestProcessingParty: Party
        originalMessageIngestedTime: Time
    where
        signatory messageIngester

        controller requestProcessingParty can
            AcceptMessage with acknowledgingParty: Party
                returning ContractId TradeResponse
                to do
                    tchoose <- getTime
                    create TradeResponse with acknowledgingParty=messageIngester; tradeRequestAcceptedTime=tchoose; originalMessageIngestedTime;
            RejectMessage
                returning {}
                to return {}

template TradeResponse
    with
        acknowledgingParty: Party
        tradeRequestAcceptedTime: Time
        originalMessageIngestedTime: Time
    where
        signatory acknowledgingParty

        controller acknowledgingParty can
            Acknowledge
                returning ContractId WorkflowCompleted
                to do
                    create WorkflowCompleted with acknowledgingParty; tradeRequestAcceptedTime; originalMessageIngestedTime;

template WorkflowCompleted
    with
        acknowledgingParty: Party
        tradeRequestAcceptedTime: Time
        originalMessageIngestedTime: Time
    where
        signatory acknowledgingParty

        controller acknowledgingParty can
            Archive
                returning {}
                to return {}

test messageIngesterTest = 
    scenario
        operatorRoleCid <- 'Alice' commits create OperatorRole with operator='Alice'
        -- DOC_BEGIN: SAMPLE_DAML_SCENARIO_INGEST_MESSAGE
        tradeRequestCid <- 'Alice' commits exercise operatorRoleCid IngestMessage with requestProcessingParty='Bob'
        -- DOC_END: SAMPLE_DAML_SCENARIO_INGEST_MESSAGE
        tradeResponseCid <- 'Bob' commits exercise tradeRequestCid AcceptMessage with acknowledgingParty='Alice'
        workflowCompleted <- 'Alice' commits exercise tradeResponseCid Acknowledge
        'Alice' commits exercise workflowCompleted Archive 

The messageIngesterTest scenario describes a sample execution of the workflow, and is the basis from which our Python application will be designed.

Python Application

# Copyright (c) 2019 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from dazl import create, exercise
from dazl.client import create_client
from dazl.plugins import LedgerCapturePlugin

parties = ['Alice', 'Bob']
url = 'http://localhost:7600/'


def genesis_contract(_, __):
    return create('MessageIngester.OperatorRole', { 'operator' : 'Alice' })

# DOC_BEGIN: FUNCTION_INGEST_THE_MESSAGE
def ingest_the_message(cid, cdata):
    ingest_message = [ exercise(cid, 'IngestMessage', {'requestProcessingParty': 'Bob'})]
    return ingest_message
# DOC_END: FUNCTION_INGEST_THE_MESSAGE


def accept_the_message(cid, cdata):
    accept_message_action = [ exercise(cid, 'AcceptMessage', {'acknowledgingParty': 'Alice'})]
    return accept_message_action


def acknowledge_the_message(cid, cdata):
    acknowledge_message_action = [ exercise(cid, 'Acknowledge', {})]
    return acknowledge_message_action


def register_event_handlers(client_mgr):
    # Define a sandbox client, associated with the 'operator' party, and how it reacts to events
    alice_client = client_mgr.new_client('Alice')
    alice_client.on_ready(genesis_contract)
    # DOC_BEGIN: SAMPLE_CALLBACK_ONCREATED
    alice_client.on_created('MessageIngester.OperatorRole', ingest_the_message)
    # DOC_END: SAMPLE_CALLBACK_ONCREATED
    alice_client.on_created('MessageIngester.TradeResponse', acknowledge_the_message)

    # Define a sandbox client, associated with the 'requestor' party, and how it reacts to events
    bob_client = client_mgr.new_client('Bob')
    bob_client.on_created('MessageIngester.TradeRequest', accept_the_message)


def run():
    with create_client(parties=parties, participant_url=url) as client_mgr:
        inspector = LedgerCapturePlugin.stdout()
        try:
            client_mgr.register(inspector)
            register_event_handlers(client_mgr)
            ledger_run = client_mgr.run_until_complete()
            return ledger_run.exit_code
        finally:
            inspector.dump_all()

if __name__ == "__main__":
    run()

To run this code sample:

  1. Download the SDK.

  2. Create a new project: da new my-project-name-here

  3. cd my-project-name-here

  4. Create a file, MessageIngester.daml, that contains the above listed DAML.

  5. Create a file, message_ingester.py, that contains the above listed Python code.

  6. Download the dazl-starter template (which also creates a Python venv): da project add dazl-starter

  7. Start the sandbox: da sandbox

  8. Run the application: ./venv/bin/python3 message_ingester.py

run() calls register_event_handlers() to configure client_mgr before it connects to the platform/sandbox.

register_event_handlers() registers genesis_contract() through on_ready, so that the genesis contract is created once the client is ready, and adds a series of callback registrations, each of which provides a reference to a custom Python function that shall be invoked when a certain ledger event occurs. For example, this registration:

    alice_client.on_created('MessageIngester.OperatorRole', ingest_the_message)

indicates that ingest_the_message() shall be invoked after the OperatorRole contract is created.

ingest_the_message() describes what happens when an on_created event occurs:

  1. The DAZL framework, upon detecting the specified on_created event, invokes this function and passes it the contract id (cid) and the corresponding contract parameters (cdata).

  2. A choice is exercised on the contract (in this case, an OperatorRole contract).

  3. That choice is exercised on the DAML contract whose contract id is cid, with the specified parameters.

  4. Since this is a non-consuming choice, the OperatorRole contract will remain active.

def ingest_the_message(cid, cdata):
    ingest_message = [ exercise(cid, 'IngestMessage', {'requestProcessingParty': 'Bob'})]
    return ingest_message

The exercise() call in the above code snippet corresponds to this line in our DAML test scenario:

        tradeRequestCid <- 'Alice' commits exercise operatorRoleCid IngestMessage with requestProcessingParty='Bob'

Thus, a typical application would have a similar structure to register_event_handlers() in that it contains only one command to create the genesis contract, and all other code describes the callback handlers and the situations under which those callbacks shall be invoked.
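
To make this structure concrete without a running sandbox, here is a minimal sketch of the same pattern in plain Python: one on_ready handler that creates the genesis contract, and on_created handlers that return follow-up commands. ToyLedger, its command tuples, and build_app are hypothetical stand-ins written for this illustration; they are not part of the dazl API and ignore parties, contract data, and archiving.

```python
from collections import defaultdict, deque


class ToyLedger:
    """Hypothetical in-memory stand-in for a ledger client; not the dazl API."""

    # Template created by each choice, mirroring the DAML model above.
    RESULT_OF = {
        "IngestMessage": "TradeRequest",
        "AcceptMessage": "TradeResponse",
        "Acknowledge": "WorkflowCompleted",
    }

    def __init__(self):
        self.ready_handlers = []
        self.created_handlers = defaultdict(list)
        self.log = []  # template names, in creation order
        self._next_cid = 0

    def on_ready(self, handler):
        self.ready_handlers.append(handler)

    def on_created(self, template, handler):
        self.created_handlers[template].append(handler)

    def run(self):
        # Commands are ("create", template) or ("exercise", cid, choice).
        pending = deque(cmd for h in self.ready_handlers for cmd in h())
        while pending:
            command = pending.popleft()
            if command[0] == "create":
                template = command[1]
            else:
                template = self.RESULT_OF[command[2]]
            cid = self._next_cid
            self._next_cid += 1
            self.log.append(template)
            # Each creation event triggers the handlers registered for it.
            for handler in self.created_handlers[template]:
                pending.extend(handler(cid, {}))
        return self.log


def build_app():
    ledger = ToyLedger()
    # The only explicit create: the genesis contract.
    ledger.on_ready(lambda: [("create", "OperatorRole")])
    # Everything else is a reaction to a creation event.
    ledger.on_created("OperatorRole", lambda cid, cdata: [("exercise", cid, "IngestMessage")])
    ledger.on_created("TradeRequest", lambda cid, cdata: [("exercise", cid, "AcceptMessage")])
    ledger.on_created("TradeResponse", lambda cid, cdata: [("exercise", cid, "Acknowledge")])
    return ledger


if __name__ == "__main__":
    print(build_app().run())
```

Because each handler is keyed on the template that was created rather than on a position in a script, the application does not depend on the order in which the handlers were registered.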

Application Output

This application will produce this output:

2 total contracts over 2 templates
+- party 'Alice' (block heights 1 to 5)
|+ party 'Bob' (block heights 2 to 5)
||

MessageIngester.OperatorRole (1 contract) ------------------------------------------------------------------------------
#cid operator
C  0:0_ Alice

MessageIngester.WorkflowCompleted (1 contract) -------------------------------------------------------------------------
#cid acknowledgingParty originalMessageIngestedTime TradeResponseAcknowledgedTime
C  3:2_ Alice              1970-01-01T00:00:00Z        1970-01-01T00:00:00Z

The TradeRequest and TradeResponse contracts were created and subsequently archived during the course of the workflow; thus only OperatorRole (which has no consuming choices) and WorkflowCompleted are active on the ledger when this application terminates.
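
The relationship between created, archived, and active contracts can be sketched in plain Python. The event log below is a hypothetical reconstruction of one run (the contract ids c0 to c3 are illustrative, not the ids printed above), and active_contracts is a helper introduced only for this example.

```python
def active_contracts(events):
    """Return {cid: template} for contracts created but not yet archived."""
    active = {}
    for kind, cid, template in events:
        if kind == "created":
            active[cid] = template
        elif kind == "archived":
            active.pop(cid, None)
    return active


# Hypothetical event log for one run of the workflow.
EVENTS = [
    ("created", "c0", "OperatorRole"),
    ("created", "c1", "TradeRequest"),
    ("archived", "c1", "TradeRequest"),    # consumed by AcceptMessage
    ("created", "c2", "TradeResponse"),
    ("archived", "c2", "TradeResponse"),   # consumed by Acknowledge
    ("created", "c3", "WorkflowCompleted"),
]

if __name__ == "__main__":
    for cid, template in active_contracts(EVENTS).items():
        print(cid, template)
```

Only the two contracts that were never archived remain in the result, which matches the two templates listed in the output above.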