Skip to content

Commit c159be9

Browse files
authored
Support protocol version 2.0.0 to callback handlers orchestrated by CloudFormation service (#146)
1 parent f77a5df commit c159be9

File tree

15 files changed

+164
-469
lines changed

15 files changed

+164
-469
lines changed

cfn/cfn.go

Lines changed: 95 additions & 235 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,21 @@ package cfn
22

33
import (
44
"context"
5-
"encoding/json"
65
"errors"
6+
"io/ioutil"
77
"log"
88
"os"
9+
"path"
910
"time"
1011

11-
"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/callback"
1212
"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/cfnerr"
1313
"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/credentials"
14-
"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/encoding"
1514
"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/handler"
1615
"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/logging"
1716
"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/metrics"
18-
"github.com/aws-cloudformation/cloudformation-cli-go-plugin/cfn/scheduler"
1917

2018
"github.com/aws/aws-lambda-go/lambda"
21-
"github.com/aws/aws-sdk-go/service/cloudformation"
2219
"github.com/aws/aws-sdk-go/service/cloudwatch"
23-
"github.com/aws/aws-sdk-go/service/cloudwatchevents"
2420
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
2521
)
2622

@@ -43,12 +39,6 @@ const (
4339
listAction = "LIST"
4440
)
4541

46-
// MaxRetries is the number of times to try to call the Handler after it fails to respond.
47-
var MaxRetries int = 3
48-
49-
// Timeout is the length of time to wait before giving up on a request.
50-
var Timeout time.Duration = 60 * time.Second
51-
5242
// Handler is the interface that all resource providers must implement
5343
//
5444
// Each method of Handler maps directly to a CloudFormation action.
@@ -63,11 +53,6 @@ type Handler interface {
6353
List(request handler.Request) handler.ProgressEvent
6454
}
6555

66-
// InvokeScheduler is the interface that all reinvocation schedulers must implement
67-
type InvokeScheduler interface {
68-
Reschedule(lambdaCtx context.Context, secsFromNow int64, callbackRequest string, invocationIDS *scheduler.ScheduleIDS) (*scheduler.Result, error)
69-
}
70-
7156
// Start is the entry point called from a resource's main function
7257
//
7358
// We define two lambda entry points; MakeEventFunc is the entry point to all
@@ -109,242 +94,80 @@ type testEventFunc func(ctx context.Context, event *testEvent) (handler.Progress
10994
// handlerFunc is the signature required for all actions
11095
type handlerFunc func(request handler.Request) handler.ProgressEvent
11196

112-
// router decides which handler should be invoked based on the action
113-
// It will return a route or an error depending on the action passed in
114-
func router(a string, h Handler) (handlerFunc, error) {
115-
// Figure out which action was called and have a "catch-all"
116-
switch a {
117-
case createAction:
118-
return h.Create, nil
119-
case readAction:
120-
return h.Read, nil
121-
case updateAction:
122-
return h.Update, nil
123-
case deleteAction:
124-
return h.Delete, nil
125-
case listAction:
126-
return h.List, nil
127-
default:
128-
// No action matched, we should fail and return an InvalidRequestErrorCode
129-
return nil, cfnerr.New(invalidRequestError, "No action/invalid action specified", nil)
130-
}
131-
}
132-
133-
// Invoke handles the invocation of the handerFn.
134-
func invoke(handlerFn handlerFunc, request handler.Request, metricsPublisher *metrics.Publisher, action string) (handler.ProgressEvent, error) {
135-
attempts := 0
136-
137-
for {
138-
attempts++
139-
// Create a context that is both manually cancellable and will signal
140-
// a cancel at the specified duration.
141-
ctx, cancel := context.WithTimeout(context.Background(), Timeout)
142-
//We always defer a cancel.
143-
defer cancel()
144-
145-
// Create a channel to received a signal that work is done.
146-
ch := make(chan handler.ProgressEvent, 1)
147-
148-
// Ask the goroutine to do some work for us.
149-
go func() {
150-
//start the timer
151-
start := time.Now()
152-
metricsPublisher.PublishInvocationMetric(time.Now(), string(action))
153-
154-
// Report the work is done.
155-
progEvt := handlerFn(request)
156-
157-
marshaled, _ := encoding.Marshal(progEvt.ResourceModel)
158-
log.Printf("Received event: %s\nMessage: %s\nBody: %s",
159-
progEvt.OperationStatus,
160-
progEvt.Message,
161-
marshaled,
162-
)
163-
164-
elapsed := time.Since(start)
165-
metricsPublisher.PublishDurationMetric(time.Now(), string(action), elapsed.Seconds()*1e3)
166-
ch <- progEvt
167-
}()
168-
169-
// Wait for the work to finish. If it takes too long move on. If the function returns an error, signal the error channel.
170-
select {
171-
case d := <-ch:
172-
//Return the response from the handler.
173-
return d, nil
174-
175-
case <-ctx.Done():
176-
if attempts == MaxRetries {
177-
log.Printf("Handler failed to respond, retrying... attempt: %v action: %s \n", attempts, action)
178-
//handler failed to respond.
179-
cfnErr := cfnerr.New(timeoutError, "Handler failed to respond in time", nil)
180-
metricsPublisher.PublishExceptionMetric(time.Now(), string(action), cfnErr)
181-
return handler.ProgressEvent{}, cfnErr
182-
}
183-
log.Printf("Handler failed to respond, retrying... attempt: %v action: %s \n", attempts, action)
184-
185-
}
186-
}
187-
}
188-
189-
func isMutatingAction(action string) bool {
190-
switch action {
191-
case createAction:
192-
return true
193-
case updateAction:
194-
return true
195-
case deleteAction:
196-
return true
197-
}
198-
return false
199-
}
200-
201-
func translateStatus(operationStatus handler.Status) callback.Status {
202-
switch operationStatus {
203-
case handler.Success:
204-
return callback.Success
205-
case handler.Failed:
206-
return callback.Failed
207-
case handler.InProgress:
208-
return callback.InProgress
209-
default:
210-
return callback.UnknownStatus
211-
}
212-
213-
}
214-
215-
func processinvoke(handlerFn handlerFunc, event *event, request handler.Request, metricsPublisher *metrics.Publisher) handler.ProgressEvent {
216-
progEvt, err := invoke(handlerFn, request, metricsPublisher, event.Action)
217-
if err != nil {
218-
log.Printf("Handler invocation failed: %v", err)
219-
return handler.NewFailedEvent(err)
220-
}
221-
return progEvt
222-
}
223-
224-
func reschedule(ctx context.Context, invokeScheduler InvokeScheduler, progEvt handler.ProgressEvent, event *event) (bool, error) {
225-
cusCtx, delay := marshalCallback(&progEvt)
226-
ids, err := scheduler.GenerateCloudWatchIDS()
227-
if err != nil {
228-
return false, err
229-
}
230-
// Add IDs to recall the function with Cloudwatch events
231-
event.RequestContext.CloudWatchEventsRuleName = ids.Handler
232-
event.RequestContext.CloudWatchEventsTargetID = ids.Target
233-
// Update model properties
234-
m, err := encoding.Marshal(progEvt.ResourceModel)
235-
if err != nil {
236-
return false, err
237-
}
238-
event.RequestData.ResourceProperties = m
239-
// Rebuild the context
240-
event.RequestContext.CallbackContext = cusCtx
241-
callbackRequest, err := json.Marshal(event)
242-
if err != nil {
243-
return false, err
244-
}
245-
scheResult, err := invokeScheduler.Reschedule(ctx, delay, string(callbackRequest), ids)
246-
if err != nil {
247-
return false, err
248-
}
249-
return scheResult.ComputeLocal, nil
250-
}
251-
25297
// MakeEventFunc is the entry point to all invocations of a custom resource
25398
func makeEventFunc(h Handler) eventFunc {
25499
return func(ctx context.Context, event *event) (response, error) {
255-
platformSession := credentials.SessionFromCredentialsProvider(&event.RequestData.PlatformCredentials)
256-
providerSession := credentials.SessionFromCredentialsProvider(&event.RequestData.ProviderCredentials)
257-
logsProvider, err := logging.NewCloudWatchLogsProvider(
258-
cloudwatchlogs.New(providerSession),
100+
//pls := credentials.SessionFromCredentialsProvider(&event.RequestData.PlatformCredentials)
101+
ps := credentials.SessionFromCredentialsProvider(&event.RequestData.ProviderCredentials)
102+
l, err := logging.NewCloudWatchLogsProvider(
103+
cloudwatchlogs.New(ps),
259104
event.RequestData.ProviderLogGroupName,
260105
)
261-
262106
// Set default logger to output to CWL in the provider account
263-
logging.SetProviderLogOutput(logsProvider)
264-
265-
metricsPublisher := metrics.New(cloudwatch.New(platformSession), event.AWSAccountID, event.ResourceType)
266-
callbackAdapter := callback.New(cloudformation.New(platformSession), event.BearerToken)
267-
invokeScheduler := scheduler.New(cloudwatchevents.New(platformSession))
268-
re := newReportErr(callbackAdapter, metricsPublisher)
269-
107+
logging.SetProviderLogOutput(l)
108+
m := metrics.New(cloudwatch.New(ps), event.AWSAccountID, event.ResourceType)
109+
re := newReportErr(m)
110+
if err := scrubFiles("/tmp"); err != nil {
111+
log.Printf("Error: %v", err)
112+
m.PublishExceptionMetric(time.Now(), event.Action, err)
113+
}
270114
handlerFn, err := router(event.Action, h)
271115
log.Printf("Handler received the %s action", event.Action)
272-
273116
if err != nil {
274117
return re.report(event, "router error", err, serviceInternalError)
275118
}
276-
277119
if err := validateEvent(event); err != nil {
278120
return re.report(event, "validation error", err, invalidRequestError)
279121
}
280-
281-
// If this invocation was triggered by a 're-invoke' CloudWatch Event, clean it up.
282-
if event.RequestContext.CallbackContext != nil {
283-
err := invokeScheduler.CleanupEvents(event.RequestContext.CloudWatchEventsRuleName, event.RequestContext.CloudWatchEventsTargetID)
284-
285-
if err != nil {
286-
// We will log the error in the metric, but carry on.
287-
cfnErr := cfnerr.New(serviceInternalError, "Cloudwatch Event clean up error", err)
288-
metricsPublisher.PublishExceptionMetric(time.Now(), string(event.Action), cfnErr)
289-
}
122+
request := handler.NewRequest(
123+
event.RequestData.LogicalResourceID,
124+
event.CallbackContext,
125+
credentials.SessionFromCredentialsProvider(&event.RequestData.CallerCredentials),
126+
event.RequestData.PreviousResourceProperties,
127+
event.RequestData.ResourceProperties,
128+
)
129+
p := invoke(handlerFn, request, m, event.Action)
130+
r, err := newResponse(&p, event.BearerToken)
131+
if err != nil {
132+
log.Printf("Error creating response: %v", err)
133+
return re.report(event, "Response error", err, unmarshalingError)
290134
}
291-
292-
if len(event.RequestContext.CallbackContext) == 0 || event.RequestContext.Invocation == 0 {
293-
// Acknowledge the task for first time invocation.
294-
if err := callbackAdapter.ReportInitialStatus(); err != nil {
295-
return re.report(event, "callback initial report error", err, serviceInternalError)
296-
}
135+
if !isMutatingAction(event.Action) && r.OperationStatus == handler.InProgress {
136+
return re.report(event, "Response error", errors.New("READ and LIST handlers must return synchronous"), invalidRequestError)
297137
}
138+
return r, nil
139+
}
140+
}
298141

299-
re.setPublishSatus(true)
300-
for {
301-
request := handler.NewRequest(
302-
event.RequestData.LogicalResourceID,
303-
event.RequestContext.CallbackContext,
304-
credentials.SessionFromCredentialsProvider(&event.RequestData.CallerCredentials),
305-
event.RequestData.PreviousResourceProperties,
306-
event.RequestData.ResourceProperties,
307-
)
308-
event.RequestContext.Invocation = event.RequestContext.Invocation + 1
309-
310-
progEvt := processinvoke(handlerFn, event, request, metricsPublisher)
311-
312-
r, err := newResponse(&progEvt, event.BearerToken)
313-
if err != nil {
314-
log.Printf("Error creating response: %v", err)
315-
return re.report(event, "Response error", err, unmarshalingError)
316-
}
317-
318-
if !isMutatingAction(event.Action) && r.OperationStatus == handler.InProgress {
319-
return re.report(event, "Response error", errors.New("READ and LIST handlers must return synchronous"), invalidRequestError)
320-
}
321-
322-
if isMutatingAction(event.Action) {
323-
m, err := encoding.Marshal(progEvt.ResourceModel)
324-
if err != nil {
325-
log.Printf("Error reporting status: %v", err)
326-
return re.report(event, "Error", err, unmarshalingError)
327-
}
328-
callbackAdapter.ReportStatus(translateStatus(progEvt.OperationStatus), m, progEvt.Message, string(r.ErrorCode))
329-
}
330-
331-
switch r.OperationStatus {
332-
case handler.InProgress:
333-
local, err := reschedule(ctx, invokeScheduler, progEvt, event)
334-
335-
if err != nil {
336-
return re.report(event, "Reschedule error", err, serviceInternalError)
337-
}
338-
339-
// If not computing local, exit and return response.
340-
if !local {
341-
return r, nil
342-
}
343-
default:
344-
return r, nil
345-
}
142+
func scrubFiles(dir string) error {
143+
names, err := ioutil.ReadDir(dir)
144+
if err != nil {
145+
return err
146+
}
147+
for _, entery := range names {
148+
os.RemoveAll(path.Join([]string{dir, entery.Name()}...))
149+
}
150+
return nil
151+
}
346152

347-
}
153+
// router decides which handler should be invoked based on the action
154+
// It will return a route or an error depending on the action passed in
155+
func router(a string, h Handler) (handlerFunc, error) {
156+
// Figure out which action was called and have a "catch-all"
157+
switch a {
158+
case createAction:
159+
return h.Create, nil
160+
case readAction:
161+
return h.Read, nil
162+
case updateAction:
163+
return h.Update, nil
164+
case deleteAction:
165+
return h.Delete, nil
166+
case listAction:
167+
return h.List, nil
168+
default:
169+
// No action matched, we should fail and return an InvalidRequestErrorCode
170+
return nil, cfnerr.New(invalidRequestError, "No action/invalid action specified", nil)
348171
}
349172
}
350173

@@ -367,3 +190,40 @@ func makeTestEventFunc(h Handler) testEventFunc {
367190
return progEvt, nil
368191
}
369192
}
193+
194+
// Invoke handles the invocation of the handerFn.
195+
func invoke(handlerFn handlerFunc, request handler.Request, metricsPublisher *metrics.Publisher, action string) handler.ProgressEvent {
196+
197+
// Create a channel to received a signal that work is done.
198+
ch := make(chan handler.ProgressEvent, 1)
199+
200+
// Ask the goroutine to do some work for us.
201+
go func() {
202+
//start the timer
203+
s := time.Now()
204+
metricsPublisher.PublishInvocationMetric(time.Now(), string(action))
205+
206+
// Report the work is done.
207+
pe := handlerFn(request)
208+
log.Printf("Received event: %s\nMessage: %s\n",
209+
pe.OperationStatus,
210+
pe.Message,
211+
)
212+
e := time.Since(s)
213+
metricsPublisher.PublishDurationMetric(time.Now(), string(action), e.Seconds()*1e3)
214+
ch <- pe
215+
}()
216+
return <-ch
217+
}
218+
219+
func isMutatingAction(action string) bool {
220+
switch action {
221+
case createAction:
222+
return true
223+
case updateAction:
224+
return true
225+
case deleteAction:
226+
return true
227+
}
228+
return false
229+
}

0 commit comments

Comments
 (0)