diff --git a/internal/error_test.go b/internal/error_test.go index 529c8260c..c94b07bbf 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -98,10 +98,11 @@ func Test_TimeoutError(t *testing.T) { decisionsHelper: newDecisionsHelper(), dataConverter: newDefaultDataConverter(), } + h := newDecisionsHelper() var actualErr error activityID := "activityID" context.decisionsHelper.scheduledEventIDToActivityID[5] = activityID - di := newActivityDecisionStateMachine( + di := h.newActivityDecisionStateMachine( &shared.ScheduleActivityTaskDecisionAttributes{ActivityId: common.StringPtr(activityID)}) di.state = decisionStateInitiated di.setData(&scheduledActivity{ diff --git a/internal/internal_decision_state_machine.go b/internal/internal_decision_state_machine.go index 3c9bb6311..97938fadf 100644 --- a/internal/internal_decision_state_machine.go +++ b/internal/internal_decision_state_machine.go @@ -65,6 +65,7 @@ type ( state decisionState history []string data interface{} + helper *decisionsHelper } activityDecisionStateMachine struct { @@ -207,67 +208,68 @@ func makeDecisionID(decisionType decisionType, id string) decisionID { return decisionID{decisionType: decisionType, id: id} } -func newDecisionStateMachineBase(decisionType decisionType, id string) *decisionStateMachineBase { +func (h *decisionsHelper) newDecisionStateMachineBase(decisionType decisionType, id string) *decisionStateMachineBase { return &decisionStateMachineBase{ id: makeDecisionID(decisionType, id), state: decisionStateCreated, history: []string{decisionStateCreated.String()}, + helper: h, } } -func newActivityDecisionStateMachine(attributes *s.ScheduleActivityTaskDecisionAttributes) *activityDecisionStateMachine { - base := newDecisionStateMachineBase(decisionTypeActivity, attributes.GetActivityId()) +func (h *decisionsHelper) newActivityDecisionStateMachine(attributes *s.ScheduleActivityTaskDecisionAttributes) *activityDecisionStateMachine { + base := h.newDecisionStateMachineBase(decisionTypeActivity, attributes.GetActivityId()) return &activityDecisionStateMachine{ decisionStateMachineBase: base, attributes: attributes, } } -func newTimerDecisionStateMachine(attributes *s.StartTimerDecisionAttributes) *timerDecisionStateMachine { - base := newDecisionStateMachineBase(decisionTypeTimer, attributes.GetTimerId()) +func (h *decisionsHelper) newTimerDecisionStateMachine(attributes *s.StartTimerDecisionAttributes) *timerDecisionStateMachine { + base := h.newDecisionStateMachineBase(decisionTypeTimer, attributes.GetTimerId()) return &timerDecisionStateMachine{ decisionStateMachineBase: base, attributes: attributes, } } -func newChildWorkflowDecisionStateMachine(attributes *s.StartChildWorkflowExecutionDecisionAttributes) *childWorkflowDecisionStateMachine { - base := newDecisionStateMachineBase(decisionTypeChildWorkflow, attributes.GetWorkflowId()) +func (h *decisionsHelper) newChildWorkflowDecisionStateMachine(attributes *s.StartChildWorkflowExecutionDecisionAttributes) *childWorkflowDecisionStateMachine { + base := h.newDecisionStateMachineBase(decisionTypeChildWorkflow, attributes.GetWorkflowId()) return &childWorkflowDecisionStateMachine{ decisionStateMachineBase: base, attributes: attributes, } } -func newNaiveDecisionStateMachine(decisionType decisionType, id string, decision *s.Decision) *naiveDecisionStateMachine { - base := newDecisionStateMachineBase(decisionType, id) +func (h *decisionsHelper) newNaiveDecisionStateMachine(decisionType decisionType, id string, decision *s.Decision) *naiveDecisionStateMachine { + base := h.newDecisionStateMachineBase(decisionType, id) return &naiveDecisionStateMachine{ decisionStateMachineBase: base, decision: decision, } } -func newMarkerDecisionStateMachine(id string, attributes *s.RecordMarkerDecisionAttributes) *markerDecisionStateMachine { +func (h *decisionsHelper) newMarkerDecisionStateMachine(id string, attributes *s.RecordMarkerDecisionAttributes) *markerDecisionStateMachine { d := createNewDecision(s.DecisionTypeRecordMarker) d.RecordMarkerDecisionAttributes = attributes return &markerDecisionStateMachine{ - naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeMarker, id, d), + naiveDecisionStateMachine: h.newNaiveDecisionStateMachine(decisionTypeMarker, id, d), } } -func newCancelExternalWorkflowStateMachine(attributes *s.RequestCancelExternalWorkflowExecutionDecisionAttributes, cancellationID string) *cancelExternalWorkflowDecisionStateMachine { +func (h *decisionsHelper) newCancelExternalWorkflowStateMachine(attributes *s.RequestCancelExternalWorkflowExecutionDecisionAttributes, cancellationID string) *cancelExternalWorkflowDecisionStateMachine { d := createNewDecision(s.DecisionTypeRequestCancelExternalWorkflowExecution) d.RequestCancelExternalWorkflowExecutionDecisionAttributes = attributes return &cancelExternalWorkflowDecisionStateMachine{ - naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeCancellation, cancellationID, d), + naiveDecisionStateMachine: h.newNaiveDecisionStateMachine(decisionTypeCancellation, cancellationID, d), } } -func newSignalExternalWorkflowStateMachine(attributes *s.SignalExternalWorkflowExecutionDecisionAttributes, signalID string) *signalExternalWorkflowDecisionStateMachine { +func (h *decisionsHelper) newSignalExternalWorkflowStateMachine(attributes *s.SignalExternalWorkflowExecutionDecisionAttributes, signalID string) *signalExternalWorkflowDecisionStateMachine { d := createNewDecision(s.DecisionTypeSignalExternalWorkflowExecution) d.SignalExternalWorkflowExecutionDecisionAttributes = attributes return &signalExternalWorkflowDecisionStateMachine{ - naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeSignal, signalID, d), + naiveDecisionStateMachine: h.newNaiveDecisionStateMachine(decisionTypeSignal, signalID, d), } } @@ -295,6 +297,13 @@ func (d *decisionStateMachineBase) moveState(newState decisionState, event strin d.history = append(d.history, event) d.state = newState d.history = append(d.history, newState.String()) + + if newState == decisionStateCompleted { + if elem, ok := d.helper.decisions[d.getID()]; ok { + d.helper.orderedDecisions.Remove(elem) + delete(d.helper.decisions, d.getID()) + } + } } func (d *decisionStateMachineBase) failStateTransition(event string) { @@ -660,12 +669,16 @@ func (h *decisionsHelper) getDecision(id decisionID) decisionStateMachine { } func (h *decisionsHelper) addDecision(decision decisionStateMachine) { + if _, ok := h.decisions[decision.getID()]; ok { + panicMsg := fmt.Sprintf("adding duplicate decision %v", decision) + panic(panicMsg) + } element := h.orderedDecisions.PushBack(decision) h.decisions[decision.getID()] = element } func (h *decisionsHelper) scheduleActivityTask(attributes *s.ScheduleActivityTaskDecisionAttributes) decisionStateMachine { - decision := newActivityDecisionStateMachine(attributes) + decision := h.newActivityDecisionStateMachine(attributes) h.addDecision(decision) return decision } @@ -739,7 +752,7 @@ func (h *decisionsHelper) recordVersionMarker(changeID string, version Version, Details: details, // Keep } - decision := newMarkerDecisionStateMachine(markerID, recordMarker) + decision := h.newMarkerDecisionStateMachine(markerID, recordMarker) h.addDecision(decision) return decision } @@ -750,7 +763,7 @@ func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int32, data []byte MarkerName: common.StringPtr(sideEffectMarkerName), Details: data, } - decision := newMarkerDecisionStateMachine(markerID, attributes) + decision := h.newMarkerDecisionStateMachine(markerID, attributes) h.addDecision(decision) return decision } @@ -761,7 +774,7 @@ func (h *decisionsHelper) recordLocalActivityMarker(activityID string, result [] MarkerName: common.StringPtr(localActivityMarkerName), Details: result, } - decision := newMarkerDecisionStateMachine(markerID, attributes) + decision := h.newMarkerDecisionStateMachine(markerID, attributes) h.addDecision(decision) return decision } @@ -772,13 +785,13 @@ func (h *decisionsHelper) recordMutableSideEffectMarker(mutableSideEffectID stri MarkerName: common.StringPtr(mutableSideEffectMarkerName), Details: data, } - decision := newMarkerDecisionStateMachine(markerID, attributes) + decision := h.newMarkerDecisionStateMachine(markerID, attributes) h.addDecision(decision) return decision } func (h *decisionsHelper) startChildWorkflowExecution(attributes *s.StartChildWorkflowExecutionDecisionAttributes) decisionStateMachine { - decision := newChildWorkflowDecisionStateMachine(attributes) + decision := h.newChildWorkflowDecisionStateMachine(attributes) h.addDecision(decision) return decision } @@ -833,7 +846,7 @@ func (h *decisionsHelper) requestCancelExternalWorkflowExecution(domain, workflo Control: []byte(cancellationID), ChildWorkflowOnly: common.BoolPtr(false), } - decision := newCancelExternalWorkflowStateMachine(attributes, cancellationID) + decision := h.newCancelExternalWorkflowStateMachine(attributes, cancellationID) h.addDecision(decision) return decision @@ -893,7 +906,7 @@ func (h *decisionsHelper) signalExternalWorkflowExecution(domain, workflowID, ru Control: []byte(signalID), ChildWorkflowOnly: common.BoolPtr(childWorkflowOnly), } - decision := newSignalExternalWorkflowStateMachine(attributes, signalID) + decision := h.newSignalExternalWorkflowStateMachine(attributes, signalID) h.addDecision(decision) return decision } @@ -925,7 +938,7 @@ func (h *decisionsHelper) getSignalID(initiatedEventID int64) string { } func (h *decisionsHelper) startTimer(attributes *s.StartTimerDecisionAttributes) decisionStateMachine { - decision := newTimerDecisionStateMachine(attributes) + decision := h.newTimerDecisionStateMachine(attributes) h.addDecision(decision) return decision } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 430c45eec..1e44061bb 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -593,9 +593,9 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( event *m.HistoryEvent, isReplay bool, isLast bool, -) (result []*m.Decision, err error) { +) (err error) { if event == nil { - return nil, errors.New("nil event provided") + return errors.New("nil event provided") } defer func() { if p := recover(); p != nil { @@ -746,7 +746,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( } if err != nil { - return nil, err + return err } // When replaying histories to get stack trace or current state the last event might be not @@ -756,7 +756,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( weh.workflowDefinition.OnDecisionTaskStarted() } - return weh.decisionsHelper.getDecisions(true), nil + return nil } func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(queryType string, queryArgs []byte) ([]byte, error) { @@ -928,7 +928,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(markerDa return nil } -func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *localActivityResult) ([]*m.Decision, error) { +func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *localActivityResult) error { // convert local activity result and error to marker data lamd := localActivityMarkerData{ ActivityID: lar.task.activityID, @@ -945,7 +945,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *lo // encode marker data markerData, err := weh.encodeArg(lamd) if err != nil { - return nil, err + return err } // create marker event for local activity result diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 7395e42ac..22f612deb 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -54,7 +54,7 @@ type ( workflowExecutionEventHandler interface { // Process a single event and return the assosciated decisions. // Return List of decisions made, any error. - ProcessEvent(event *s.HistoryEvent, isReplay bool, isLast bool) ([]*s.Decision, error) + ProcessEvent(event *s.HistoryEvent, isReplay bool, isLast bool) error // ProcessQuery process a query request. ProcessQuery(queryType string, queryArgs []byte) ([]byte, error) StackTrace() string @@ -293,8 +293,7 @@ OrderEvents: } func isPreloadMarkerEvent(event *s.HistoryEvent) bool { - return event.GetEventType() == s.EventTypeMarkerRecorded && - event.MarkerRecordedEventAttributes.GetMarkerName() != localActivityMarkerName + return event.GetEventType() == s.EventTypeMarkerRecorded } // newWorkflowTaskHandler returns an implementation of workflow task handler. @@ -653,9 +652,12 @@ ProcessEvents: } // Markers are from the events that are produced from the current decision for _, m := range markers { - _, err := eventHandler.ProcessEvent(m, true, false) - if err != nil { - return nil, err + if m.MarkerRecordedEventAttributes.GetMarkerName() != localActivityMarkerName { + // local activity marker needs to be applied after decision task started event + err := eventHandler.ProcessEvent(m, true, false) + if err != nil { + return nil, err + } } } @@ -672,23 +674,26 @@ ProcessEvents: return nil, err } - eventDecisions, err := eventHandler.ProcessEvent(event, isInReplay, isLast) + err = eventHandler.ProcessEvent(event, isInReplay, isLast) if err != nil { return nil, err } + } - if eventDecisions != nil { - if !isInReplay { - w.newDecisions = append(w.newDecisions, eventDecisions...) - } else if !skipReplayCheck { - replayDecisions = append(replayDecisions, eventDecisions...) + // now apply local activity markers + for _, m := range markers { + if m.MarkerRecordedEventAttributes.GetMarkerName() == localActivityMarkerName { + err := eventHandler.ProcessEvent(m, true, false) + if err != nil { + return nil, err } } - - if w.isWorkflowCompleted { - // If workflow is already completed then we can break from processing - // further decisions. - break ProcessEvents + } + isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) + if isReplay { + eventDecisions := eventHandler.decisionsHelper.getDecisions(true) + if len(eventDecisions) > 0 && !skipReplayCheck { + replayDecisions = append(replayDecisions, eventDecisions...) } } } @@ -728,15 +733,11 @@ ProcessEvents: } func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(lar *localActivityResult) (interface{}, error) { - eventDecisions, err := w.eventHandler.ProcessLocalActivityResult(lar) + err := w.eventHandler.ProcessLocalActivityResult(lar) if err != nil { return nil, err } - if eventDecisions != nil { - w.newDecisions = append(w.newDecisions, eventDecisions...) - } - return w.CompleteDecisionTask(true), nil } @@ -760,6 +761,11 @@ func (w *workflowExecutionContextImpl) CompleteDecisionTask(waitLocalActivities } } + eventDecisions := w.eventHandler.decisionsHelper.getDecisions(true) + if len(eventDecisions) > 0 { + w.newDecisions = append(w.newDecisions, eventDecisions...) + } + completeRequest := w.wth.completeWorkflow(w.eventHandler, w.currentDecisionTask, w, w.newDecisions, !waitLocalActivities) w.clearCurrentTask()