diff --git a/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/manager/PolicyMonitorManagerImpl.java b/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/manager/PolicyMonitorManagerImpl.java index f8191053bc9..1fb5e8279d3 100644 --- a/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/manager/PolicyMonitorManagerImpl.java +++ b/core/policy-monitor/policy-monitor-core/src/main/java/org/eclipse/edc/connector/policy/monitor/manager/PolicyMonitorManagerImpl.java @@ -52,12 +52,6 @@ private PolicyMonitorManagerImpl() { } - @Override - protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) { - return builder - .processor(processEntriesInState(STARTED, this::processMonitoring)); - } - @Override public void startMonitoring(String transferProcessId, String contractId) { var entry = PolicyMonitorEntry.Builder.newInstance() @@ -71,6 +65,12 @@ public void startMonitoring(String transferProcessId, String contractId) { update(entry); } + @Override + protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) { + return builder + .processor(processEntriesInState(STARTED, this::processMonitoring)); + } + private boolean processMonitoring(PolicyMonitorEntry entry) { var transferProcess = transferProcessService.findById(entry.getId()); if (transferProcess == null) { @@ -110,8 +110,10 @@ private boolean processMonitoring(PolicyMonitorEntry entry) { } } - breakLease(entry); - return true; + // we update the state timestamp ensure fairness on polling on `STARTED` state + // the lease will be broken in `onNotProcessed` + entry.updateStateTimestamp(); + return false; } private Processor processEntriesInState(PolicyMonitorEntryStates state, Function function) { @@ -125,14 +127,14 @@ private Processor processEntriesInState(PolicyMonitorEntryStates state, Function public static class Builder extends AbstractStateEntityManager.Builder { - public static Builder newInstance() { - return new Builder(); - } - private Builder() { super(new PolicyMonitorManagerImpl()); } + public static Builder newInstance() { + return new Builder(); + } + public Builder contractAgreementService(ContractAgreementService contractAgreementService) { manager.contractAgreementService = contractAgreementService; return this; diff --git a/core/policy-monitor/policy-monitor-core/src/test/java/org/eclipse/edc/connector/policy/monitor/manager/PolicyMonitorManagerImplTest.java b/core/policy-monitor/policy-monitor-core/src/test/java/org/eclipse/edc/connector/policy/monitor/manager/PolicyMonitorManagerImplTest.java index f86e00003f3..8fbc0f83259 100644 --- a/core/policy-monitor/policy-monitor-core/src/test/java/org/eclipse/edc/connector/policy/monitor/manager/PolicyMonitorManagerImplTest.java +++ b/core/policy-monitor/policy-monitor-core/src/test/java/org/eclipse/edc/connector/policy/monitor/manager/PolicyMonitorManagerImplTest.java @@ -149,6 +149,8 @@ void started_shouldDoNothing_whenPolicyIsValid() { .state(STARTED.code()) .build(); var policy = Policy.Builder.newInstance().build(); + + var stateTimestamp = entry.getStateTimestamp(); when(store.nextNotLeased(anyInt(), stateIs(STARTED.code()))).thenReturn(List.of(entry)).thenReturn(emptyList()); when(transferProcessService.findById(entry.getId())) .thenReturn(TransferProcess.Builder.newInstance().state(TransferProcessStates.STARTED.code()).build()); @@ -159,7 +161,7 @@ void started_shouldDoNothing_whenPolicyIsValid() { await().untilAsserted(() -> { verify(transferProcessService, never()).terminate(any()); - verify(store).save(argThat(it -> it.getState() == STARTED.code())); + verify(store).save(argThat(it -> it.getState() == STARTED.code() && stateTimestamp < it.getStateTimestamp())); }); } diff --git a/extensions/control-plane/store/sql/contract-negotiation-store-sql/docs/schema.sql b/extensions/control-plane/store/sql/contract-negotiation-store-sql/docs/schema.sql index 2d909ec4107..02d64c4911d 100644 --- a/extensions/control-plane/store/sql/contract-negotiation-store-sql/docs/schema.sql +++ b/extensions/control-plane/store/sql/contract-negotiation-store-sql/docs/schema.sql @@ -80,3 +80,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS contract_negotiation_id_uindex CREATE UNIQUE INDEX IF NOT EXISTS contract_agreement_id_uindex ON edc_contract_agreement (agr_id); + + +-- This will help to identify states that need to be transitioned without a table scan when the entries grow +CREATE INDEX IF NOT EXISTS contract_negotiation_state ON edc_contract_negotiation (state,state_timestamp); \ No newline at end of file diff --git a/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/controlplane/store/sql/contractnegotiation/store/SqlContractNegotiationStore.java b/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/controlplane/store/sql/contractnegotiation/store/SqlContractNegotiationStore.java index 14bab58b619..4baadc82018 100644 --- a/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/controlplane/store/sql/contractnegotiation/store/SqlContractNegotiationStore.java +++ b/extensions/control-plane/store/sql/contract-negotiation-store-sql/src/main/java/org/eclipse/edc/connector/controlplane/store/sql/contractnegotiation/store/SqlContractNegotiationStore.java @@ -79,21 +79,39 @@ public SqlContractNegotiationStore(DataSourceRegistry dataSourceRegistry, String } @Override - public @Nullable ContractNegotiation findForCorrelationId(String correlationId) { + public @NotNull List nextNotLeased(int max, Criterion... criteria) { return transactionContext.execute(() -> { - // utilize the generic query api - var query = correlationIdQuerySpec(correlationId); - try (var stream = queryNegotiations(query)) { - return single(stream.collect(toList())); + var filter = Arrays.stream(criteria).toList(); + var querySpec = QuerySpec.Builder.newInstance().filter(filter).sortField("stateTimestamp").limit(max).build(); + var statement = statements.createNegotiationsQuery(querySpec) + .addWhereClause(statements.getNotLeasedFilter(), clock.millis()); + + try ( + var connection = getConnection(); + var stream = queryExecutor.query(getConnection(), true, contractNegotiationWithAgreementMapper(connection), statement.getQueryAsString(), statement.getParameters()) + ) { + var negotiations = stream.collect(toList()); + negotiations.forEach(cn -> leaseContext.withConnection(connection).acquireLease(cn.getId())); + return negotiations; + } catch (SQLException e) { + throw new EdcPersistenceException(e); } }); } @Override - public @Nullable ContractAgreement findContractAgreement(String contractId) { + public StoreResult findByIdAndLease(String id) { return transactionContext.execute(() -> { try (var connection = getConnection()) { - return findContractAgreementInternal(connection, contractId); + var entity = findInternal(connection, id); + if (entity == null) { + return StoreResult.notFound(format("ContractNegotiation %s not found", id)); + } + + leaseContext.withConnection(connection).acquireLease(id); + return StoreResult.success(entity); + } catch (IllegalStateException e) { + return StoreResult.alreadyLeased(format("ContractNegotiation %s is already leased", id)); } catch (SQLException e) { throw new EdcPersistenceException(e); } @@ -119,6 +137,28 @@ public void save(ContractNegotiation negotiation) { } + @Override + public @Nullable ContractNegotiation findForCorrelationId(String correlationId) { + return transactionContext.execute(() -> { + // utilize the generic query api + var query = correlationIdQuerySpec(correlationId); + try (var stream = queryNegotiations(query)) { + return single(stream.collect(toList())); + } + }); + } + + @Override + public @Nullable ContractAgreement findContractAgreement(String contractId) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + return findContractAgreementInternal(connection, contractId); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + @Override public void delete(String negotiationId) { transactionContext.execute(() -> { @@ -171,46 +211,6 @@ public void delete(String negotiationId) { }); } - @Override - public @NotNull List nextNotLeased(int max, Criterion... criteria) { - return transactionContext.execute(() -> { - var filter = Arrays.stream(criteria).toList(); - var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build(); - var statement = statements.createNegotiationsQuery(querySpec) - .addWhereClause(statements.getNotLeasedFilter(), clock.millis()); - - try ( - var connection = getConnection(); - var stream = queryExecutor.query(getConnection(), true, contractNegotiationWithAgreementMapper(connection), statement.getQueryAsString(), statement.getParameters()) - ) { - var negotiations = stream.collect(toList()); - negotiations.forEach(cn -> leaseContext.withConnection(connection).acquireLease(cn.getId())); - return negotiations; - } catch (SQLException e) { - throw new EdcPersistenceException(e); - } - }); - } - - @Override - public StoreResult findByIdAndLease(String id) { - return transactionContext.execute(() -> { - try (var connection = getConnection()) { - var entity = findInternal(connection, id); - if (entity == null) { - return StoreResult.notFound(format("ContractNegotiation %s not found", id)); - } - - leaseContext.withConnection(connection).acquireLease(id); - return StoreResult.success(entity); - } catch (IllegalStateException e) { - return StoreResult.alreadyLeased(format("ContractNegotiation %s is already leased", id)); - } catch (SQLException e) { - throw new EdcPersistenceException(e); - } - }); - } - @Override public StoreResult findByCorrelationIdAndLease(String correlationId) { return transactionContext.execute(() -> { diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql b/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql index 1e629968a21..ab16436cb42 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/docs/schema.sql @@ -65,3 +65,6 @@ CREATE UNIQUE INDEX IF NOT EXISTS transfer_process_id_uindex CREATE UNIQUE INDEX IF NOT EXISTS lease_lease_id_uindex ON edc_lease (lease_id); + +-- This will help to identify states that need to be transitioned without a table scan when the entries grow +CREATE INDEX IF NOT EXISTS transfer_process_state ON edc_transfer_process (state,state_time_stamp); \ No newline at end of file diff --git a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/controlplane/store/sql/transferprocess/store/SqlTransferProcessStore.java b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/controlplane/store/sql/transferprocess/store/SqlTransferProcessStore.java index a0f1a38d516..118a578ee50 100644 --- a/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/controlplane/store/sql/transferprocess/store/SqlTransferProcessStore.java +++ b/extensions/control-plane/store/sql/transfer-process-store-sql/src/main/java/org/eclipse/edc/connector/controlplane/store/sql/transferprocess/store/SqlTransferProcessStore.java @@ -70,11 +70,22 @@ public SqlTransferProcessStore(DataSourceRegistry dataSourceRegistry, String dat leaseContext = SqlLeaseContextBuilder.with(transactionContext, leaseHolderName, statements, clock, queryExecutor); } + @Override + public @Nullable TransferProcess findById(String id) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + return findByIdInternal(connection, id); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + @Override public @NotNull List nextNotLeased(int max, Criterion... criteria) { return transactionContext.execute(() -> { var filter = Arrays.stream(criteria).collect(toList()); - var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build(); + var querySpec = QuerySpec.Builder.newInstance().filter(filter).sortField("stateTimestamp").limit(max).build(); var statement = statements.createQuery(querySpec) .addWhereClause(statements.getNotLeasedFilter(), clock.millis()); @@ -110,30 +121,6 @@ public StoreResult findByIdAndLease(String id) { }); } - @Override - public StoreResult findByCorrelationIdAndLease(String correlationId) { - return transactionContext.execute(() -> { - var query = correlationIdQuerySpec(correlationId); - - try ( - var connection = getConnection(); - var stream = executeQuery(connection, query) - ) { - var entity = stream.findFirst().orElse(null); - if (entity == null) { - return StoreResult.notFound(format("TransferProcess with correlationId %s not found", correlationId)); - } - - leaseContext.withConnection(connection).acquireLease(entity.getId()); - return StoreResult.success(entity); - } catch (IllegalStateException e) { - return StoreResult.alreadyLeased(format("TransferProcess with correlationId %s is already leased", correlationId)); - } catch (SQLException e) { - throw new EdcPersistenceException(e); - } - }); - } - @Override public void save(TransferProcess entity) { Objects.requireNonNull(entity.getId(), "TransferProcesses must have an ID!"); @@ -152,17 +139,6 @@ public void save(TransferProcess entity) { }); } - @Override - public @Nullable TransferProcess findById(String id) { - return transactionContext.execute(() -> { - try (var connection = getConnection()) { - return findByIdInternal(connection, id); - } catch (SQLException e) { - throw new EdcPersistenceException(e); - } - }); - } - @Override public @Nullable TransferProcess findForCorrelationId(String correlationId) { return transactionContext.execute(() -> { @@ -207,6 +183,30 @@ public Stream findAll(QuerySpec querySpec) { }); } + @Override + public StoreResult findByCorrelationIdAndLease(String correlationId) { + return transactionContext.execute(() -> { + var query = correlationIdQuerySpec(correlationId); + + try ( + var connection = getConnection(); + var stream = executeQuery(connection, query) + ) { + var entity = stream.findFirst().orElse(null); + if (entity == null) { + return StoreResult.notFound(format("TransferProcess with correlationId %s not found", correlationId)); + } + + leaseContext.withConnection(connection).acquireLease(entity.getId()); + return StoreResult.success(entity); + } catch (IllegalStateException e) { + return StoreResult.alreadyLeased(format("TransferProcess with correlationId %s is already leased", correlationId)); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + private QuerySpec correlationIdQuerySpec(String correlationId) { var criterion = criterion("correlationId", "=", correlationId); return QuerySpec.Builder.newInstance().filter(criterion).build(); diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql b/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql index a7adac0d27e..768320ac590 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql +++ b/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql @@ -38,3 +38,6 @@ COMMENT ON COLUMN edc_data_plane.trace_context IS 'Java Map serialized as JSON'; COMMENT ON COLUMN edc_data_plane.source IS 'DataAddress serialized as JSON'; COMMENT ON COLUMN edc_data_plane.destination IS 'DataAddress serialized as JSON'; COMMENT ON COLUMN edc_data_plane.properties IS 'Java Map serialized as JSON'; + +-- This will help to identify states that need to be transitioned without a table scan when the entries grow +CREATE INDEX IF NOT EXISTS data_plane_state ON edc_data_plane (state,state_time_stamp); diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java index ebf1bc2f703..75b802a4b54 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java @@ -81,7 +81,7 @@ public SqlDataPlaneStore(DataSourceRegistry dataSourceRegistry, String dataSourc public @NotNull List nextNotLeased(int max, Criterion... criteria) { return transactionContext.execute(() -> { var filter = Arrays.stream(criteria).collect(toList()); - var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build(); + var querySpec = QuerySpec.Builder.newInstance().filter(filter).sortField("stateTimestamp").limit(max).build(); var statement = statements.createQuery(querySpec) .addWhereClause(statements.getNotLeasedFilter(), clock.millis()); diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/docs/schema.sql b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/docs/schema.sql index 9400e939df9..571e69ea034 100644 --- a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/docs/schema.sql +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/docs/schema.sql @@ -30,3 +30,7 @@ CREATE TABLE IF NOT EXISTS edc_policy_monitor properties JSON, contract_id VARCHAR ); + + +-- This will help to identify states that need to be transitioned without a table scan when the entries grow +CREATE INDEX IF NOT EXISTS policy_monitor_state ON edc_policy_monitor (state,state_time_stamp); \ No newline at end of file diff --git a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStore.java b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStore.java index 19f29a0adc9..3c23d65320a 100644 --- a/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStore.java +++ b/extensions/policy-monitor/store/sql/policy-monitor-store-sql/src/main/java/org/eclipse/edc/connector/policy/monitor/store/sql/SqlPolicyMonitorStore.java @@ -74,7 +74,7 @@ public SqlPolicyMonitorStore(DataSourceRegistry dataSourceRegistry, String dataS public @NotNull List nextNotLeased(int max, Criterion... criteria) { return transactionContext.execute(() -> { var filter = Arrays.stream(criteria).collect(toList()); - var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build(); + var querySpec = QuerySpec.Builder.newInstance().filter(filter).sortField("stateTimestamp").limit(max).build(); var statement = statements.createQuery(querySpec) .addWhereClause(statements.getNotLeasedFilter(), clock.millis()); diff --git a/spi/control-plane/contract-spi/src/testFixtures/java/org/eclipse/edc/connector/controlplane/contract/spi/testfixtures/negotiation/store/ContractNegotiationStoreTestBase.java b/spi/control-plane/contract-spi/src/testFixtures/java/org/eclipse/edc/connector/controlplane/contract/spi/testfixtures/negotiation/store/ContractNegotiationStoreTestBase.java index abd7222fc3f..cb3291596f5 100644 --- a/spi/control-plane/contract-spi/src/testFixtures/java/org/eclipse/edc/connector/controlplane/contract/spi/testfixtures/negotiation/store/ContractNegotiationStoreTestBase.java +++ b/spi/control-plane/contract-spi/src/testFixtures/java/org/eclipse/edc/connector/controlplane/contract/spi/testfixtures/negotiation/store/ContractNegotiationStoreTestBase.java @@ -26,6 +26,7 @@ import org.eclipse.edc.policy.model.Operator; import org.eclipse.edc.policy.model.Permission; import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.query.SortOrder; @@ -63,8 +64,18 @@ public abstract class ContractNegotiationStoreTestBase { protected static final String CONNECTOR_NAME = "test-connector"; - protected final Clock clock = Clock.systemUTC(); private static final String ASSET_ID = "TEST_ASSET_ID"; + protected final Clock clock = Clock.systemUTC(); + + protected abstract ContractNegotiationStore getContractNegotiationStore(); + + protected abstract void leaseEntity(String negotiationId, String owner, Duration duration); + + protected void leaseEntity(String negotiationId, String owner) { + leaseEntity(negotiationId, owner, Duration.ofSeconds(60)); + } + + protected abstract boolean isLeasedBy(String negotiationId, String owner); @Nested class FindById { @@ -800,6 +811,32 @@ void avoidsStarvation() { assertThat(list1).isNotEqualTo(list2).doesNotContainAnyElementsOf(list2); } + + @Test + void shouldLeaseOrderByStateTimestamp() { + + var all = range(0, 10) + .mapToObj(i -> createNegotiation("id-" + i)) + .peek(getContractNegotiationStore()::save) + .toList(); + + all.stream().limit(5) + .peek(this::delayByTenMillis) + .sorted(Comparator.comparing(ContractNegotiation::getStateTimestamp).reversed()) + .forEach(f -> getContractNegotiationStore().save(f)); + + var elements = getContractNegotiationStore().nextNotLeased(10, hasState(REQUESTED.code())); + assertThat(elements).hasSize(10).extracting(ContractNegotiation::getStateTimestamp).isSorted(); + } + + private void delayByTenMillis(StatefulEntity t) { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + // noop + } + t.updateStateTimestamp(); + } } @Nested @@ -869,14 +906,4 @@ void shouldReturnAlreadyLeased_whenEntityIsAlreadyLeased() { } } - protected abstract ContractNegotiationStore getContractNegotiationStore(); - - protected abstract void leaseEntity(String negotiationId, String owner, Duration duration); - - protected void leaseEntity(String negotiationId, String owner) { - leaseEntity(negotiationId, owner, Duration.ofSeconds(60)); - } - - protected abstract boolean isLeasedBy(String negotiationId, String owner); - } diff --git a/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/controlplane/transfer/spi/testfixtures/store/TransferProcessStoreTestBase.java b/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/controlplane/transfer/spi/testfixtures/store/TransferProcessStoreTestBase.java index 3c6089f6f28..25b2f4ff27e 100644 --- a/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/controlplane/transfer/spi/testfixtures/store/TransferProcessStoreTestBase.java +++ b/spi/control-plane/transfer-spi/src/testFixtures/java/org/eclipse/edc/connector/controlplane/transfer/spi/testfixtures/store/TransferProcessStoreTestBase.java @@ -42,6 +42,7 @@ import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.eclipse.edc.connector.controlplane.transfer.spi.testfixtures.store.TestFunctions.createTransferProcess; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.INITIAL; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.PROVISIONING; @@ -129,10 +130,10 @@ void verifyDataPlaneId() { @Test void withSameIdExists_shouldReplace() { - var t = TestFunctions.createTransferProcess("id1", INITIAL); + var t = createTransferProcess("id1", INITIAL); getTransferProcessStore().save(t); - var t2 = TestFunctions.createTransferProcess("id1", PROVISIONING); + var t2 = createTransferProcess("id1", PROVISIONING); getTransferProcessStore().save(t2); assertThat(getTransferProcessStore().findAll(QuerySpec.none())).hasSize(1).containsExactly(t2); @@ -145,7 +146,7 @@ class NextNotLeased { void shouldReturnNotLeasedItems() { var state = STARTED; var all = range(0, 10) - .mapToObj(i -> TestFunctions.createTransferProcess("id" + i, state)) + .mapToObj(i -> createTransferProcess("id" + i, state)) .peek(getTransferProcessStore()::save) .toList(); @@ -160,7 +161,7 @@ void shouldReturnNotLeasedItems() { void shouldOnlyReturnFreeItems() { var state = STARTED; var all = range(0, 10) - .mapToObj(i -> TestFunctions.createTransferProcess("id" + i, state)) + .mapToObj(i -> createTransferProcess("id" + i, state)) .peek(getTransferProcessStore()::save) .collect(Collectors.toList()); @@ -178,7 +179,7 @@ void shouldOnlyReturnFreeItems() { void noFreeItem_shouldReturnEmpty() { var state = STARTED; range(0, 3) - .mapToObj(i -> TestFunctions.createTransferProcess("id" + i, state)) + .mapToObj(i -> createTransferProcess("id" + i, state)) .forEach(getTransferProcessStore()::save); // first time works @@ -190,7 +191,7 @@ void noFreeItem_shouldReturnEmpty() { @Test void noneInDesiredState() { range(0, 3) - .mapToObj(i -> TestFunctions.createTransferProcess("id" + i, STARTED)) + .mapToObj(i -> createTransferProcess("id" + i, STARTED)) .forEach(getTransferProcessStore()::save); var nextNotLeased = getTransferProcessStore().nextNotLeased(10, hasState(TERMINATED.code())); @@ -202,7 +203,7 @@ void noneInDesiredState() { void batchSizeLimits() { var state = STARTED; range(0, 10) - .mapToObj(i -> TestFunctions.createTransferProcess("id" + i, state)) + .mapToObj(i -> createTransferProcess("id" + i, state)) .forEach(getTransferProcessStore()::save); // first time works @@ -214,7 +215,7 @@ void batchSizeLimits() { void verifyTemporalOrdering() { var state = STARTED; range(0, 10) - .mapToObj(i -> TestFunctions.createTransferProcess(String.valueOf(i), state)) + .mapToObj(i -> createTransferProcess(String.valueOf(i), state)) .peek(this::delayByTenMillis) .forEach(getTransferProcessStore()::save); @@ -227,7 +228,7 @@ void verifyTemporalOrdering() { @Test void verifyMostRecentlyUpdatedIsLast() throws InterruptedException { var all = range(0, 10) - .mapToObj(i -> TestFunctions.createTransferProcess("id" + i, STARTED)) + .mapToObj(i -> createTransferProcess("id" + i, STARTED)) .peek(getTransferProcessStore()::save) .toList(); @@ -244,7 +245,7 @@ void verifyMostRecentlyUpdatedIsLast() throws InterruptedException { @Test @DisplayName("Verifies that calling nextNotLeased locks the TP for any subsequent calls") void locksEntity() { - var t = TestFunctions.createTransferProcess("id1", INITIAL); + var t = createTransferProcess("id1", INITIAL); getTransferProcessStore().save(t); getTransferProcessStore().nextNotLeased(100, hasState(INITIAL.code())); @@ -254,7 +255,7 @@ void locksEntity() { @Test void expiredLease() { - var t = TestFunctions.createTransferProcess("id1", INITIAL); + var t = createTransferProcess("id1", INITIAL); getTransferProcessStore().save(t); leaseEntity(t.getId(), CONNECTOR_NAME, Duration.ofMillis(100)); @@ -285,7 +286,7 @@ void shouldLeaseEntityUntilUpdate() { @Test void avoidsStarvation() throws InterruptedException { for (int i = 0; i < 10; i++) { - var process = TestFunctions.createTransferProcess("test-process-" + i); + var process = createTransferProcess("test-process-" + i); getTransferProcessStore().save(process); } @@ -299,6 +300,23 @@ void avoidsStarvation() throws InterruptedException { assertThat(list1).isNotEqualTo(list2).doesNotContainAnyElementsOf(list2); } + @Test + void shouldLeaseOrderByStateTimestamp() { + + var all = range(0, 10) + .mapToObj(i -> createTransferProcess("id-" + i)) + .peek(getTransferProcessStore()::save) + .toList(); + + all.stream().limit(5) + .peek(this::delayByTenMillis) + .sorted(Comparator.comparing(TransferProcess::getStateTimestamp).reversed()) + .forEach(f -> getTransferProcessStore().save(f)); + + var elements = getTransferProcessStore().nextNotLeased(10, hasState(INITIAL.code())); + assertThat(elements).hasSize(10).extracting(TransferProcess::getStateTimestamp).isSorted(); + } + private void delayByTenMillis(TransferProcess t) { try { Thread.sleep(10); @@ -313,7 +331,7 @@ private void delayByTenMillis(TransferProcess t) { class FindById { @Test void shouldFindEntityById() { - var t = TestFunctions.createTransferProcess("id1"); + var t = createTransferProcess("id1"); getTransferProcessStore().save(t); var result = getTransferProcessStore().findById("id1"); @@ -352,7 +370,7 @@ class Update { @Test void shouldUpdate() { - var transferProcess = TestFunctions.createTransferProcess("id1", STARTED); + var transferProcess = createTransferProcess("id1", STARTED); getTransferProcessStore().save(transferProcess); //modify transferProcess.transitionCompleted(); @@ -371,7 +389,7 @@ void shouldUpdate() { @Test @DisplayName("Verify that the lease on a TP is cleared by an update") void shouldBreakLease() { - var t1 = TestFunctions.createTransferProcess("id1"); + var t1 = createTransferProcess("id1"); getTransferProcessStore().save(t1); // acquire lease leaseEntity(t1.getId(), CONNECTOR_NAME); @@ -388,7 +406,7 @@ void shouldBreakLease() { @Test void leasedByOther_shouldThrowException() { var tpId = "id1"; - var t1 = TestFunctions.createTransferProcess(tpId); + var t1 = createTransferProcess(tpId); getTransferProcessStore().save(t1); leaseEntity(tpId, "someone"); @@ -421,7 +439,7 @@ void shouldReplaceDataRequest_whenItGetsTheIdUpdated() { class Delete { @Test void shouldDeleteTheEntityById() { - var t1 = TestFunctions.createTransferProcess("id1"); + var t1 = createTransferProcess("id1"); getTransferProcessStore().save(t1); getTransferProcessStore().delete("id1"); @@ -430,7 +448,7 @@ void shouldDeleteTheEntityById() { @Test void isLeasedBySelf_shouldThrowException() { - var t1 = TestFunctions.createTransferProcess("id1"); + var t1 = createTransferProcess("id1"); getTransferProcessStore().save(t1); leaseEntity(t1.getId(), CONNECTOR_NAME); @@ -440,7 +458,7 @@ void isLeasedBySelf_shouldThrowException() { @Test void isLeasedByOther_shouldThrowException() { - var t1 = TestFunctions.createTransferProcess("id1"); + var t1 = createTransferProcess("id1"); getTransferProcessStore().save(t1); leaseEntity(t1.getId(), "someone-else"); @@ -460,7 +478,7 @@ class FindAll { @Test void noQuerySpec() { var all = range(0, 10) - .mapToObj(i -> TestFunctions.createTransferProcess("id" + i)) + .mapToObj(i -> createTransferProcess("id" + i)) .peek(getTransferProcessStore()::save) .collect(Collectors.toList()); @@ -469,7 +487,7 @@ void noQuerySpec() { @Test void verifyFiltering() { - range(0, 10).forEach(i -> getTransferProcessStore().save(TestFunctions.createTransferProcess("test-neg-" + i))); + range(0, 10).forEach(i -> getTransferProcessStore().save(createTransferProcess("test-neg-" + i))); var querySpec = QuerySpec.Builder.newInstance().filter(Criterion.criterion("id", "=", "test-neg-3")).build(); var result = getTransferProcessStore().findAll(querySpec); @@ -479,7 +497,7 @@ void verifyFiltering() { @Test void shouldThrowException_whenInvalidOperator() { - range(0, 10).forEach(i -> getTransferProcessStore().save(TestFunctions.createTransferProcess("test-neg-" + i))); + range(0, 10).forEach(i -> getTransferProcessStore().save(createTransferProcess("test-neg-" + i))); var querySpec = QuerySpec.Builder.newInstance().filter(Criterion.criterion("id", "foobar", "other")).build(); assertThatThrownBy(() -> getTransferProcessStore().findAll(querySpec).toList()).isInstanceOf(IllegalArgumentException.class); @@ -512,7 +530,7 @@ void queryByTransferType() { @Test void verifySorting() { - range(0, 10).forEach(i -> getTransferProcessStore().save(TestFunctions.createTransferProcess("test-neg-" + i))); + range(0, 10).forEach(i -> getTransferProcessStore().save(createTransferProcess("test-neg-" + i))); assertThat(getTransferProcessStore().findAll(QuerySpec.Builder.newInstance().sortField("id").sortOrder(SortOrder.ASC).build())).hasSize(10).isSortedAccordingTo(Comparator.comparing(TransferProcess::getId)); assertThat(getTransferProcessStore().findAll(QuerySpec.Builder.newInstance().sortField("id").sortOrder(SortOrder.DESC).build())).hasSize(10).isSortedAccordingTo((c1, c2) -> c2.getId().compareTo(c1.getId())); @@ -521,7 +539,7 @@ void verifySorting() { @Test void verifyPaging() { range(0, 10) - .mapToObj(i -> TestFunctions.createTransferProcess(String.valueOf(i))) + .mapToObj(i -> createTransferProcess(String.valueOf(i))) .forEach(getTransferProcessStore()::save); var qs = QuerySpec.Builder.newInstance().limit(5).offset(3).build(); @@ -535,7 +553,7 @@ void verifyPaging() { void verifyPaging_pageSizeLargerThanCollection() { range(0, 10) - .mapToObj(i -> TestFunctions.createTransferProcess(String.valueOf(i))) + .mapToObj(i -> createTransferProcess(String.valueOf(i))) .forEach(getTransferProcessStore()::save); var qs = QuerySpec.Builder.newInstance().limit(20).offset(3).build(); @@ -550,7 +568,7 @@ void verifyPaging_pageSizeLargerThanCollection() { void verifyPaging_pageSizeOutsideCollection() { range(0, 10) - .mapToObj(i -> TestFunctions.createTransferProcess(String.valueOf(i))) + .mapToObj(i -> createTransferProcess(String.valueOf(i))) .forEach(getTransferProcessStore()::save); var qs = QuerySpec.Builder.newInstance().limit(10).offset(12).build(); @@ -567,7 +585,7 @@ void queryByDataAddressProperty() { .contentDataAddress(da) .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() .filter(List.of(new Criterion("contentDataAddress.properties.key", "=", "value"))) @@ -587,7 +605,7 @@ void queryByDataAddress_propNotExist() { .contentDataAddress(da) .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() .filter(List.of(new Criterion("contentDataAddress.properties.notexist", "=", "value"))) @@ -605,7 +623,7 @@ void queryByDataAddress_invalidKey_valueNotExist() { .contentDataAddress(da) .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() .filter(List.of(new Criterion("contentDataAddress.properties.key", "=", "notexist"))) @@ -620,7 +638,7 @@ void queryByCorrelationId() { .correlationId("counterPartyId") .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() .filter(List.of(new Criterion("correlationId", "=", "counterPartyId"))) @@ -637,7 +655,7 @@ void queryByDataRequestProperty_protocol() { .protocol("test-protocol") .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() .filter(List.of(new Criterion("protocol", "like", "test-protocol"))) @@ -653,7 +671,7 @@ void queryByDataRequest_valueNotExist() { var tp = TestFunctions.createTransferProcessBuilder("testprocess1") .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() .filter(List.of(new Criterion("dataRequest.id", "=", "notexist"))) @@ -670,7 +688,7 @@ void queryByResourceManifestProperty() { .resourceManifest(rm) .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() .filter(List.of(new Criterion("resourceManifest.definitions.id", "=", "rd-id"))) @@ -688,7 +706,7 @@ void queryByResourceManifest_valueNotExist() { .resourceManifest(rm) .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); // throws exception when an explicit mapping exists var query = QuerySpec.Builder.newInstance() @@ -711,7 +729,7 @@ void queryByProvisionedResourceSetProperty() { .provisionedResourceSet(prs) .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); var query = QuerySpec.Builder.newInstance() .filter(List.of(new Criterion("provisionedResourceSet.resources.transferProcessId", "=", "testprocess1"))) @@ -735,7 +753,7 @@ void queryByProvisionedResourceSet_valueNotExist() { .provisionedResourceSet(prs) .build(); getTransferProcessStore().save(tp); - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess2")); + getTransferProcessStore().save(createTransferProcess("testprocess2")); // returns empty when the invalid value is embedded in JSON @@ -909,7 +927,7 @@ void queryByDeprovisionedResources_valueNotExist() { @Test void queryByLease() { - getTransferProcessStore().save(TestFunctions.createTransferProcess("testprocess1")); + getTransferProcessStore().save(createTransferProcess("testprocess1")); var query = QuerySpec.Builder.newInstance() .filter(List.of(new Criterion("lease.leasedBy", "=", "foobar"))) @@ -920,7 +938,7 @@ void queryByLease() { @Test void shouldThrowException_whenSortingByNotExistentField() { - range(0, 10).forEach(i -> getTransferProcessStore().save(TestFunctions.createTransferProcess("test-neg-" + i))); + range(0, 10).forEach(i -> getTransferProcessStore().save(createTransferProcess("test-neg-" + i))); var query = QuerySpec.Builder.newInstance().sortField("notexist").sortOrder(SortOrder.DESC).build(); @@ -934,7 +952,7 @@ class FindByIdAndLease { @Test void shouldReturnTheEntityAndLeaseIt() { var id = UUID.randomUUID().toString(); - getTransferProcessStore().save(TestFunctions.createTransferProcess(id)); + getTransferProcessStore().save(createTransferProcess(id)); var result = getTransferProcessStore().findByIdAndLease(id); @@ -952,7 +970,7 @@ void shouldReturnNotFound_whenEntityDoesNotExist() { @Test void shouldReturnAlreadyLeased_whenEntityIsAlreadyLeased() { var id = UUID.randomUUID().toString(); - getTransferProcessStore().save(TestFunctions.createTransferProcess(id)); + getTransferProcessStore().save(createTransferProcess(id)); leaseEntity(id, "other owner"); var result = getTransferProcessStore().findByIdAndLease(id); diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java index dbbe2800730..5d8a4894de8 100644 --- a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java @@ -28,6 +28,7 @@ import java.net.URI; import java.time.Duration; +import java.util.Comparator; import java.util.UUID; import static java.util.stream.IntStream.range; @@ -178,6 +179,23 @@ void shouldReturnReleasedEntityByUpdate() { assertThat(thirdLeased).hasSize(1); } + @Test + void shouldLeaseOrderByStateTimestamp() { + + var all = range(0, 10) + .mapToObj(i -> createDataFlow("id-" + i, RECEIVED)) + .peek(getStore()::save) + .toList(); + + all.stream().limit(5) + .peek(this::delayByTenMillis) + .sorted(Comparator.comparing(DataFlow::getStateTimestamp).reversed()) + .forEach(f -> getStore().save(f)); + + var elements = getStore().nextNotLeased(10, hasState(RECEIVED.code())); + assertThat(elements).hasSize(10).extracting(DataFlow::getStateTimestamp).isSorted(); + } + private void delayByTenMillis(StatefulEntity t) { try { Thread.sleep(10); diff --git a/spi/policy-monitor/policy-monitor-spi/src/testFixtures/java/org/eclipse/edc/connector/policy/monitor/spi/testfixtures/store/PolicyMonitorStoreTestBase.java b/spi/policy-monitor/policy-monitor-spi/src/testFixtures/java/org/eclipse/edc/connector/policy/monitor/spi/testfixtures/store/PolicyMonitorStoreTestBase.java index 2ef33d42ac9..b09ac66c21b 100644 --- a/spi/policy-monitor/policy-monitor-spi/src/testFixtures/java/org/eclipse/edc/connector/policy/monitor/spi/testfixtures/store/PolicyMonitorStoreTestBase.java +++ b/spi/policy-monitor/policy-monitor-spi/src/testFixtures/java/org/eclipse/edc/connector/policy/monitor/spi/testfixtures/store/PolicyMonitorStoreTestBase.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.Comparator; import java.util.UUID; import static java.util.stream.IntStream.range; @@ -172,6 +173,23 @@ void shouldReturnReleasedEntityByUpdate() { assertThat(thirdLeased).hasSize(1); } + @Test + void shouldLeaseOrderByStateTimestamp() { + + var all = range(0, 10) + .mapToObj(i -> createPolicyMonitorEntry("id-" + i, STARTED)) + .peek(getStore()::save) + .toList(); + + all.stream().limit(5) + .peek(this::delayByTenMillis) + .sorted(Comparator.comparing(PolicyMonitorEntry::getStateTimestamp).reversed()) + .forEach(f -> getStore().save(f)); + + var elements = getStore().nextNotLeased(10, hasState(STARTED.code())); + assertThat(elements).hasSize(10).extracting(PolicyMonitorEntry::getStateTimestamp).isSorted(); + } + private void delayByTenMillis(StatefulEntity t) { try { Thread.sleep(10);