Skip to content

Commit

Permalink
fix decision state machine (#532)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc authored Jul 30, 2018
1 parent 5b2441d commit 4be6b26
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 53 deletions.
3 changes: 2 additions & 1 deletion internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ func Test_TimeoutError(t *testing.T) {
decisionsHelper: newDecisionsHelper(),
dataConverter: newDefaultDataConverter(),
}
h := newDecisionsHelper()
var actualErr error
activityID := "activityID"
context.decisionsHelper.scheduledEventIDToActivityID[5] = activityID
di := newActivityDecisionStateMachine(
di := h.newActivityDecisionStateMachine(
&shared.ScheduleActivityTaskDecisionAttributes{ActivityId: common.StringPtr(activityID)})
di.state = decisionStateInitiated
di.setData(&scheduledActivity{
Expand Down
61 changes: 37 additions & 24 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type (
state decisionState
history []string
data interface{}
helper *decisionsHelper
}

activityDecisionStateMachine struct {
Expand Down Expand Up @@ -207,67 +208,68 @@ func makeDecisionID(decisionType decisionType, id string) decisionID {
return decisionID{decisionType: decisionType, id: id}
}

func newDecisionStateMachineBase(decisionType decisionType, id string) *decisionStateMachineBase {
func (h *decisionsHelper) newDecisionStateMachineBase(decisionType decisionType, id string) *decisionStateMachineBase {
return &decisionStateMachineBase{
id: makeDecisionID(decisionType, id),
state: decisionStateCreated,
history: []string{decisionStateCreated.String()},
helper: h,
}
}

func newActivityDecisionStateMachine(attributes *s.ScheduleActivityTaskDecisionAttributes) *activityDecisionStateMachine {
base := newDecisionStateMachineBase(decisionTypeActivity, attributes.GetActivityId())
func (h *decisionsHelper) newActivityDecisionStateMachine(attributes *s.ScheduleActivityTaskDecisionAttributes) *activityDecisionStateMachine {
base := h.newDecisionStateMachineBase(decisionTypeActivity, attributes.GetActivityId())
return &activityDecisionStateMachine{
decisionStateMachineBase: base,
attributes: attributes,
}
}

func newTimerDecisionStateMachine(attributes *s.StartTimerDecisionAttributes) *timerDecisionStateMachine {
base := newDecisionStateMachineBase(decisionTypeTimer, attributes.GetTimerId())
func (h *decisionsHelper) newTimerDecisionStateMachine(attributes *s.StartTimerDecisionAttributes) *timerDecisionStateMachine {
base := h.newDecisionStateMachineBase(decisionTypeTimer, attributes.GetTimerId())
return &timerDecisionStateMachine{
decisionStateMachineBase: base,
attributes: attributes,
}
}

func newChildWorkflowDecisionStateMachine(attributes *s.StartChildWorkflowExecutionDecisionAttributes) *childWorkflowDecisionStateMachine {
base := newDecisionStateMachineBase(decisionTypeChildWorkflow, attributes.GetWorkflowId())
func (h *decisionsHelper) newChildWorkflowDecisionStateMachine(attributes *s.StartChildWorkflowExecutionDecisionAttributes) *childWorkflowDecisionStateMachine {
base := h.newDecisionStateMachineBase(decisionTypeChildWorkflow, attributes.GetWorkflowId())
return &childWorkflowDecisionStateMachine{
decisionStateMachineBase: base,
attributes: attributes,
}
}

func newNaiveDecisionStateMachine(decisionType decisionType, id string, decision *s.Decision) *naiveDecisionStateMachine {
base := newDecisionStateMachineBase(decisionType, id)
func (h *decisionsHelper) newNaiveDecisionStateMachine(decisionType decisionType, id string, decision *s.Decision) *naiveDecisionStateMachine {
base := h.newDecisionStateMachineBase(decisionType, id)
return &naiveDecisionStateMachine{
decisionStateMachineBase: base,
decision: decision,
}
}

func newMarkerDecisionStateMachine(id string, attributes *s.RecordMarkerDecisionAttributes) *markerDecisionStateMachine {
func (h *decisionsHelper) newMarkerDecisionStateMachine(id string, attributes *s.RecordMarkerDecisionAttributes) *markerDecisionStateMachine {
d := createNewDecision(s.DecisionTypeRecordMarker)
d.RecordMarkerDecisionAttributes = attributes
return &markerDecisionStateMachine{
naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeMarker, id, d),
naiveDecisionStateMachine: h.newNaiveDecisionStateMachine(decisionTypeMarker, id, d),
}
}

func newCancelExternalWorkflowStateMachine(attributes *s.RequestCancelExternalWorkflowExecutionDecisionAttributes, cancellationID string) *cancelExternalWorkflowDecisionStateMachine {
func (h *decisionsHelper) newCancelExternalWorkflowStateMachine(attributes *s.RequestCancelExternalWorkflowExecutionDecisionAttributes, cancellationID string) *cancelExternalWorkflowDecisionStateMachine {
d := createNewDecision(s.DecisionTypeRequestCancelExternalWorkflowExecution)
d.RequestCancelExternalWorkflowExecutionDecisionAttributes = attributes
return &cancelExternalWorkflowDecisionStateMachine{
naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeCancellation, cancellationID, d),
naiveDecisionStateMachine: h.newNaiveDecisionStateMachine(decisionTypeCancellation, cancellationID, d),
}
}

func newSignalExternalWorkflowStateMachine(attributes *s.SignalExternalWorkflowExecutionDecisionAttributes, signalID string) *signalExternalWorkflowDecisionStateMachine {
func (h *decisionsHelper) newSignalExternalWorkflowStateMachine(attributes *s.SignalExternalWorkflowExecutionDecisionAttributes, signalID string) *signalExternalWorkflowDecisionStateMachine {
d := createNewDecision(s.DecisionTypeSignalExternalWorkflowExecution)
d.SignalExternalWorkflowExecutionDecisionAttributes = attributes
return &signalExternalWorkflowDecisionStateMachine{
naiveDecisionStateMachine: newNaiveDecisionStateMachine(decisionTypeSignal, signalID, d),
naiveDecisionStateMachine: h.newNaiveDecisionStateMachine(decisionTypeSignal, signalID, d),
}
}

Expand Down Expand Up @@ -295,6 +297,13 @@ func (d *decisionStateMachineBase) moveState(newState decisionState, event strin
d.history = append(d.history, event)
d.state = newState
d.history = append(d.history, newState.String())

if newState == decisionStateCompleted {
if elem, ok := d.helper.decisions[d.getID()]; ok {
d.helper.orderedDecisions.Remove(elem)
delete(d.helper.decisions, d.getID())
}
}
}

func (d *decisionStateMachineBase) failStateTransition(event string) {
Expand Down Expand Up @@ -660,12 +669,16 @@ func (h *decisionsHelper) getDecision(id decisionID) decisionStateMachine {
}

func (h *decisionsHelper) addDecision(decision decisionStateMachine) {
if _, ok := h.decisions[decision.getID()]; ok {
panicMsg := fmt.Sprintf("adding duplicate decision %v", decision)
panic(panicMsg)
}
element := h.orderedDecisions.PushBack(decision)
h.decisions[decision.getID()] = element
}

func (h *decisionsHelper) scheduleActivityTask(attributes *s.ScheduleActivityTaskDecisionAttributes) decisionStateMachine {
decision := newActivityDecisionStateMachine(attributes)
decision := h.newActivityDecisionStateMachine(attributes)
h.addDecision(decision)
return decision
}
Expand Down Expand Up @@ -739,7 +752,7 @@ func (h *decisionsHelper) recordVersionMarker(changeID string, version Version,
Details: details, // Keep
}

decision := newMarkerDecisionStateMachine(markerID, recordMarker)
decision := h.newMarkerDecisionStateMachine(markerID, recordMarker)
h.addDecision(decision)
return decision
}
Expand All @@ -750,7 +763,7 @@ func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int32, data []byte
MarkerName: common.StringPtr(sideEffectMarkerName),
Details: data,
}
decision := newMarkerDecisionStateMachine(markerID, attributes)
decision := h.newMarkerDecisionStateMachine(markerID, attributes)
h.addDecision(decision)
return decision
}
Expand All @@ -761,7 +774,7 @@ func (h *decisionsHelper) recordLocalActivityMarker(activityID string, result []
MarkerName: common.StringPtr(localActivityMarkerName),
Details: result,
}
decision := newMarkerDecisionStateMachine(markerID, attributes)
decision := h.newMarkerDecisionStateMachine(markerID, attributes)
h.addDecision(decision)
return decision
}
Expand All @@ -772,13 +785,13 @@ func (h *decisionsHelper) recordMutableSideEffectMarker(mutableSideEffectID stri
MarkerName: common.StringPtr(mutableSideEffectMarkerName),
Details: data,
}
decision := newMarkerDecisionStateMachine(markerID, attributes)
decision := h.newMarkerDecisionStateMachine(markerID, attributes)
h.addDecision(decision)
return decision
}

func (h *decisionsHelper) startChildWorkflowExecution(attributes *s.StartChildWorkflowExecutionDecisionAttributes) decisionStateMachine {
decision := newChildWorkflowDecisionStateMachine(attributes)
decision := h.newChildWorkflowDecisionStateMachine(attributes)
h.addDecision(decision)
return decision
}
Expand Down Expand Up @@ -833,7 +846,7 @@ func (h *decisionsHelper) requestCancelExternalWorkflowExecution(domain, workflo
Control: []byte(cancellationID),
ChildWorkflowOnly: common.BoolPtr(false),
}
decision := newCancelExternalWorkflowStateMachine(attributes, cancellationID)
decision := h.newCancelExternalWorkflowStateMachine(attributes, cancellationID)
h.addDecision(decision)

return decision
Expand Down Expand Up @@ -893,7 +906,7 @@ func (h *decisionsHelper) signalExternalWorkflowExecution(domain, workflowID, ru
Control: []byte(signalID),
ChildWorkflowOnly: common.BoolPtr(childWorkflowOnly),
}
decision := newSignalExternalWorkflowStateMachine(attributes, signalID)
decision := h.newSignalExternalWorkflowStateMachine(attributes, signalID)
h.addDecision(decision)
return decision
}
Expand Down Expand Up @@ -925,7 +938,7 @@ func (h *decisionsHelper) getSignalID(initiatedEventID int64) string {
}

func (h *decisionsHelper) startTimer(attributes *s.StartTimerDecisionAttributes) decisionStateMachine {
decision := newTimerDecisionStateMachine(attributes)
decision := h.newTimerDecisionStateMachine(attributes)
h.addDecision(decision)
return decision
}
Expand Down
12 changes: 6 additions & 6 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,9 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
event *m.HistoryEvent,
isReplay bool,
isLast bool,
) (result []*m.Decision, err error) {
) (err error) {
if event == nil {
return nil, errors.New("nil event provided")
return errors.New("nil event provided")
}
defer func() {
if p := recover(); p != nil {
Expand Down Expand Up @@ -746,7 +746,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
}

if err != nil {
return nil, err
return err
}

// When replaying histories to get stack trace or current state the last event might be not
Expand All @@ -756,7 +756,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
weh.workflowDefinition.OnDecisionTaskStarted()
}

return weh.decisionsHelper.getDecisions(true), nil
return nil
}

func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(queryType string, queryArgs []byte) ([]byte, error) {
Expand Down Expand Up @@ -928,7 +928,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(markerDa
return nil
}

func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *localActivityResult) ([]*m.Decision, error) {
func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *localActivityResult) error {
// convert local activity result and error to marker data
lamd := localActivityMarkerData{
ActivityID: lar.task.activityID,
Expand All @@ -945,7 +945,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *lo
// encode marker data
markerData, err := weh.encodeArg(lamd)
if err != nil {
return nil, err
return err
}

// create marker event for local activity result
Expand Down
50 changes: 28 additions & 22 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type (
workflowExecutionEventHandler interface {
// Process a single event and return the assosciated decisions.
// Return List of decisions made, any error.
ProcessEvent(event *s.HistoryEvent, isReplay bool, isLast bool) ([]*s.Decision, error)
ProcessEvent(event *s.HistoryEvent, isReplay bool, isLast bool) error
// ProcessQuery process a query request.
ProcessQuery(queryType string, queryArgs []byte) ([]byte, error)
StackTrace() string
Expand Down Expand Up @@ -293,8 +293,7 @@ OrderEvents:
}

func isPreloadMarkerEvent(event *s.HistoryEvent) bool {
return event.GetEventType() == s.EventTypeMarkerRecorded &&
event.MarkerRecordedEventAttributes.GetMarkerName() != localActivityMarkerName
return event.GetEventType() == s.EventTypeMarkerRecorded
}

// newWorkflowTaskHandler returns an implementation of workflow task handler.
Expand Down Expand Up @@ -653,9 +652,12 @@ ProcessEvents:
}
// Markers are from the events that are produced from the current decision
for _, m := range markers {
_, err := eventHandler.ProcessEvent(m, true, false)
if err != nil {
return nil, err
if m.MarkerRecordedEventAttributes.GetMarkerName() != localActivityMarkerName {
// local activity marker needs to be applied after decision task started event
err := eventHandler.ProcessEvent(m, true, false)
if err != nil {
return nil, err
}
}
}

Expand All @@ -672,23 +674,26 @@ ProcessEvents:
return nil, err
}

eventDecisions, err := eventHandler.ProcessEvent(event, isInReplay, isLast)
err = eventHandler.ProcessEvent(event, isInReplay, isLast)
if err != nil {
return nil, err
}
}

if eventDecisions != nil {
if !isInReplay {
w.newDecisions = append(w.newDecisions, eventDecisions...)
} else if !skipReplayCheck {
replayDecisions = append(replayDecisions, eventDecisions...)
// now apply local activity markers
for _, m := range markers {
if m.MarkerRecordedEventAttributes.GetMarkerName() == localActivityMarkerName {
err := eventHandler.ProcessEvent(m, true, false)
if err != nil {
return nil, err
}
}

if w.isWorkflowCompleted {
// If workflow is already completed then we can break from processing
// further decisions.
break ProcessEvents
}
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
if isReplay {
eventDecisions := eventHandler.decisionsHelper.getDecisions(true)
if len(eventDecisions) > 0 && !skipReplayCheck {
replayDecisions = append(replayDecisions, eventDecisions...)
}
}
}
Expand Down Expand Up @@ -728,15 +733,11 @@ ProcessEvents:
}

func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(lar *localActivityResult) (interface{}, error) {
eventDecisions, err := w.eventHandler.ProcessLocalActivityResult(lar)
err := w.eventHandler.ProcessLocalActivityResult(lar)
if err != nil {
return nil, err
}

if eventDecisions != nil {
w.newDecisions = append(w.newDecisions, eventDecisions...)
}

return w.CompleteDecisionTask(true), nil
}

Expand All @@ -760,6 +761,11 @@ func (w *workflowExecutionContextImpl) CompleteDecisionTask(waitLocalActivities
}
}

eventDecisions := w.eventHandler.decisionsHelper.getDecisions(true)
if len(eventDecisions) > 0 {
w.newDecisions = append(w.newDecisions, eventDecisions...)
}

completeRequest := w.wth.completeWorkflow(w.eventHandler, w.currentDecisionTask, w, w.newDecisions, !waitLocalActivities)
w.clearCurrentTask()

Expand Down

0 comments on commit 4be6b26

Please sign in to comment.