Skip to content

Commit

Permalink
Add job to scheduledJobService to run and action any pending entries
Browse files Browse the repository at this point in the history
#CTCTOWALTZ-2680
finos#6860
  • Loading branch information
jessica-woodland-scott-db committed Nov 20, 2023
1 parent 53f0991 commit 0c261d6
Show file tree
Hide file tree
Showing 14 changed files with 384 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,11 @@ public List<Tally<String>> getContributionScoresForUsers(List<String> userIds) {
}


public int write(ChangeLog changeLog) {
public int write(Optional<DSLContext> tx, ChangeLog changeLog) {
checkNotNull(changeLog, "changeLog must not be null");
DSLContext dslContext = tx.orElse(dsl);

return dsl
return dslContext
.insertInto(CHANGE_LOG)
.set(CHANGE_LOG.MESSAGE, changeLog.message())
.set(CHANGE_LOG.PARENT_ID, changeLog.parentReference().id())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package org.finos.waltz.data.survey;

import org.finos.waltz.common.DateTimeUtilities;
import org.finos.waltz.model.survey.ImmutableSurveyInstanceActionQueueItem;
import org.finos.waltz.model.survey.SurveyInstanceAction;
import org.finos.waltz.model.survey.SurveyInstanceActionQueueItem;
import org.finos.waltz.model.survey.SurveyInstanceActionStatus;
import org.finos.waltz.model.survey.SurveyInstanceStatus;
import org.finos.waltz.schema.tables.records.SurveyInstanceActionQueueRecord;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.RecordMapper;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import java.sql.Timestamp;
import java.util.List;

import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static org.finos.waltz.schema.Tables.SURVEY_INSTANCE_ACTION_QUEUE;

@Repository
public class SurveyInstanceActionQueueDao {

private final DSLContext dsl;

public static final RecordMapper<Record, SurveyInstanceActionQueueItem> TO_DOMAIN_MAPPER = r -> {
SurveyInstanceActionQueueRecord record = r.into(SURVEY_INSTANCE_ACTION_QUEUE);
return ImmutableSurveyInstanceActionQueueItem.builder()
.id(record.getId())
.action(SurveyInstanceAction.valueOf(record.getAction()))
.surveyInstanceId(record.getSurveyInstanceId())
.actionParams(record.getActionParams())
.initialState(SurveyInstanceStatus.valueOf(record.getInitialState()))
.submittedAt(DateTimeUtilities.toLocalDateTime(record.getSubmittedAt()))
.submittedBy(record.getSubmittedBy())
.actionedAt(ofNullable(record.getActionedAt()).map(Timestamp::toLocalDateTime).orElse(null))
.status(SurveyInstanceActionStatus.valueOf(record.getStatus()))
.message(record.getMessage())
.provenance(record.getProvenance())
.build();
};


@Autowired
SurveyInstanceActionQueueDao(DSLContext dsl) {
this.dsl = dsl;
}


public List<SurveyInstanceActionQueueItem> findPendingActions() {
Condition isPending = SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.PENDING.name());
return mkSelectByCondition(dsl, isPending)
.orderBy(SURVEY_INSTANCE_ACTION_QUEUE.SUBMITTED_AT)
.fetch(TO_DOMAIN_MAPPER);
}


public SurveyInstanceActionQueueItem getById(Long id) {
Condition idCondition = SURVEY_INSTANCE_ACTION_QUEUE.ID.eq(id);
return mkSelectByCondition(dsl, idCondition)
.fetchOne(TO_DOMAIN_MAPPER);
}


private SelectConditionStep<Record> mkSelectByCondition(DSLContext dslContext, Condition condition) {
return dslContext
.select(SURVEY_INSTANCE_ACTION_QUEUE.fields())
.from(SURVEY_INSTANCE_ACTION_QUEUE)
.where(condition);
}


public void updateActionStatus(DSLContext tx, Long actionId, SurveyInstanceActionStatus instanceActionStatus, String msg) {
int updated = tx
.update(SURVEY_INSTANCE_ACTION_QUEUE)
.set(SURVEY_INSTANCE_ACTION_QUEUE.ACTIONED_AT, DateTimeUtilities.nowUtcTimestamp())
.set(SURVEY_INSTANCE_ACTION_QUEUE.STATUS, instanceActionStatus.name())
.set(SURVEY_INSTANCE_ACTION_QUEUE.MESSAGE, msg)
.where(SURVEY_INSTANCE_ACTION_QUEUE.ID.eq(actionId)
.and(SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.IN_PROGRESS.name())))
.execute();

if (updated != 1) {
String messageString = "Unable to update action queue item with id: %d as %d records were updated. " +
"Reverting all action changes, this action will be attempted again in future as will be rolled back to 'PENDING'";

throw new IllegalStateException(format(
messageString,
actionId,
updated));
}
}


public void markActionInProgress(DSLContext tx, Long actionId) {

SelectConditionStep<Record1<Long>> inProgressAction = DSL
.select(SURVEY_INSTANCE_ACTION_QUEUE.ID)
.from(SURVEY_INSTANCE_ACTION_QUEUE)
.where(SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.IN_PROGRESS.name()));

int updated = tx
.update(SURVEY_INSTANCE_ACTION_QUEUE)
.set(SURVEY_INSTANCE_ACTION_QUEUE.STATUS, SurveyInstanceActionStatus.IN_PROGRESS.name())
.where(SURVEY_INSTANCE_ACTION_QUEUE.ID.eq(actionId)
.and(SURVEY_INSTANCE_ACTION_QUEUE.STATUS.eq(SurveyInstanceActionStatus.PENDING.name()))
.and(DSL.notExists(inProgressAction)))
.execute();

if (updated != 1) {

String messageString = "Unable to mark action %d as 'IN_PROGRESS', either the action id was not found, the action is no longer pending or there is another action currently marked 'IN_PROGRESS'";

throw new IllegalStateException(format(
messageString,
actionId,
updated));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,12 @@ public long create(SurveyInstanceCreateCommand command) {
}


public long createPreviousVersion(SurveyInstance currentInstance) {
public long createPreviousVersion(Optional<DSLContext> tx, SurveyInstance currentInstance) {
checkNotNull(currentInstance, "currentInstance cannot be null");

SurveyInstanceRecord record = dsl.newRecord(si);
DSLContext dslContext = tx.orElse(dsl);

SurveyInstanceRecord record = dslContext.newRecord(si);
record.setSurveyRunId(currentInstance.surveyRunId());
record.setEntityKind(currentInstance.surveyEntity().kind().name());
record.setEntityId(currentInstance.surveyEntity().id());
Expand Down Expand Up @@ -313,10 +315,10 @@ public int deleteForSurveyRun(long surveyRunId) {
}


public int updateStatus(long instanceId, SurveyInstanceStatus newStatus) {
public int updateStatus(Optional<DSLContext> tx, long instanceId, SurveyInstanceStatus newStatus) {
checkNotNull(newStatus, "newStatus cannot be null");

return dsl
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.STATUS, newStatus.name())
.where(si.STATUS.notEqual(newStatus.name())
Expand Down Expand Up @@ -367,11 +369,10 @@ public int updateOwningRoleForSurveyRun(long surveyRunId, String role) {
.execute();
}


public int markSubmitted(long instanceId, String userName) {
public int markSubmitted(Optional<DSLContext> tx, long instanceId, String userName) {
checkNotNull(userName, "userName cannot be null");

return dsl
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.STATUS, SurveyInstanceStatus.COMPLETED.name())
.set(si.SUBMITTED_AT, Timestamp.valueOf(nowUtc()))
Expand All @@ -385,10 +386,10 @@ public int markSubmitted(long instanceId, String userName) {
}


public int markApproved(long instanceId, String userName) {
public int markApproved(Optional<DSLContext> tx, long instanceId, String userName) {
checkNotNull(userName, "userName cannot be null");

return dsl
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.APPROVED_AT, Timestamp.valueOf(nowUtc()))
.set(si.APPROVED_BY, userName)
Expand All @@ -399,9 +400,9 @@ public int markApproved(long instanceId, String userName) {
.execute();
}


public int reopenSurvey(long instanceId) {
return dsl
public int reopenSurvey(Optional<DSLContext> tx, long instanceId) {
DSLContext dslContext = tx.orElse(dsl);
return dslContext
.update(si)
.set(si.STATUS, SurveyInstanceStatus.IN_PROGRESS.name())
.set(si.APPROVED_AT, (Timestamp) null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,11 @@ public List<SurveyInstanceQuestionResponse> findForInstance(long surveyInstanceI
}


public int deletePreviousResponse(List<SurveyInstanceQuestionResponse> previousResponses) {
public int deletePreviousResponse(Optional<DSLContext> tx, List<SurveyInstanceQuestionResponse> previousResponses) {
checkNotNull(previousResponses, "responses cannot be null");

DSLContext dslContext = tx.orElse(dsl);

if (!previousResponses.isEmpty()) {
Set<Long> instanceIds = map(
previousResponses,
Expand All @@ -198,13 +201,13 @@ public int deletePreviousResponse(List<SurveyInstanceQuestionResponse> previousR
previousResponses,
qr -> qr.questionResponse().questionId());

int rmSingleCount = dsl
int rmSingleCount = dslContext
.deleteFrom(Tables.SURVEY_QUESTION_RESPONSE)
.where(SURVEY_QUESTION_RESPONSE.SURVEY_INSTANCE_ID.eq(instanceId))
.and(SURVEY_QUESTION_RESPONSE.QUESTION_ID.in(previousResponseIds))
.execute();

int rmListCount = dsl
int rmListCount = dslContext
.deleteFrom(SURVEY_QUESTION_LIST_RESPONSE)
.where(SURVEY_QUESTION_LIST_RESPONSE.SURVEY_INSTANCE_ID.eq(instanceId))
.and(SURVEY_QUESTION_LIST_RESPONSE.QUESTION_ID.in(previousResponseIds))
Expand Down Expand Up @@ -328,8 +331,11 @@ private void saveEntityListResponse(DSLContext txDsl,
}


public void cloneResponses(long sourceSurveyInstanceId, long targetSurveyInstanceId) {
List<SurveyQuestionResponseRecord> responseRecords = dsl
public void cloneResponses(Optional<DSLContext> tx, long sourceSurveyInstanceId, long targetSurveyInstanceId) {

DSLContext dslContext = tx.orElse(dsl);

List<SurveyQuestionResponseRecord> responseRecords = dslContext
.select(SURVEY_QUESTION_RESPONSE.fields())
.select(entityNameField)
.from(SURVEY_QUESTION_RESPONSE)
Expand All @@ -343,7 +349,7 @@ public void cloneResponses(long sourceSurveyInstanceId, long targetSurveyInstanc
})
.collect(toList());

List<SurveyQuestionListResponseRecord> listResponseRecords = dsl
List<SurveyQuestionListResponseRecord> listResponseRecords = dslContext
.select(SURVEY_QUESTION_LIST_RESPONSE.fields())
.from(SURVEY_QUESTION_LIST_RESPONSE)
.where(SURVEY_QUESTION_LIST_RESPONSE.SURVEY_INSTANCE_ID.eq(sourceSurveyInstanceId))
Expand All @@ -356,7 +362,7 @@ public void cloneResponses(long sourceSurveyInstanceId, long targetSurveyInstanc
})
.collect(toList());

dsl.transaction(configuration -> {
dslContext.transaction(configuration -> {
DSLContext txDsl = DSL.using(configuration);

txDsl.batchInsert(responseRecords)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package org.finos.waltz.jobs.harness;

import org.finos.waltz.data.survey.SurveyInstanceDao;
import org.finos.waltz.data.survey.SurveyQuestionResponseDao;
import org.finos.waltz.model.attestation.SyncRecipientsResponse;
import org.finos.waltz.model.survey.SurveyInstanceFormDetails;
import org.finos.waltz.service.DIConfiguration;
import org.finos.waltz.service.survey.SurveyInstanceEvaluator;
import org.finos.waltz.service.survey.SurveyInstanceActionQueueService;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class SurveyInstanceHarness {

public static void main(String[] args) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(DIConfiguration.class);

SurveyInstanceDao dao = ctx.getBean(SurveyInstanceDao.class);
SurveyInstanceActionQueueService svc = ctx.getBean(SurveyInstanceActionQueueService.class);

SyncRecipientsResponse reassignRecipientsCounts = dao.getReassignRecipientsCounts();
svc.performActions();

dao.reassignRecipients();

System.out.println("-------------");
System.out.println("------------- Done!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.time.LocalDate;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -69,7 +70,7 @@ public static void main(String[] args) {
LOG.info("Added recipients to new survey instance [id: {}]", newSiId);

// withdraw the old one
siDao.updateStatus(si.id().get(), WITHDRAWN);
siDao.updateStatus(Optional.empty(), si.id().get(), WITHDRAWN);
LOG.info("Old survey instance [id: {}] withdrawn", si.id().get());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.finos.waltz.model.survey;

import org.finos.waltz.model.IdProvider;
import org.finos.waltz.model.Nullable;
import org.immutables.value.Value;

import java.time.LocalDateTime;

@Value.Immutable
public abstract class SurveyInstanceActionQueueItem implements IdProvider {

public abstract SurveyInstanceAction action();
public abstract Long surveyInstanceId();
@Nullable
public abstract String actionParams();
public abstract SurveyInstanceStatus initialState();
public abstract LocalDateTime submittedAt();
public abstract String submittedBy();
@Nullable
public abstract LocalDateTime actionedAt();
public abstract SurveyInstanceActionStatus status();
@Nullable
public abstract String message();
public abstract String provenance();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.finos.waltz.model.survey;

public enum SurveyInstanceActionStatus {

PENDING,
IN_PROGRESS,
PRECONDITION_FAILURE,
EXECUTION_FAILURE,
SUCCESS

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.finos.waltz.model.physical_flow.PhysicalFlow;
import org.finos.waltz.model.physical_specification.PhysicalSpecification;
import org.finos.waltz.model.tally.DateTally;
import org.jooq.DSLContext;
import org.jooq.lambda.tuple.Tuple2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -152,7 +153,12 @@ public List<ChangeLog> findByUser(String userName,


public int write(ChangeLog changeLog) {
return changeLogDao.write(changeLog);
return changeLogDao.write(Optional.empty(), changeLog);
}


public int write(Optional<DSLContext> tx, ChangeLog changeLog) {
return changeLogDao.write(tx, changeLog);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Optional;

import static java.lang.String.format;
import static org.finos.waltz.common.Checks.checkNotNull;
Expand Down Expand Up @@ -124,7 +125,7 @@ public AppRegistrationResponse promoteToApplication(Long id, ChangeLogComment co

migrateEudaInvolvements(id, appRegistrationResponse);

changeLogDao.write(mkChangeLog(appRegistrationResponse, comment, username));
changeLogDao.write(Optional.empty(), mkChangeLog(appRegistrationResponse, comment, username));

return appRegistrationResponse;
}
Expand Down
Loading

0 comments on commit 0c261d6

Please sign in to comment.