Skip to content

Commit

Permalink
Pull request #330: CTCTOWALTZ-2680 si action queue job 6860
Browse files Browse the repository at this point in the history
Merge in WALTZ/waltz from WALTZ/waltz-jws:CTCTOWALTZ-2680-si-action-queue-job-6860 to db-feature/waltz-6860-si-action-queu-job

* commit 'e4e55f2adeb4f83c2a947c7140d50e151844be7a':
  as you
  Adding change logs to SurveyActionQueueService
  Adding change logs to SurveyActionQueueService
  Added more survey instance params to the action queue params
  Integration tests for survey instance action queue
  Integration tests for survey instance action queue
  Add junit bom dependency to keep junit dependency versions aligned (jupiter tests failing to start)
  Add job to scheduledJobService to run and action any pending entries
  Add job to scheduledJobService to run and action any pending entries
  • Loading branch information
jessica-woodland-scott-db authored and db-waltz committed Nov 24, 2023
2 parents 4156921 + e4e55f2 commit a18315e
Show file tree
Hide file tree
Showing 25 changed files with 1,052 additions and 103 deletions.
34 changes: 8 additions & 26 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -310,24 +310,6 @@


<!-- test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<version>1.7.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
Expand All @@ -342,14 +324,6 @@
<scope>test</scope>
</dependency>

<!--intellij required as workaround https://youtrack.jetbrains.com/issue/IDEA-231927 -->
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.tngtech.archunit</groupId>
<artifactId>archunit</artifactId>
Expand All @@ -363,6 +337,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>${junit.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,11 @@ public List<Tally<String>> getContributionScoresForUsers(List<String> userIds) {
}


public int write(ChangeLog changeLog) {
public int write(Optional<DSLContext> tx, ChangeLog changeLog) {
checkNotNull(changeLog, "changeLog must not be null");
DSLContext dslContext = tx.orElse(dsl);

return dsl
return dslContext
.insertInto(CHANGE_LOG)
.set(CHANGE_LOG.MESSAGE, changeLog.message())
.set(CHANGE_LOG.PARENT_ID, changeLog.parentReference().id())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package org.finos.waltz.data.survey;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.finos.waltz.common.DateTimeUtilities;
import org.finos.waltz.model.survey.ImmutableSurveyInstanceActionQueueItem;
import org.finos.waltz.model.survey.SurveyInstanceAction;
import org.finos.waltz.model.survey.SurveyInstanceActionParams;
import org.finos.waltz.model.survey.SurveyInstanceActionQueueItem;
import org.finos.waltz.model.survey.SurveyInstanceActionStatus;
import org.finos.waltz.model.survey.SurveyInstanceStatus;
import org.finos.waltz.schema.tables.records.SurveyInstanceActionQueueRecord;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.RecordMapper;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;

import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static org.finos.waltz.common.JacksonUtilities.getJsonMapper;
import static org.finos.waltz.schema.Tables.SURVEY_INSTANCE_ACTION_QUEUE;

@Repository
public class SurveyInstanceActionQueueDao {

private final DSLContext dsl;

public static final RecordMapper<Record, SurveyInstanceActionQueueItem> TO_DOMAIN_MAPPER = r -> {

SurveyInstanceActionQueueRecord record = r.into(SURVEY_INSTANCE_ACTION_QUEUE);

Optional<SurveyInstanceActionParams> surveyInstanceActionParams = readParams(getJsonMapper(), record.getActionParams());

return ImmutableSurveyInstanceActionQueueItem.builder()
.id(record.getId())
.action(SurveyInstanceAction.valueOf(record.getAction()))
.surveyInstanceId(record.getSurveyInstanceId())
.actionParams(surveyInstanceActionParams)
.initialState(SurveyInstanceStatus.valueOf(record.getInitialState()))
.submittedAt(DateTimeUtilities.toLocalDateTime(record.getSubmittedAt()))
.submittedBy(record.getSubmittedBy())
.actionedAt(ofNullable(record.getActionedAt()).map(Timestamp::toLocalDateTime).orElse(null))
.status(SurveyInstanceActionStatus.valueOf(record.getStatus()))
.message(record.getMessage())
.provenance(record.getProvenance())
.build();
};


private static Optional<SurveyInstanceActionParams> readParams(ObjectMapper jsonMapper, String actionParams) {
if(actionParams == null) {
return Optional.empty();
} else {
try {
return Optional.ofNullable(jsonMapper.readValue(actionParams, SurveyInstanceActionParams.class));
} catch (JsonProcessingException e) {
return Optional.empty();
}
}
}


@Autowired
public SurveyInstanceActionQueueDao(DSLContext dsl) {
this.dsl = dsl;
}


public List<SurveyInstanceActionQueueItem> findPendingActions() {
Condition isPending = SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.PENDING.name());
return mkSelectByCondition(dsl, isPending)
.orderBy(SURVEY_INSTANCE_ACTION_QUEUE.SUBMITTED_AT)
.fetch(TO_DOMAIN_MAPPER);
}


public SurveyInstanceActionQueueItem getById(long id) {
Condition idCondition = SURVEY_INSTANCE_ACTION_QUEUE.ID.eq(id);
return mkSelectByCondition(dsl, idCondition)
.fetchOne(TO_DOMAIN_MAPPER);
}


private SelectConditionStep<Record> mkSelectByCondition(DSLContext dslContext, Condition condition) {
return dslContext
.select(SURVEY_INSTANCE_ACTION_QUEUE.fields())
.from(SURVEY_INSTANCE_ACTION_QUEUE)
.where(condition);
}


public void updateActionStatus(DSLContext tx, Long actionId, SurveyInstanceActionStatus instanceActionStatus, String msg) {
int updated = tx
.update(SURVEY_INSTANCE_ACTION_QUEUE)
.set(SURVEY_INSTANCE_ACTION_QUEUE.ACTIONED_AT, DateTimeUtilities.nowUtcTimestamp())
.set(SURVEY_INSTANCE_ACTION_QUEUE.STATUS, instanceActionStatus.name())
.set(SURVEY_INSTANCE_ACTION_QUEUE.MESSAGE, msg)
.where(SURVEY_INSTANCE_ACTION_QUEUE.ID.eq(actionId)
.and(SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.IN_PROGRESS.name())))
.execute();

if (updated != 1) {
String messageString = "Unable to update action queue item with id: %d as %d records were updated. " +
"Reverting all action changes, this action will be attempted again in future as will be rolled back to 'PENDING'";

throw new IllegalStateException(format(
messageString,
actionId,
updated));
}
}


public void markActionInProgress(DSLContext tx, Long actionId) {

SelectConditionStep<Record1<Long>> inProgressAction = DSL
.select(SURVEY_INSTANCE_ACTION_QUEUE.ID)
.from(SURVEY_INSTANCE_ACTION_QUEUE)
.where(SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.IN_PROGRESS.name()));

int updated = tx
.update(SURVEY_INSTANCE_ACTION_QUEUE)
.set(SURVEY_INSTANCE_ACTION_QUEUE.STATUS, SurveyInstanceActionStatus.IN_PROGRESS.name())
.where(SURVEY_INSTANCE_ACTION_QUEUE.ID.eq(actionId)
.and(SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.PENDING.name()))
.and(DSL.notExists(inProgressAction)))
.execute();

if (updated != 1) {

String messageString = "Unable to mark action %d as 'IN_PROGRESS', either the action id was not found, the action is no longer pending or there is another action currently marked 'IN_PROGRESS'";

throw new IllegalStateException(format(
messageString,
actionId,
updated));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,12 @@ public long create(SurveyInstanceCreateCommand command) {
}


public long createPreviousVersion(SurveyInstance currentInstance) {
public long createPreviousVersion(Optional<DSLContext> tx, SurveyInstance currentInstance) {
checkNotNull(currentInstance, "currentInstance cannot be null");

SurveyInstanceRecord record = dsl.newRecord(si);
DSLContext dslContext = tx.orElse(dsl);

SurveyInstanceRecord record = dslContext.newRecord(si);
record.setSurveyRunId(currentInstance.surveyRunId());
record.setEntityKind(currentInstance.surveyEntity().kind().name());
record.setEntityId(currentInstance.surveyEntity().id());
Expand Down Expand Up @@ -313,10 +315,10 @@ public int deleteForSurveyRun(long surveyRunId) {
}


public int updateStatus(long instanceId, SurveyInstanceStatus newStatus) {
public int updateStatus(Optional<DSLContext> tx, long instanceId, SurveyInstanceStatus newStatus) {
checkNotNull(newStatus, "newStatus cannot be null");

return dsl
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.STATUS, newStatus.name())
.where(si.STATUS.notEqual(newStatus.name())
Expand Down Expand Up @@ -367,11 +369,10 @@ public int updateOwningRoleForSurveyRun(long surveyRunId, String role) {
.execute();
}


public int markSubmitted(long instanceId, String userName) {
public int markSubmitted(Optional<DSLContext> tx, long instanceId, String userName) {
checkNotNull(userName, "userName cannot be null");

return dsl
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.STATUS, SurveyInstanceStatus.COMPLETED.name())
.set(si.SUBMITTED_AT, Timestamp.valueOf(nowUtc()))
Expand All @@ -385,10 +386,10 @@ public int markSubmitted(long instanceId, String userName) {
}


public int markApproved(long instanceId, String userName) {
public int markApproved(Optional<DSLContext> tx, long instanceId, String userName) {
checkNotNull(userName, "userName cannot be null");

return dsl
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.APPROVED_AT, Timestamp.valueOf(nowUtc()))
.set(si.APPROVED_BY, userName)
Expand All @@ -399,16 +400,23 @@ public int markApproved(long instanceId, String userName) {
.execute();
}

public int reopenSurvey(Optional<DSLContext> tx,
long instanceId,
LocalDate dueDate,
LocalDate approvalDueDate) {

public int reopenSurvey(long instanceId) {
return dsl
DSLContext dslContext = tx.orElse(dsl);

return dslContext
.update(si)
.set(si.STATUS, SurveyInstanceStatus.IN_PROGRESS.name())
.set(si.APPROVED_AT, (Timestamp) null)
.set(si.APPROVED_BY, (String) null)
.set(si.SUBMITTED_AT, (Timestamp) null)
.set(si.SUBMITTED_BY, (String) null)
.set(si.ISSUED_ON, toSqlDate(nowUtcTimestamp())) //update the issued on to the current date
.set(si.DUE_DATE, toSqlDate(dueDate))
.set(si.APPROVAL_DUE_DATE, toSqlDate(approvalDueDate))
.where(si.ID.eq(instanceId)
.and(si.ORIGINAL_INSTANCE_ID.isNull())
.and(si.STATUS.in(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,11 @@ public List<SurveyInstanceQuestionResponse> findForInstance(long surveyInstanceI
}


public int deletePreviousResponse(List<SurveyInstanceQuestionResponse> previousResponses) {
public int deletePreviousResponse(Optional<DSLContext> tx, List<SurveyInstanceQuestionResponse> previousResponses) {
checkNotNull(previousResponses, "responses cannot be null");

DSLContext dslContext = tx.orElse(dsl);

if (!previousResponses.isEmpty()) {
Set<Long> instanceIds = map(
previousResponses,
Expand All @@ -198,13 +201,13 @@ public int deletePreviousResponse(List<SurveyInstanceQuestionResponse> previousR
previousResponses,
qr -> qr.questionResponse().questionId());

int rmSingleCount = dsl
int rmSingleCount = dslContext
.deleteFrom(Tables.SURVEY_QUESTION_RESPONSE)
.where(SURVEY_QUESTION_RESPONSE.SURVEY_INSTANCE_ID.eq(instanceId))
.and(SURVEY_QUESTION_RESPONSE.QUESTION_ID.in(previousResponseIds))
.execute();

int rmListCount = dsl
int rmListCount = dslContext
.deleteFrom(SURVEY_QUESTION_LIST_RESPONSE)
.where(SURVEY_QUESTION_LIST_RESPONSE.SURVEY_INSTANCE_ID.eq(instanceId))
.and(SURVEY_QUESTION_LIST_RESPONSE.QUESTION_ID.in(previousResponseIds))
Expand Down Expand Up @@ -328,8 +331,11 @@ private void saveEntityListResponse(DSLContext txDsl,
}


public void cloneResponses(long sourceSurveyInstanceId, long targetSurveyInstanceId) {
List<SurveyQuestionResponseRecord> responseRecords = dsl
public void cloneResponses(Optional<DSLContext> tx, long sourceSurveyInstanceId, long targetSurveyInstanceId) {

DSLContext dslContext = tx.orElse(dsl);

List<SurveyQuestionResponseRecord> responseRecords = dslContext
.select(SURVEY_QUESTION_RESPONSE.fields())
.select(entityNameField)
.from(SURVEY_QUESTION_RESPONSE)
Expand All @@ -343,7 +349,7 @@ public void cloneResponses(long sourceSurveyInstanceId, long targetSurveyInstanc
})
.collect(toList());

List<SurveyQuestionListResponseRecord> listResponseRecords = dsl
List<SurveyQuestionListResponseRecord> listResponseRecords = dslContext
.select(SURVEY_QUESTION_LIST_RESPONSE.fields())
.from(SURVEY_QUESTION_LIST_RESPONSE)
.where(SURVEY_QUESTION_LIST_RESPONSE.SURVEY_INSTANCE_ID.eq(sourceSurveyInstanceId))
Expand All @@ -356,7 +362,7 @@ public void cloneResponses(long sourceSurveyInstanceId, long targetSurveyInstanc
})
.collect(toList());

dsl.transaction(configuration -> {
dslContext.transaction(configuration -> {
DSLContext txDsl = DSL.using(configuration);

txDsl.batchInsert(responseRecords)
Expand Down
Loading

0 comments on commit a18315e

Please sign in to comment.