Skip to content

Commit 01756b3

Browse files
Merge pull request #7497 from deutschebank/db-contrib/waltz-7488-maker-checker-workflow-enhancements-v1.81.0
Db contrib/waltz 7488 maker checker workflow enhancements v1.81.0
2 parents 7e551a6 + 5280100 commit 01756b3

28 files changed

Lines changed: 504 additions & 179 deletions

File tree

waltz-data/src/main/java/org/finos/waltz/data/entity_workflow/EntityWorkflowStateDao.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class EntityWorkflowStateDao {
5555
.lastUpdatedAt(r.getLastUpdatedAt().toLocalDateTime())
5656
.lastUpdatedBy(r.getLastUpdatedBy())
5757
.provenance(r.getProvenance())
58+
.version(r.getVersion())
5859
.build();
5960
};
6061

@@ -94,15 +95,17 @@ public void createWorkflowState(Long workflowDefId,
9495
stateRecord.insert();
9596
}
9697

97-
public long updateState(Long workflowDefId, EntityReference ref, String user, String workflowState) {
98+
public long updateState(Long workflowDefId, EntityReference ref, String user, EntityWorkflowState workflowState) {
9899
return dsl
99100
.update(ENTITY_WORKFLOW_STATE)
100-
.set(ENTITY_WORKFLOW_STATE.STATE, workflowState)
101+
.set(ENTITY_WORKFLOW_STATE.STATE, workflowState.state())
101102
.set(ENTITY_WORKFLOW_STATE.LAST_UPDATED_AT, Timestamp.valueOf(nowUtc()))
102103
.set(ENTITY_WORKFLOW_STATE.LAST_UPDATED_BY, user)
104+
.set(ENTITY_WORKFLOW_STATE.VERSION, (workflowState.version() + 1L))
103105
.where(ENTITY_WORKFLOW_STATE.WORKFLOW_ID.eq(workflowDefId)
104106
.and(ENTITY_WORKFLOW_STATE.ENTITY_ID.eq(ref.id()))
105107
.and(ENTITY_WORKFLOW_STATE.ENTITY_KIND.eq(ref.kind().name())))
108+
.and(ENTITY_WORKFLOW_STATE.VERSION.eq(workflowState.version()))
106109
.execute();
107110
}
108111
}

waltz-data/src/main/java/org/finos/waltz/data/proposed_flow/ProposedFlowDao.java

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import org.jooq.CommonTableExpression;
2323
import org.jooq.Condition;
2424
import org.jooq.DSLContext;
25-
import org.jooq.Field;
26-
import org.jooq.Record;
2725
import org.jooq.Record;
2826
import org.jooq.Record2;
2927
import org.jooq.Record3;
@@ -33,7 +31,6 @@
3331
import org.jooq.SelectUnionStep;
3432
import org.jooq.Table;
3533
import org.jooq.TableField;
36-
import org.jooq.impl.DSL;
3734
import org.slf4j.Logger;
3835
import org.slf4j.LoggerFactory;
3936
import org.springframework.beans.factory.annotation.Autowired;
@@ -114,10 +111,22 @@ public Long saveProposedFlow(String username, ProposedFlowCommand proposedFlowCo
114111
proposedFlowRecord.setTargetEntityId(proposedFlowCommand.target().id());
115112
proposedFlowRecord.setTargetEntityKind(proposedFlowCommand.target().kind().name());
116113
proposedFlowRecord.setProposalType(proposedFlowCommand.proposalType().name());
114+
proposedFlowCommand.logicalFlowId().ifPresent(proposedFlowRecord::setLogicalFlowId);
115+
proposedFlowCommand.physicalFlowId().ifPresent(proposedFlowRecord::setPhysicalFlowId);
116+
proposedFlowCommand.specification().id().ifPresent(proposedFlowRecord::setSpecificationId);
117117
proposedFlowRecord.store();
118118
return proposedFlowRecord.getId();
119119
}
120120

121+
public int updateLogicalFlowPhysicalFlowAndSpecIdsInProposedFlowRecord(long proposedFlowId, Long logicalFlowId, Long physicalFlowId, Long specificationId) {
122+
return dsl.update(PROPOSED_FLOW)
123+
.set(PROPOSED_FLOW.LOGICAL_FLOW_ID, logicalFlowId)
124+
.set(PROPOSED_FLOW.PHYSICAL_FLOW_ID, physicalFlowId)
125+
.set(PROPOSED_FLOW.SPECIFICATION_ID, specificationId)
126+
.where(PROPOSED_FLOW.ID.eq(proposedFlowId))
127+
.execute();
128+
}
129+
121130
public ProposedFlowResponse getProposedFlowResponseById(long id) {
122131
ProposedFlowRecord proposedFlowRecord = getProposedFlowById(id);
123132
checkNotNull(proposedFlowRecord, format("ProposedFlow not found: %d", proposedFlowRecord.getId()));
@@ -138,17 +147,17 @@ public ProposedFlowResponse getProposedFlowResponseById(long id) {
138147
.stream()
139148
.filter(e -> e.kind().equals(LOGICAL_DATA_FLOW))
140149
.findFirst()
141-
.map(EntityReference::id).orElse(flowDefinition.logicalFlowId().orElse(null)))
150+
.map(EntityReference::id).orElse(proposedFlowRecord.getLogicalFlowId()))
142151
.physicalFlowId(entityWorkflowView.entityWorkflowResultList()
143152
.stream()
144153
.filter(e -> e.kind().equals(PHYSICAL_FLOW))
145154
.findFirst()
146-
.map(EntityReference::id).orElse(flowDefinition.physicalFlowId().orElse(null)))
155+
.map(EntityReference::id).orElse(proposedFlowRecord.getPhysicalFlowId()))
147156
.specificationId(entityWorkflowView.entityWorkflowResultList()
148157
.stream()
149158
.filter(e -> e.kind().equals(PHYSICAL_SPECIFICATION))
150159
.findFirst()
151-
.map(EntityReference::id).orElse(flowDefinition.specification().id().orElse(null)))
160+
.map(EntityReference::id).orElse(proposedFlowRecord.getSpecificationId()))
152161
.build();
153162

154163
} catch (JsonProcessingException e) {
@@ -427,25 +436,8 @@ public Set<Long> findPhysicalFlowIdsInPendingProposals(Set<Long> logicalFlowIds,
427436
return Collections.emptySet();
428437
}
429438

430-
// Field representing the 'logicalFlowId' extracted from the JSON
431-
Field<Long> logicalFlowIdField = DSL.field(
432-
"JSON_VALUE({0}, {1})",
433-
Long.class,
434-
PROPOSED_FLOW.FLOW_DEF,
435-
DSL.val("$.logicalFlowId"));
436-
437-
// Condition to filter proposals based on the logicalFlowId
438-
Condition logicalFlowMatch = logicalFlowIdField.in(logicalFlowIds);
439-
440-
// Field representing the 'physicalFlowId' extracted from the JSON
441-
Field<Long> physicalFlowIdField = DSL.field(
442-
"JSON_VALUE({0}, {1})",
443-
Long.class,
444-
PROPOSED_FLOW.FLOW_DEF,
445-
DSL.val("$.physicalFlowId"));
446-
447439
return dsl
448-
.selectDistinct(physicalFlowIdField)
440+
.selectDistinct(PROPOSED_FLOW.PHYSICAL_FLOW_ID)
449441
.from(PROPOSED_FLOW)
450442
.join(ENTITY_WORKFLOW_STATE).on(PROPOSED_FLOW.ID.eq(ENTITY_WORKFLOW_STATE.ENTITY_ID)
451443
.and(ENTITY_WORKFLOW_STATE.WORKFLOW_ID.eq(workflowId))
@@ -454,10 +446,9 @@ public Set<Long> findPhysicalFlowIdsInPendingProposals(Set<Long> logicalFlowIds,
454446
ProposalType.EDIT.name(),
455447
ProposalType.DELETE.name()))
456448
.and(ENTITY_WORKFLOW_STATE.STATE.notIn(END_STATES))
457-
.and(logicalFlowMatch)
458-
// Use the templated field for the IS NOT NULL check as well
459-
.and(physicalFlowIdField.isNotNull())
460-
.fetchSet(physicalFlowIdField);
449+
.and(PROPOSED_FLOW.LOGICAL_FLOW_ID.in(logicalFlowIds))
450+
.and(PROPOSED_FLOW.PHYSICAL_FLOW_ID.isNotNull())
451+
.fetchSet(PROPOSED_FLOW.PHYSICAL_FLOW_ID);
461452
}
462453

463454
private Condition getProposalTypeCondition(ProposalType proposalType) {

waltz-integration-test/src/test/java/org/finos/waltz/integration_test/inmem/service/ProposedFlowWorkflowServiceTest.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,19 @@
6868
import org.jooq.DSLContext;
6969
import org.junit.jupiter.api.BeforeEach;
7070
import org.junit.jupiter.api.Test;
71+
import org.slf4j.Logger;
72+
import org.slf4j.LoggerFactory;
7173
import org.springframework.beans.factory.annotation.Autowired;
7274

7375
import java.time.LocalDateTime;
76+
import java.util.ArrayList;
77+
import java.util.List;
7478
import java.util.Optional;
7579
import java.util.Set;
80+
import java.util.concurrent.CountDownLatch;
81+
import java.util.concurrent.TimeUnit;
7682

83+
import static java.lang.String.format;
7784
import static org.finos.waltz.common.DateTimeUtilities.nowUtc;
7885
import static org.finos.waltz.model.EntityKind.APPLICATION;
7986
import static org.finos.waltz.model.EntityReference.mkRef;
@@ -102,6 +109,8 @@
102109

103110
public class ProposedFlowWorkflowServiceTest extends BaseInMemoryIntegrationTest {
104111

112+
private static final Logger LOG = LoggerFactory.getLogger(ProposedFlowWorkflowServiceTest.class);
113+
105114
private final String USER_NAME = "testUser";
106115

107116
@Autowired
@@ -652,4 +661,129 @@ public void validateProposedFlow_shouldPassIfFlowAttributeIsDifferent() {
652661
// 3. Assert: Validation should pass
653662
assertNull(validationResponse, "Validation should pass when a flow attribute (e.g., frequency) is different");
654663
}
664+
665+
@Test
666+
public void validateNoConcurrentActionsOnProposedFlows() {
667+
/*
668+
Step 1. Create a flow, with the user as source and target approver
669+
Step 2. Parallelly execute approval actions
670+
Step 3. One Action succeeds, the other fails
671+
*/
672+
673+
String testStem = mkName("validateNoConcurrentActionsOnProposedFlows");
674+
ProposedFlowCommandResponse proposedFlowCommandResponse = proposedFlowWorkflowService.proposeNewFlow(testStem, baseCreateCommand);
675+
676+
677+
Long personA = personHelper.createPerson(testStem);
678+
679+
long involvementKind = involvementHelper.mkInvolvementKind("_rel");
680+
involvementHelper.createInvolvement(personA, involvementKind, baseCreateCommand.source());
681+
involvementHelper.createInvolvement(personA, involvementKind, baseCreateCommand.target());
682+
683+
InvolvementGroupRecord ig = permissionHelper.setupInvolvementGroup(involvementKind, "_rel_ig");
684+
685+
permissionHelper.setupPermissionGroupForProposedFlow(baseCreateCommand.source(), ig, "_rel_pg", Operation.APPROVE);
686+
permissionHelper.setupPermissionGroupForProposedFlow(baseCreateCommand.target(), ig, "_rel_pg", Operation.APPROVE);
687+
688+
List<ProposedFlowResponse> failures = new ArrayList<>();
689+
List<ProposedFlowResponse> responses = new ArrayList<>();
690+
691+
// Run actions in parallel
692+
CountDownLatch start = new CountDownLatch(1);
693+
CountDownLatch end = new CountDownLatch(2);
694+
695+
Runnable approveTaskA = () -> mkRunnable(proposedFlowCommandResponse.proposedFlowId(), testStem, responses, failures, start, end);
696+
Runnable approveTaskB = () -> mkRunnable(proposedFlowCommandResponse.proposedFlowId(), testStem, responses, failures, start, end);
697+
new Thread(approveTaskA, "task_A").start();
698+
new Thread(approveTaskB, "task_B").start();
699+
700+
try {
701+
start.countDown();
702+
assertTrue(end.await(10, TimeUnit.SECONDS), "Concurrent approve tasks did not finish in time");
703+
} catch (InterruptedException e) {
704+
LOG.error("Thread interrupted", e);
705+
Thread.currentThread().interrupt();
706+
}
707+
708+
assertEquals(1, responses.size());
709+
assertEquals(1, failures.size());
710+
711+
// further non-concurrent action should lead to a fully approved state
712+
ProposedFlowResponse fullyApprovedActionResponse = proposedFlowWorkflowService.proposedFlowAction(proposedFlowCommandResponse.proposedFlowId(),
713+
APPROVE,
714+
testStem,
715+
ImmutableProposedFlowActionCommand
716+
.builder()
717+
.comment(format("Approved by %s", testStem))
718+
.build());
719+
720+
assertEquals(FULLY_APPROVED.name(), fullyApprovedActionResponse.workflowState().state());
721+
}
722+
723+
private void mkRunnable(Long flowId,
724+
String user,
725+
List<ProposedFlowResponse> responses,
726+
List<ProposedFlowResponse> failures,
727+
CountDownLatch start,
728+
CountDownLatch end) {
729+
try {
730+
start.await(5, TimeUnit.SECONDS);
731+
ProposedFlowResponse proposedFlowActionResponse = proposedFlowWorkflowService.proposedFlowAction(flowId,
732+
APPROVE,
733+
user,
734+
ImmutableProposedFlowActionCommand
735+
.builder()
736+
.comment(format("Approved by %s", user))
737+
.build());
738+
739+
if(proposedFlowActionResponse.outcome() == CommandOutcome.SUCCESS) {
740+
synchronized (responses) {
741+
responses.add(proposedFlowActionResponse);
742+
}
743+
} else {
744+
synchronized (failures) {
745+
failures.add(proposedFlowActionResponse);
746+
}
747+
}
748+
} catch (InterruptedException e) {
749+
LOG.error("Thread interrupted", e);
750+
} finally {
751+
end.countDown();
752+
}
753+
}
754+
755+
@Test
756+
public void testProposedFlowRecordHas_logicalFlowId_physicalFlowId_specificationId() {
757+
758+
// 1. Arrange ----------------------------------------------------------
759+
Reason reason = proposedFlowWorkflowHelper.getReason();
760+
EntityReference owningEntity = proposedFlowWorkflowHelper.getOwningEntity();
761+
PhysicalSpecification physicalSpecification = proposedFlowWorkflowHelper.getPhysicalSpecification(owningEntity);
762+
FlowAttributes flowAttributes = proposedFlowWorkflowHelper.getFlowAttributes();
763+
Set<Long> dataTypeIdSet = proposedFlowWorkflowHelper.getDataTypeIdSet();
764+
765+
ProposedFlowCommand command = ImmutableProposedFlowCommand.builder()
766+
.source(mkRef(APPLICATION, 101))
767+
.target(mkRef(APPLICATION, 202))
768+
.logicalFlowId(12345)
769+
.physicalFlowId(12345)
770+
.reason(reason)
771+
.specification(physicalSpecification)
772+
.flowAttributes(flowAttributes)
773+
.dataTypeIds(dataTypeIdSet)
774+
.proposalType(ProposalType.valueOf("CREATE"))
775+
.build();
776+
777+
ProposedFlowCommandResponse response = proposedFlowWorkflowService.proposeNewFlow(USER_NAME, command);
778+
779+
// 2. Act --------------------------------------------------------------
780+
ProposedFlowResponse proposedFlowResponse = proposedFlowWorkflowService.getProposedFlowResponseById(response.proposedFlowId());
781+
782+
// 3. Assert -----------------------------------------------------------
783+
assertNotNull(response);
784+
assertNotNull(proposedFlowResponse);
785+
assertNotNull(proposedFlowResponse.logicalFlowId());
786+
assertNotNull(proposedFlowResponse.physicalFlowId());
787+
assertNotNull(proposedFlowResponse.specificationId());
788+
}
655789
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.finos.waltz.model.attestation;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
5+
import org.finos.waltz.model.command.CommandOutcome;
6+
import org.immutables.value.Value;
7+
8+
import java.util.Optional;
9+
10+
@Value.Immutable
11+
@JsonSerialize(as = ImmutableAttestationPreCheckCommandResponse.class)
12+
@JsonDeserialize(as = ImmutableAttestationPreCheckCommandResponse.class)
13+
public abstract class AttestationPreCheckCommandResponse {
14+
public abstract CommandOutcome outcome();
15+
public abstract Optional<String> message();
16+
}

waltz-model/src/main/java/org/finos/waltz/model/entity_workflow/EntityWorkflowState.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,7 @@ public abstract class EntityWorkflowState implements
3737
public abstract long workflowId();
3838
public abstract EntityReference entityReference();
3939
public abstract String state();
40+
41+
@Value.Default
42+
public Long version() {return 0L;};
4043
}

waltz-ng/client/attestation/components/confirmation/attestation-confirmation.html

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@
4848
<waltz-markdown text="$ctrl.message"></waltz-markdown>
4949
</div>
5050

51+
<div style="margin-top: 0.5em"
52+
ng-if="!$ctrl.disabled && $ctrl.message">
53+
<waltz-markdown text="$ctrl.message"></waltz-markdown>
54+
<div style="margin-top: 0.5em ; margin-bottom: 0.5em">
55+
<a ui-sref="main.data-flow.dashboard"
56+
class="clickable">
57+
Go to Data Flow Dashboard
58+
</a>
59+
</div>
60+
</div>
61+
5162
<div
5263
style="display: grid; grid-template-columns: 50px 100px; align-items: center;"
5364
ng-if="$ctrl.attestationKind === 'MEASURABLE_CATEGORY'

waltz-ng/client/attestation/components/confirmation/attestation-confirmation.js

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {initialiseData, invokeFunction} from "../../../common/index";
2121
import template from "./attestation-confirmation.html";
2222
import {CORE_API} from "../../../common/services/core-api-utils";
2323
import {isDataFlowProposalsEnabled} from "../../../common/utils/settings-util";
24-
import {DATAFLOW_PROPOSAL_SETTING_NAME} from "../../../common/constants";
24+
import {PROPOSAL_OUTCOMES} from "../../../common/constants";
2525

2626

2727
const bindings = {
@@ -58,6 +58,16 @@ function controller($q, serviceBroker, settingsService) {
5858
vm.disabled = false;
5959
}
6060

61+
function disableSubmissionWithProposedFlow(response) {
62+
vm.message = response.message;
63+
vm.disabled = true;
64+
}
65+
66+
function enableSubmissionWithProposedFlow(response) {
67+
vm.message = response.message;
68+
vm.disabled = false;
69+
}
70+
6171
function validateLogicalFlows() {
6272
serviceBroker
6373
.loadViewData(CORE_API.AttestationPreCheckStore.logicalFlowCheck, [vm.parentEntityRef])
@@ -71,9 +81,9 @@ function controller($q, serviceBroker, settingsService) {
7181
serviceBroker
7282
.loadViewData(CORE_API.AttestationPreCheckStore.logicalFlowWithProposedFlowCheck, [vm.parentEntityRef])
7383
.then(r => r.data)
74-
.then(failures => _.isEmpty(failures)
75-
? enableSubmission()
76-
: disableSubmission(failures))
84+
.then(response => response.outcome === PROPOSAL_OUTCOMES.SUCCESS
85+
? enableSubmissionWithProposedFlow(response)
86+
: disableSubmissionWithProposedFlow(response))
7787
}
7888

7989
function validateViewpoints() {
@@ -89,10 +99,9 @@ function controller($q, serviceBroker, settingsService) {
8999
vm.$onInit = () => {
90100
switch (vm.attestationKind) {
91101
case "LOGICAL_DATA_FLOW":
92-
settingsService
93-
.findOrDefault(DATAFLOW_PROPOSAL_SETTING_NAME,'false')
94-
.then(dataFlowProposalSettingValue => {
95-
vm.dataFlowProposalsEnabled = dataFlowProposalSettingValue === 'true';
102+
isDataFlowProposalsEnabled(settingsService)
103+
.then(value => {
104+
vm.dataFlowProposalsEnabled = value;
96105
if (vm.dataFlowProposalsEnabled) {
97106
validateLogicalFlowsWithProposedFlow();
98107
} else {

0 commit comments

Comments
 (0)