Skip to content

Latest commit

 

History

History
 
 

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 

OpenFunction function - Pubsub

Subscriber

FUNC_CONTEXT

Prepare a context as follows, name it function.json. (You can refer to OpenFunction Context Specs to learn more about the OpenFunction Context)

{
  "name": "subscriber",
  "version": "v1",
  "requestID": "a0f2ad8d-5062-4812-91e9-95416489fb01",
  "port": "50003",
  "inputs": {
    "sub": {
      "uri": "sample-topic",
      "componentName": "msg",
      "componentType": "pubsub.kafka"
    }
  },
  "outputs": {},
  "runtime": "Async"
}

Create an environment variable FUNC_CONTEXT and assign the above context to it.

export FUNC_CONTEXT='{"name":"subscriber","version":"v1","requestID":"a0f2ad8d-5062-4812-91e9-95416489fb01","port":"50003","inputs":{"sub":{"uri":"sample-topic","componentName":"msg","componentType":"pubsub.kafka"}},"outputs":{},"runtime":"Async"}'
export CONTEXT_MODE='self-host'

Run

Start the service and watch the logs.

cd sub/
go mod tidy
dapr run --app-id subscriber \
    --app-protocol grpc \
    --app-port 50003 \
    --dapr-grpc-port 50001 \
    --components-path ../../components \
    go run ./main.go

Producer

FUNC_CONTEXT

You also need a definition of producer.

{
  "name": "producer",
  "version": "v1",
  "requestID": "a0f2ad8d-5062-4812-91e9-95416489fb01",
  "port": "50004",
  "inputs": {
    "cron": {
      "componentName": "cron_input",
      "componentType": "bindings.cron"
    }
  },
  "outputs": {
    "pub": {
      "uri": "sample-topic",
      "componentName": "msg",
      "componentType": "pubsub.kafka"
    }
  },
  "runtime": "Async"
}

Create an environment variable FUNC_CONTEXT and assign the above context to it.

export FUNC_CONTEXT='{"name":"producer","version":"v1","requestID":"a0f2ad8d-5062-4812-91e9-95416489fb01","port":"50004","inputs":{"cron":{"componentName":"cron_input","componentType":"bindings.cron"}},"outputs":{"pub":{"uri":"sample-topic","componentName":"msg","componentType":"pubsub.kafka"}},"runtime":"Async"}'
export CONTEXT_MODE='self-host'
export DAPR_GRPC_PORT="50002"

Run

Start the service with another terminal to publish message.

cd pub/
go mod tidy
dapr run --app-id producer \
    --app-protocol grpc \
    --app-port 50004 \
    --dapr-grpc-port 50002 \
    --components-path ../../components \
    go run ./main.go

Results

View detailed producer logs.
== APP == dapr client initializing for: 127.0.0.1:50002
== APP == I0308 12:00:57.853501 2463688 framework.go:110] Plugins for pre-hook stage:
== APP == I0308 12:00:57.853575 2463688 framework.go:118] Plugins for post-hook stage:
== APP == I0308 12:00:57.855762 2463688 async.go:111] registered bindings handler: cron_input
== APP == I0308 12:00:57.855780 2463688 async.go:53] Async Function serving grpc: listening on port 50004
INFO[0001] application discovered on port 50004          app_id=producer instance=crab scope=dapr.runtime type=log ver=1.5.1
INFO[0001] actor runtime started. actor idle timeout: 1h0m0s. actor scan interval: 30s  app_id=producer instance=crab scope=dapr.runtime.actor type=log ver=1.5.1
INFO[0001] dapr initialized. Status: Running. Init Elapsed 1666.519766ms  app_id=producer instance=crab scope=dapr.runtime type=log ver=1.5.1
INFO[0001] placement tables updated, version: 0          app_id=producer instance=crab scope=dapr.runtime.actor.internal.placement type=log ver=1.5.1
== APP == I0308 12:00:59.008107 2463688 pub.go:22] send msg and receive result:
== APP == I0308 12:01:01.003672 2463688 pub.go:22] send msg and receive result:
== APP == I0308 12:01:03.010105 2463688 pub.go:22] send msg and receive result:
== APP == I0308 12:01:05.028111 2463688 pub.go:22] send msg and receive result:
View detailed subscriber logs.
== APP == I0308 11:59:54.548833 2459950 framework.go:110] Plugins for pre-hook stage:
== APP == I0308 11:59:54.549080 2459950 framework.go:118] Plugins for post-hook stage:
== APP == dapr client initializing for: 127.0.0.1:50001
== APP == I0308 11:59:54.555141 2459950 async.go:143] registered pubsub handler: msg, topic: sample-topic
== APP == I0308 11:59:54.555166 2459950 async.go:53] Async Function serving grpc: listening on port 50003
INFO[0006] application discovered on port 50003          app_id=subscriber instance=crab scope=dapr.runtime type=log ver=1.5.1
INFO[0006] actor runtime started. actor idle timeout: 1h0m0s. actor scan interval: 30s  app_id=subscriber instance=crab scope=dapr.runtime.actor type=log ver=1.5.1
INFO[0006] app is subscribed to the following topics: [sample-topic] through pubsub=msg  app_id=subscriber instance=crab scope=dapr.runtime type=log ver=1.5.1
INFO[0006] placement tables updated, version: 0          app_id=subscriber instance=crab scope=dapr.runtime.actor.internal.placement type=log ver=1.5.1
INFO[0009] dapr initialized. Status: Running. Init Elapsed 9673.663393ms  app_id=subscriber instance=crab scope=dapr.runtime type=log ver=1.5.1
== APP == 2022/03/08 12:00:59 event - Data: {"hello":"world"}
== APP == 2022/03/08 12:00:59 event - Data: map[hello:world]
== APP == 2022/03/08 12:01:01 event - Data: {"hello":"world"}
== APP == 2022/03/08 12:01:01 event - Data: map[hello:world]
== APP == 2022/03/08 12:01:03 event - Data: {"hello":"world"}
== APP == 2022/03/08 12:01:03 event - Data: map[hello:world]
== APP == 2022/03/08 12:01:05 event - Data: {"hello":"world"}
== APP == 2022/03/08 12:01:05 event - Data: map[hello:world]