Skip to content

Commit

Permalink
Merge pull request #6878 from deutschebank/db-contrib/waltz-6860-si-a…
Browse files Browse the repository at this point in the history
…ction-queu-job

Db contrib/waltz 6860 si action queu job
  • Loading branch information
davidwatkins73 authored Nov 24, 2023
2 parents 08a07b6 + a18315e commit 209e6b0
Show file tree
Hide file tree
Showing 25 changed files with 1,052 additions and 103 deletions.
34 changes: 8 additions & 26 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -310,24 +310,6 @@


<!-- test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<version>1.7.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
Expand All @@ -342,14 +324,6 @@
<scope>test</scope>
</dependency>

<!--intellij required as workaround https://youtrack.jetbrains.com/issue/IDEA-231927 -->
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.tngtech.archunit</groupId>
<artifactId>archunit</artifactId>
Expand All @@ -363,6 +337,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>${junit.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,11 @@ public List<Tally<String>> getContributionScoresForUsers(List<String> userIds) {
}


public int write(ChangeLog changeLog) {
public int write(Optional<DSLContext> tx, ChangeLog changeLog) {
checkNotNull(changeLog, "changeLog must not be null");
DSLContext dslContext = tx.orElse(dsl);

return dsl
return dslContext
.insertInto(CHANGE_LOG)
.set(CHANGE_LOG.MESSAGE, changeLog.message())
.set(CHANGE_LOG.PARENT_ID, changeLog.parentReference().id())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package org.finos.waltz.data.survey;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.finos.waltz.common.DateTimeUtilities;
import org.finos.waltz.model.survey.ImmutableSurveyInstanceActionQueueItem;
import org.finos.waltz.model.survey.SurveyInstanceAction;
import org.finos.waltz.model.survey.SurveyInstanceActionParams;
import org.finos.waltz.model.survey.SurveyInstanceActionQueueItem;
import org.finos.waltz.model.survey.SurveyInstanceActionStatus;
import org.finos.waltz.model.survey.SurveyInstanceStatus;
import org.finos.waltz.schema.tables.records.SurveyInstanceActionQueueRecord;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.RecordMapper;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;

import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static org.finos.waltz.common.JacksonUtilities.getJsonMapper;
import static org.finos.waltz.schema.Tables.SURVEY_INSTANCE_ACTION_QUEUE;

@Repository
public class SurveyInstanceActionQueueDao {

private final DSLContext dsl;

public static final RecordMapper<Record, SurveyInstanceActionQueueItem> TO_DOMAIN_MAPPER = r -> {

SurveyInstanceActionQueueRecord record = r.into(SURVEY_INSTANCE_ACTION_QUEUE);

Optional<SurveyInstanceActionParams> surveyInstanceActionParams = readParams(getJsonMapper(), record.getActionParams());

return ImmutableSurveyInstanceActionQueueItem.builder()
.id(record.getId())
.action(SurveyInstanceAction.valueOf(record.getAction()))
.surveyInstanceId(record.getSurveyInstanceId())
.actionParams(surveyInstanceActionParams)
.initialState(SurveyInstanceStatus.valueOf(record.getInitialState()))
.submittedAt(DateTimeUtilities.toLocalDateTime(record.getSubmittedAt()))
.submittedBy(record.getSubmittedBy())
.actionedAt(ofNullable(record.getActionedAt()).map(Timestamp::toLocalDateTime).orElse(null))
.status(SurveyInstanceActionStatus.valueOf(record.getStatus()))
.message(record.getMessage())
.provenance(record.getProvenance())
.build();
};


private static Optional<SurveyInstanceActionParams> readParams(ObjectMapper jsonMapper, String actionParams) {
if(actionParams == null) {
return Optional.empty();
} else {
try {
return Optional.ofNullable(jsonMapper.readValue(actionParams, SurveyInstanceActionParams.class));
} catch (JsonProcessingException e) {
return Optional.empty();
}
}
}


@Autowired
public SurveyInstanceActionQueueDao(DSLContext dsl) {
this.dsl = dsl;
}


public List<SurveyInstanceActionQueueItem> findPendingActions() {
Condition isPending = SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.PENDING.name());
return mkSelectByCondition(dsl, isPending)
.orderBy(SURVEY_INSTANCE_ACTION_QUEUE.SUBMITTED_AT)
.fetch(TO_DOMAIN_MAPPER);
}


public SurveyInstanceActionQueueItem getById(long id) {
Condition idCondition = SURVEY_INSTANCE_ACTION_QUEUE.ID.eq(id);
return mkSelectByCondition(dsl, idCondition)
.fetchOne(TO_DOMAIN_MAPPER);
}


private SelectConditionStep<Record> mkSelectByCondition(DSLContext dslContext, Condition condition) {
return dslContext
.select(SURVEY_INSTANCE_ACTION_QUEUE.fields())
.from(SURVEY_INSTANCE_ACTION_QUEUE)
.where(condition);
}


public void updateActionStatus(DSLContext tx, Long actionId, SurveyInstanceActionStatus instanceActionStatus, String msg) {
int updated = tx
.update(SURVEY_INSTANCE_ACTION_QUEUE)
.set(SURVEY_INSTANCE_ACTION_QUEUE.ACTIONED_AT, DateTimeUtilities.nowUtcTimestamp())
.set(SURVEY_INSTANCE_ACTION_QUEUE.STATUS, instanceActionStatus.name())
.set(SURVEY_INSTANCE_ACTION_QUEUE.MESSAGE, msg)
.where(SURVEY_INSTANCE_ACTION_QUEUE.ID.eq(actionId)
.and(SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.IN_PROGRESS.name())))
.execute();

if (updated != 1) {
String messageString = "Unable to update action queue item with id: %d as %d records were updated. " +
"Reverting all action changes, this action will be attempted again in future as will be rolled back to 'PENDING'";

throw new IllegalStateException(format(
messageString,
actionId,
updated));
}
}


public void markActionInProgress(DSLContext tx, Long actionId) {

SelectConditionStep<Record1<Long>> inProgressAction = DSL
.select(SURVEY_INSTANCE_ACTION_QUEUE.ID)
.from(SURVEY_INSTANCE_ACTION_QUEUE)
.where(SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.IN_PROGRESS.name()));

int updated = tx
.update(SURVEY_INSTANCE_ACTION_QUEUE)
.set(SURVEY_INSTANCE_ACTION_QUEUE.STATUS, SurveyInstanceActionStatus.IN_PROGRESS.name())
.where(SURVEY_INSTANCE_ACTION_QUEUE.ID.eq(actionId)
.and(SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.PENDING.name()))
.and(DSL.notExists(inProgressAction)))
.execute();

if (updated != 1) {

String messageString = "Unable to mark action %d as 'IN_PROGRESS', either the action id was not found, the action is no longer pending or there is another action currently marked 'IN_PROGRESS'";

throw new IllegalStateException(format(
messageString,
actionId,
updated));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,12 @@ public long create(SurveyInstanceCreateCommand command) {
}


public long createPreviousVersion(SurveyInstance currentInstance) {
public long createPreviousVersion(Optional<DSLContext> tx, SurveyInstance currentInstance) {
checkNotNull(currentInstance, "currentInstance cannot be null");

SurveyInstanceRecord record = dsl.newRecord(si);
DSLContext dslContext = tx.orElse(dsl);

SurveyInstanceRecord record = dslContext.newRecord(si);
record.setSurveyRunId(currentInstance.surveyRunId());
record.setEntityKind(currentInstance.surveyEntity().kind().name());
record.setEntityId(currentInstance.surveyEntity().id());
Expand Down Expand Up @@ -313,10 +315,10 @@ public int deleteForSurveyRun(long surveyRunId) {
}


public int updateStatus(long instanceId, SurveyInstanceStatus newStatus) {
public int updateStatus(Optional<DSLContext> tx, long instanceId, SurveyInstanceStatus newStatus) {
checkNotNull(newStatus, "newStatus cannot be null");

return dsl
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.STATUS, newStatus.name())
.where(si.STATUS.notEqual(newStatus.name())
Expand Down Expand Up @@ -367,11 +369,10 @@ public int updateOwningRoleForSurveyRun(long surveyRunId, String role) {
.execute();
}


public int markSubmitted(long instanceId, String userName) {
public int markSubmitted(Optional<DSLContext> tx, long instanceId, String userName) {
checkNotNull(userName, "userName cannot be null");

return dsl
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.STATUS, SurveyInstanceStatus.COMPLETED.name())
.set(si.SUBMITTED_AT, Timestamp.valueOf(nowUtc()))
Expand All @@ -385,10 +386,10 @@ public int markSubmitted(long instanceId, String userName) {
}


public int markApproved(long instanceId, String userName) {
public int markApproved(Optional<DSLContext> tx, long instanceId, String userName) {
checkNotNull(userName, "userName cannot be null");

return dsl
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.APPROVED_AT, Timestamp.valueOf(nowUtc()))
.set(si.APPROVED_BY, userName)
Expand All @@ -399,16 +400,23 @@ public int markApproved(long instanceId, String userName) {
.execute();
}

public int reopenSurvey(Optional<DSLContext> tx,
long instanceId,
LocalDate dueDate,
LocalDate approvalDueDate) {

public int reopenSurvey(long instanceId) {
return dsl
DSLContext dslContext = tx.orElse(dsl);

return dslContext
.update(si)
.set(si.STATUS, SurveyInstanceStatus.IN_PROGRESS.name())
.set(si.APPROVED_AT, (Timestamp) null)
.set(si.APPROVED_BY, (String) null)
.set(si.SUBMITTED_AT, (Timestamp) null)
.set(si.SUBMITTED_BY, (String) null)
.set(si.ISSUED_ON, toSqlDate(nowUtcTimestamp())) //update the issued on to the current date
.set(si.DUE_DATE, toSqlDate(dueDate))
.set(si.APPROVAL_DUE_DATE, toSqlDate(approvalDueDate))
.where(si.ID.eq(instanceId)
.and(si.ORIGINAL_INSTANCE_ID.isNull())
.and(si.STATUS.in(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,11 @@ public List<SurveyInstanceQuestionResponse> findForInstance(long surveyInstanceI
}


public int deletePreviousResponse(List<SurveyInstanceQuestionResponse> previousResponses) {
public int deletePreviousResponse(Optional<DSLContext> tx, List<SurveyInstanceQuestionResponse> previousResponses) {
checkNotNull(previousResponses, "responses cannot be null");

DSLContext dslContext = tx.orElse(dsl);

if (!previousResponses.isEmpty()) {
Set<Long> instanceIds = map(
previousResponses,
Expand All @@ -198,13 +201,13 @@ public int deletePreviousResponse(List<SurveyInstanceQuestionResponse> previousR
previousResponses,
qr -> qr.questionResponse().questionId());

int rmSingleCount = dsl
int rmSingleCount = dslContext
.deleteFrom(Tables.SURVEY_QUESTION_RESPONSE)
.where(SURVEY_QUESTION_RESPONSE.SURVEY_INSTANCE_ID.eq(instanceId))
.and(SURVEY_QUESTION_RESPONSE.QUESTION_ID.in(previousResponseIds))
.execute();

int rmListCount = dsl
int rmListCount = dslContext
.deleteFrom(SURVEY_QUESTION_LIST_RESPONSE)
.where(SURVEY_QUESTION_LIST_RESPONSE.SURVEY_INSTANCE_ID.eq(instanceId))
.and(SURVEY_QUESTION_LIST_RESPONSE.QUESTION_ID.in(previousResponseIds))
Expand Down Expand Up @@ -328,8 +331,11 @@ private void saveEntityListResponse(DSLContext txDsl,
}


public void cloneResponses(long sourceSurveyInstanceId, long targetSurveyInstanceId) {
List<SurveyQuestionResponseRecord> responseRecords = dsl
public void cloneResponses(Optional<DSLContext> tx, long sourceSurveyInstanceId, long targetSurveyInstanceId) {

DSLContext dslContext = tx.orElse(dsl);

List<SurveyQuestionResponseRecord> responseRecords = dslContext
.select(SURVEY_QUESTION_RESPONSE.fields())
.select(entityNameField)
.from(SURVEY_QUESTION_RESPONSE)
Expand All @@ -343,7 +349,7 @@ public void cloneResponses(long sourceSurveyInstanceId, long targetSurveyInstanc
})
.collect(toList());

List<SurveyQuestionListResponseRecord> listResponseRecords = dsl
List<SurveyQuestionListResponseRecord> listResponseRecords = dslContext
.select(SURVEY_QUESTION_LIST_RESPONSE.fields())
.from(SURVEY_QUESTION_LIST_RESPONSE)
.where(SURVEY_QUESTION_LIST_RESPONSE.SURVEY_INSTANCE_ID.eq(sourceSurveyInstanceId))
Expand All @@ -356,7 +362,7 @@ public void cloneResponses(long sourceSurveyInstanceId, long targetSurveyInstanc
})
.collect(toList());

dsl.transaction(configuration -> {
dslContext.transaction(configuration -> {
DSLContext txDsl = DSL.using(configuration);

txDsl.batchInsert(responseRecords)
Expand Down
Loading

0 comments on commit 209e6b0

Please sign in to comment.