|
| 1 | +// Copyright (c) 2017 Uber Technologies, Inc. |
| 2 | +// |
| 3 | +// Permission is hereby granted, free of charge, to any person obtaining a copy |
| 4 | +// of this software and associated documentation files (the "Software"), to deal |
| 5 | +// in the Software without restriction, including without limitation the rights |
| 6 | +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| 7 | +// copies of the Software, and to permit persons to whom the Software is |
| 8 | +// furnished to do so, subject to the following conditions: |
| 9 | +// |
| 10 | +// The above copyright notice and this permission notice shall be included in |
| 11 | +// all copies or substantial portions of the Software. |
| 12 | +// |
| 13 | +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 14 | +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 15 | +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| 16 | +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 17 | +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 18 | +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| 19 | +// THE SOFTWARE. |
| 20 | + |
| 21 | +/* |
| 22 | +Package activity contains functions and types used to implement Cadence activities. |
| 23 | +
|
| 24 | +The activity is an implementation of a task to be performed as part of a larger workflow. There is no limitation of |
| 25 | +what an activity can do. In the context of a workflow, it is in the activities where all operations that affect the |
| 26 | +desired results must be implemented. |
| 27 | +
|
| 28 | +Overview |
| 29 | +
|
| 30 | +The client library for Cadence does all the heavy lifting of handling the async communication between the Cadence |
| 31 | +managed service and the worker running the activity. As such, the implementation of the activity can, for the most |
| 32 | +part, focus on the business logic. The sample code below shows the implementation of a simple activity that accepts a |
| 33 | +string parameter, appends a word to it and then returns the result. |
| 34 | +
|
| 35 | + import ( |
| 36 | + "context" |
| 37 | +
|
| 38 | + "go.uber.org/cadence/activity" |
| 39 | + "go.uber.org/zap" |
| 40 | + ) |
| 41 | +
|
| 42 | + func init() { |
| 43 | + activity.Register(SimpleActivity) |
| 44 | + } |
| 45 | +
|
| 46 | + func SimpleActivity(ctx context.Context, value string) (string, error) { |
| 47 | + activity.GetLogger(ctx).Info("SimpleActivity called.", zap.String("Value", value)) |
| 48 | + return "Processed: ” + value, nil |
| 49 | + } |
| 50 | +
|
| 51 | +The following sections explore the elements of the above code. |
| 52 | +
|
| 53 | +Declaration |
| 54 | +
|
| 55 | +In the Cadence programing model, an activity is implemented with a function. The function declaration specifies the |
| 56 | +parameters the activity accepts as well as any values it might return. An activity function can take zero or many |
| 57 | +activity specific parameters and can return one or two values. It must always at least return an error value. The |
| 58 | +activity function can accept as parameters and return as results any serializable type. |
| 59 | +
|
| 60 | + func SimpleActivity(ctx context.Context, value string) (string, error) |
| 61 | +
|
| 62 | +The first parameter to the function is context.Context. This is an optional parameter and can be omitted. This |
| 63 | +parameter is the standard Go context. |
| 64 | +
|
| 65 | +The second string parameter is a custom activity-specific parameter that can be used to pass in data into the activity |
| 66 | +on start. An activity can have one or more such parameters. All parameters to an activity function must be |
| 67 | +serializable, which essentially means that params can’t be channels, functions, variadic, or unsafe pointer. |
| 68 | +
|
| 69 | +The activity declares two return values: (string, error). The string return value is used to return the result of the |
| 70 | +activity. The error return value is used to indicate an error was encountered during execution. |
| 71 | +
|
| 72 | +Implementation |
| 73 | +
|
| 74 | +There is nothing special about activity code. You can write activity implementation code the same way you would any |
| 75 | +other Go service code. You can use the usual loggers and metrics collectors. You can use the standard Go concurrency |
| 76 | +constructs. |
| 77 | +
|
| 78 | +Failing the activity |
| 79 | +
|
| 80 | +To mark an activity as failed, all that needs to happen is for the activity function to return an error via the error |
| 81 | +return value. |
| 82 | +
|
| 83 | +Activity Heartbeating |
| 84 | +
|
| 85 | +For long running activities, Cadence provides an API for the activity code to report both liveness and progress back to |
| 86 | +the Cadence managed service. |
| 87 | +
|
| 88 | + progress := 0 |
| 89 | + for hasWork { |
| 90 | + // send heartbeat message to the server |
| 91 | + activity.RecordActivityHeartbeat(ctx, progress) |
| 92 | + // do some work |
| 93 | + ... |
| 94 | + progress++ |
| 95 | + } |
| 96 | +
|
| 97 | +When the activity times out due to a missed heartbeat, the last value of the details (progress in the above sample) is |
| 98 | +returned from the workflow.ExecuteActivity function as the details field of TimeoutError with TimeoutType_HEARTBEAT. |
| 99 | +
|
| 100 | +It is also possible to heartbeat an activity from an external source: |
| 101 | +
|
| 102 | + // instantiate a Cadence service Client |
| 103 | + client.Client client = client.NewClient(...) |
| 104 | +
|
| 105 | + // record heartbeat |
| 106 | + err := client.RecordActivityHeartbeat(taskToken, details) |
| 107 | +
|
| 108 | +The parameters of the RecordActivityHeartbeat function are: |
| 109 | +
|
| 110 | + - taskToken: This is the value of the binary “TaskToken” field of the |
| 111 | + “ActivityInfo” struct retrieved inside the activity |
| 112 | + - details: This is the serializable payload containing progress information |
| 113 | +
|
| 114 | +Activity Cancellation |
| 115 | +
|
| 116 | +When an activity is cancelled (or its workflow execution is completed or failed) the context passed into its function |
| 117 | +is cancelled which sets its Done channel’s closed state. So an activity can use that to perform any necessary cleanup |
| 118 | +and abort its execution. Currently cancellation is delivered only to activities that call RecordActivityHeartbeat. |
| 119 | +
|
| 120 | +Async/“Manual” Activity Completion |
| 121 | +
|
| 122 | +In certain scenarios completing an activity upon completion of its function is not possible or desirable. |
| 123 | +
|
| 124 | +One example would be the UberEATS order processing workflow that gets kicked off once an eater pushes the “Place Order” |
| 125 | +button. Here is how that workflow could be implemented using Cadence and the “async activity completion”: |
| 126 | +
|
| 127 | + - Activity 1: send order to restaurant |
| 128 | + - Activity 2: wait for restaurant to accept order |
| 129 | + - Activity 3: schedule pickup of order |
| 130 | + - Activity 4: wait for courier to pick up order |
| 131 | + - Activity 5: send driver location updates to eater |
| 132 | + - Activity 6: complete order |
| 133 | +
|
| 134 | +Activities 2 & 4 in the above flow require someone in the restaurant to push a button in the Uber app to complete the |
| 135 | +activity. The activities could be implemented with some sort of polling mechanism. However, they can be implemented |
| 136 | +much simpler and much less resource intensive as a Cadence activity that is completed asynchronously. |
| 137 | +
|
| 138 | +There are 2 parts to implementing an asynchronously completed activity. The first part is for the activity to provide |
| 139 | +the information necessary to be able to be completed from an external system and notify the Cadence service that it is |
| 140 | +waiting for that outside callback: |
| 141 | +
|
| 142 | + // retrieve activity information needed to complete activity asynchronously |
| 143 | + activityInfo := activity.GetActivityInfo(ctx) |
| 144 | + taskToken := activityInfo.TaskToken |
| 145 | +
|
| 146 | + // send the taskToken to external service that will complete the activity |
| 147 | + ... |
| 148 | +
|
| 149 | + // return from activity function indicating the Cadence should wait for an async completion message |
| 150 | + return "", activity.ErrResultPending |
| 151 | +
|
| 152 | +The second part is then for the external service to call the Cadence service to complete the activity. To complete the |
| 153 | +activity successfully you would do the following: |
| 154 | +
|
| 155 | + // instantiate a Cadence service Client |
| 156 | + // the same client can be used complete or fail any number of activities |
| 157 | + client.Client client = client.NewClient(...) |
| 158 | +
|
| 159 | + // complete the activity |
| 160 | + client.CompleteActivity(taskToken, result, nil) |
| 161 | +
|
| 162 | +And here is how you would fail the activity: |
| 163 | +
|
| 164 | + // fail the activity |
| 165 | + client.CompleteActivity(taskToken, nil, err) |
| 166 | +
|
| 167 | +The parameters of the CompleteActivity function are: |
| 168 | +
|
| 169 | + - taskToken: This is the value of the binary “TaskToken” field of the |
| 170 | + “ActivityInfo” struct retrieved inside the activity. |
| 171 | + - result: This is the return value that should be recorded for the activity. |
| 172 | + The type of this value needs to match the type of the return value |
| 173 | + declared by the activity function. |
| 174 | + - err: The error code to return if the activity should terminate with an |
| 175 | + error. |
| 176 | +
|
| 177 | +If error is not null the value of the result field is ignored. |
| 178 | +
|
| 179 | +For a full example of implementing this pattern see the Expense sample. |
| 180 | +
|
| 181 | +Registration |
| 182 | +
|
| 183 | +In order to for some workflow execution to be able to invoke an activity type, the worker process needs to be aware of |
| 184 | +all the implementations it has access to. An activity is registered with the following call: |
| 185 | +
|
| 186 | + activity.RegisterActivity(SimpleActivity) |
| 187 | +
|
| 188 | +This call essentially creates an in-memory mapping inside the worker process between the fully qualified function name |
| 189 | +and the implementation. Unlike in Amazon SWF, workflow and activity types are not registered with the managed service. |
| 190 | +If the worker receives a request to start an activity execution for an activity type it does not know it will fail that |
| 191 | +request. |
| 192 | +
|
| 193 | +*/ |
| 194 | +package activity |
0 commit comments