Skip to content

Commit

Permalink
fix: adds order by timestamp (#4316)
Browse files Browse the repository at this point in the history
* fix: adds order by timestamp + tests in state machines

* pr remarks
  • Loading branch information
wolf4ood committed Jul 2, 2024
1 parent d112da3 commit 97a4f64
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ private PolicyMonitorManagerImpl() {

}

@Override
protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) {
return builder
.processor(processEntriesInState(STARTED, this::processMonitoring));
}

@Override
public void startMonitoring(String transferProcessId, String contractId) {
var entry = PolicyMonitorEntry.Builder.newInstance()
Expand All @@ -71,6 +65,12 @@ public void startMonitoring(String transferProcessId, String contractId) {
update(entry);
}

@Override
protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) {
return builder
.processor(processEntriesInState(STARTED, this::processMonitoring));
}

private boolean processMonitoring(PolicyMonitorEntry entry) {
var transferProcess = transferProcessService.findById(entry.getId());
if (transferProcess == null) {
Expand Down Expand Up @@ -110,8 +110,10 @@ private boolean processMonitoring(PolicyMonitorEntry entry) {
}
}

breakLease(entry);
return true;
// we update the state timestamp ensure fairness on polling on `STARTED` state
// the lease will be broken in `onNotProcessed`
entry.updateStateTimestamp();
return false;
}

private Processor processEntriesInState(PolicyMonitorEntryStates state, Function<PolicyMonitorEntry, Boolean> function) {
Expand All @@ -125,14 +127,14 @@ private Processor processEntriesInState(PolicyMonitorEntryStates state, Function
public static class Builder
extends AbstractStateEntityManager.Builder<PolicyMonitorEntry, PolicyMonitorStore, PolicyMonitorManagerImpl, Builder> {

public static Builder newInstance() {
return new Builder();
}

private Builder() {
super(new PolicyMonitorManagerImpl());
}

public static Builder newInstance() {
return new Builder();
}

public Builder contractAgreementService(ContractAgreementService contractAgreementService) {
manager.contractAgreementService = contractAgreementService;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ void started_shouldDoNothing_whenPolicyIsValid() {
.state(STARTED.code())
.build();
var policy = Policy.Builder.newInstance().build();

var stateTimestamp = entry.getStateTimestamp();
when(store.nextNotLeased(anyInt(), stateIs(STARTED.code()))).thenReturn(List.of(entry)).thenReturn(emptyList());
when(transferProcessService.findById(entry.getId()))
.thenReturn(TransferProcess.Builder.newInstance().state(TransferProcessStates.STARTED.code()).build());
Expand All @@ -159,7 +161,7 @@ void started_shouldDoNothing_whenPolicyIsValid() {

await().untilAsserted(() -> {
verify(transferProcessService, never()).terminate(any());
verify(store).save(argThat(it -> it.getState() == STARTED.code()));
verify(store).save(argThat(it -> it.getState() == STARTED.code() && stateTimestamp < it.getStateTimestamp()));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS contract_negotiation_id_uindex

CREATE UNIQUE INDEX IF NOT EXISTS contract_agreement_id_uindex
ON edc_contract_agreement (agr_id);


-- This will help to identify states that need to be transitioned without a table scan when the entries grow
CREATE INDEX IF NOT EXISTS contract_negotiation_state ON edc_contract_negotiation (state,state_timestamp);
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,39 @@ public SqlContractNegotiationStore(DataSourceRegistry dataSourceRegistry, String
}

@Override
public @Nullable ContractNegotiation findForCorrelationId(String correlationId) {
public @NotNull List<ContractNegotiation> nextNotLeased(int max, Criterion... criteria) {
return transactionContext.execute(() -> {
// utilize the generic query api
var query = correlationIdQuerySpec(correlationId);
try (var stream = queryNegotiations(query)) {
return single(stream.collect(toList()));
var filter = Arrays.stream(criteria).toList();
var querySpec = QuerySpec.Builder.newInstance().filter(filter).sortField("stateTimestamp").limit(max).build();
var statement = statements.createNegotiationsQuery(querySpec)
.addWhereClause(statements.getNotLeasedFilter(), clock.millis());

try (
var connection = getConnection();
var stream = queryExecutor.query(getConnection(), true, contractNegotiationWithAgreementMapper(connection), statement.getQueryAsString(), statement.getParameters())
) {
var negotiations = stream.collect(toList());
negotiations.forEach(cn -> leaseContext.withConnection(connection).acquireLease(cn.getId()));
return negotiations;
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public @Nullable ContractAgreement findContractAgreement(String contractId) {
public StoreResult<ContractNegotiation> findByIdAndLease(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
return findContractAgreementInternal(connection, contractId);
var entity = findInternal(connection, id);
if (entity == null) {
return StoreResult.notFound(format("ContractNegotiation %s not found", id));
}

leaseContext.withConnection(connection).acquireLease(id);
return StoreResult.success(entity);
} catch (IllegalStateException e) {
return StoreResult.alreadyLeased(format("ContractNegotiation %s is already leased", id));
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
Expand All @@ -119,6 +137,28 @@ public void save(ContractNegotiation negotiation) {

}

@Override
public @Nullable ContractNegotiation findForCorrelationId(String correlationId) {
return transactionContext.execute(() -> {
// utilize the generic query api
var query = correlationIdQuerySpec(correlationId);
try (var stream = queryNegotiations(query)) {
return single(stream.collect(toList()));
}
});
}

@Override
public @Nullable ContractAgreement findContractAgreement(String contractId) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
return findContractAgreementInternal(connection, contractId);
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public void delete(String negotiationId) {
transactionContext.execute(() -> {
Expand Down Expand Up @@ -171,46 +211,6 @@ public void delete(String negotiationId) {
});
}

@Override
public @NotNull List<ContractNegotiation> nextNotLeased(int max, Criterion... criteria) {
return transactionContext.execute(() -> {
var filter = Arrays.stream(criteria).toList();
var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build();
var statement = statements.createNegotiationsQuery(querySpec)
.addWhereClause(statements.getNotLeasedFilter(), clock.millis());

try (
var connection = getConnection();
var stream = queryExecutor.query(getConnection(), true, contractNegotiationWithAgreementMapper(connection), statement.getQueryAsString(), statement.getParameters())
) {
var negotiations = stream.collect(toList());
negotiations.forEach(cn -> leaseContext.withConnection(connection).acquireLease(cn.getId()));
return negotiations;
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public StoreResult<ContractNegotiation> findByIdAndLease(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
var entity = findInternal(connection, id);
if (entity == null) {
return StoreResult.notFound(format("ContractNegotiation %s not found", id));
}

leaseContext.withConnection(connection).acquireLease(id);
return StoreResult.success(entity);
} catch (IllegalStateException e) {
return StoreResult.alreadyLeased(format("ContractNegotiation %s is already leased", id));
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public StoreResult<ContractNegotiation> findByCorrelationIdAndLease(String correlationId) {
return transactionContext.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ CREATE UNIQUE INDEX IF NOT EXISTS transfer_process_id_uindex

CREATE UNIQUE INDEX IF NOT EXISTS lease_lease_id_uindex
ON edc_lease (lease_id);

-- This will help to identify states that need to be transitioned without a table scan when the entries grow
CREATE INDEX IF NOT EXISTS transfer_process_state ON edc_transfer_process (state,state_time_stamp);
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,22 @@ public SqlTransferProcessStore(DataSourceRegistry dataSourceRegistry, String dat
leaseContext = SqlLeaseContextBuilder.with(transactionContext, leaseHolderName, statements, clock, queryExecutor);
}

@Override
public @Nullable TransferProcess findById(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
return findByIdInternal(connection, id);
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public @NotNull List<TransferProcess> nextNotLeased(int max, Criterion... criteria) {
return transactionContext.execute(() -> {
var filter = Arrays.stream(criteria).collect(toList());
var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build();
var querySpec = QuerySpec.Builder.newInstance().filter(filter).sortField("stateTimestamp").limit(max).build();
var statement = statements.createQuery(querySpec)
.addWhereClause(statements.getNotLeasedFilter(), clock.millis());

Expand Down Expand Up @@ -110,30 +121,6 @@ public StoreResult<TransferProcess> findByIdAndLease(String id) {
});
}

@Override
public StoreResult<TransferProcess> findByCorrelationIdAndLease(String correlationId) {
return transactionContext.execute(() -> {
var query = correlationIdQuerySpec(correlationId);

try (
var connection = getConnection();
var stream = executeQuery(connection, query)
) {
var entity = stream.findFirst().orElse(null);
if (entity == null) {
return StoreResult.notFound(format("TransferProcess with correlationId %s not found", correlationId));
}

leaseContext.withConnection(connection).acquireLease(entity.getId());
return StoreResult.success(entity);
} catch (IllegalStateException e) {
return StoreResult.alreadyLeased(format("TransferProcess with correlationId %s is already leased", correlationId));
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public void save(TransferProcess entity) {
Objects.requireNonNull(entity.getId(), "TransferProcesses must have an ID!");
Expand All @@ -152,17 +139,6 @@ public void save(TransferProcess entity) {
});
}

@Override
public @Nullable TransferProcess findById(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
return findByIdInternal(connection, id);
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public @Nullable TransferProcess findForCorrelationId(String correlationId) {
return transactionContext.execute(() -> {
Expand Down Expand Up @@ -207,6 +183,30 @@ public Stream<TransferProcess> findAll(QuerySpec querySpec) {
});
}

@Override
public StoreResult<TransferProcess> findByCorrelationIdAndLease(String correlationId) {
return transactionContext.execute(() -> {
var query = correlationIdQuerySpec(correlationId);

try (
var connection = getConnection();
var stream = executeQuery(connection, query)
) {
var entity = stream.findFirst().orElse(null);
if (entity == null) {
return StoreResult.notFound(format("TransferProcess with correlationId %s not found", correlationId));
}

leaseContext.withConnection(connection).acquireLease(entity.getId());
return StoreResult.success(entity);
} catch (IllegalStateException e) {
return StoreResult.alreadyLeased(format("TransferProcess with correlationId %s is already leased", correlationId));
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

private QuerySpec correlationIdQuerySpec(String correlationId) {
var criterion = criterion("correlationId", "=", correlationId);
return QuerySpec.Builder.newInstance().filter(criterion).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ COMMENT ON COLUMN edc_data_plane.trace_context IS 'Java Map serialized as JSON';
COMMENT ON COLUMN edc_data_plane.source IS 'DataAddress serialized as JSON';
COMMENT ON COLUMN edc_data_plane.destination IS 'DataAddress serialized as JSON';
COMMENT ON COLUMN edc_data_plane.properties IS 'Java Map serialized as JSON';

-- This will help to identify states that need to be transitioned without a table scan when the entries grow
CREATE INDEX IF NOT EXISTS data_plane_state ON edc_data_plane (state,state_time_stamp);
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public SqlDataPlaneStore(DataSourceRegistry dataSourceRegistry, String dataSourc
public @NotNull List<DataFlow> nextNotLeased(int max, Criterion... criteria) {
return transactionContext.execute(() -> {
var filter = Arrays.stream(criteria).collect(toList());
var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build();
var querySpec = QuerySpec.Builder.newInstance().filter(filter).sortField("stateTimestamp").limit(max).build();
var statement = statements.createQuery(querySpec)
.addWhereClause(statements.getNotLeasedFilter(), clock.millis());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ CREATE TABLE IF NOT EXISTS edc_policy_monitor
properties JSON,
contract_id VARCHAR
);


-- This will help to identify states that need to be transitioned without a table scan when the entries grow
CREATE INDEX IF NOT EXISTS policy_monitor_state ON edc_policy_monitor (state,state_time_stamp);
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public SqlPolicyMonitorStore(DataSourceRegistry dataSourceRegistry, String dataS
public @NotNull List<PolicyMonitorEntry> nextNotLeased(int max, Criterion... criteria) {
return transactionContext.execute(() -> {
var filter = Arrays.stream(criteria).collect(toList());
var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build();
var querySpec = QuerySpec.Builder.newInstance().filter(filter).sortField("stateTimestamp").limit(max).build();
var statement = statements.createQuery(querySpec)
.addWhereClause(statements.getNotLeasedFilter(), clock.millis());

Expand Down
Loading

0 comments on commit 97a4f64

Please sign in to comment.