Skip to content

Commit

Permalink
Implement DataPlaneInstanceManager
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Jun 3, 2024
1 parent 675c071 commit ae4baf2
Show file tree
Hide file tree
Showing 29 changed files with 608 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.controlplane.asset.spi.event.AssetDeleted;
import org.eclipse.edc.connector.controlplane.asset.spi.event.AssetEvent;
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.spi.event.EventRouter;
Expand All @@ -44,13 +45,14 @@
@ExtendWith(EdcExtension.class)
public class AssetEventDispatchTest {

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.controlplane.contract.spi.event.contractdefinition.ContractDefinitionEvent;
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.services.spi.contractdefinition.ContractDefinitionService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.spi.event.EventRouter;
Expand Down Expand Up @@ -49,6 +50,7 @@ void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.connector.controlplane.services.spi.contractnegotiation.ContractNegotiationProtocolService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.policy.model.Policy;
Expand Down Expand Up @@ -69,7 +70,7 @@
class ContractNegotiationEventDispatchTest {
private static final String CONSUMER = "consumer";

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();
private final IdentityService identityService = mock();
private final ClaimToken token = ClaimToken.Builder.newInstance().claim(ParticipantAgentService.DEFAULT_IDENTITY_CLAIM_KEY, CONSUMER).build();

Expand All @@ -85,9 +86,10 @@ void setUp(EdcExtension extension) {
"edc.negotiation.provider.send.retry.limit", "0"
));
extension.registerServiceMock(NegotiationWaitStrategy.class, () -> 1);
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, identityService);
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.connector.controlplane.policy.spi.event.PolicyDefinitionEvent;
import org.eclipse.edc.connector.controlplane.policy.spi.event.PolicyDefinitionUpdated;
import org.eclipse.edc.connector.controlplane.services.spi.policydefinition.PolicyDefinitionService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.policy.model.Policy;
Expand Down Expand Up @@ -47,9 +48,10 @@ public class PolicyDefinitionEventDispatchTest {

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.edc.connector.controlplane.services.secret;

import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.secret.spi.event.SecretCreated;
import org.eclipse.edc.connector.secret.spi.event.SecretDeleted;
Expand Down Expand Up @@ -41,13 +42,14 @@
@ExtendWith(EdcExtension.class)
public class SecretEventDispatchTest {

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferProcessAck;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.connector.core.event.EventExecutorServiceContainer;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
Expand Down Expand Up @@ -86,7 +87,7 @@
public class TransferProcessEventDispatchTest {

public static final Duration TIMEOUT = Duration.ofSeconds(30);
private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();
private final IdentityService identityService = mock();

@NotNull
Expand All @@ -112,9 +113,10 @@ void setUp(EdcExtension extension) {
extension.registerServiceMock(EventExecutorServiceContainer.class, new EventExecutorServiceContainer(newSingleThreadExecutor()));
extension.registerServiceMock(IdentityService.class, identityService);
extension.registerServiceMock(ProtocolWebhook.class, () -> "http://dummy");
extension.registerServiceMock(PolicyArchive.class, mock(PolicyArchive.class));
extension.registerServiceMock(ContractNegotiationStore.class, mock(ContractNegotiationStore.class));
extension.registerServiceMock(ParticipantAgentService.class, mock(ParticipantAgentService.class));
extension.registerServiceMock(PolicyArchive.class, mock());
extension.registerServiceMock(ContractNegotiationStore.class, mock());
extension.registerServiceMock(ParticipantAgentService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
var dataAddressValidatorRegistry = mock(DataAddressValidatorRegistry.class);
when(dataAddressValidatorRegistry.validateSource(any())).thenReturn(ValidationResult.success());
when(dataAddressValidatorRegistry.validateDestination(any())).thenReturn(ValidationResult.success());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ dependencies {

testImplementation(project(":core:common:junit"))
testImplementation(testFixtures(project(":spi:data-plane-selector:data-plane-selector-spi")))
testImplementation(project(":core:common:junit"))

testImplementation(libs.awaitility)
}


Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,77 @@

package org.eclipse.edc.connector.dataplane.selector;

import org.eclipse.edc.connector.dataplane.selector.manager.DataPlaneInstanceManagerImpl;
import org.eclipse.edc.connector.dataplane.selector.manager.DataPlaneSelectorManagerImpl;
import org.eclipse.edc.connector.dataplane.selector.service.EmbeddedDataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneInstanceManager;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneSelectorManager;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.dataplane.selector.spi.strategy.SelectionStrategyRegistry;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;

@Extension(value = "DataPlane core selector")
import java.time.Duration;

import static org.eclipse.edc.connector.dataplane.selector.DataPlaneSelectorExtension.NAME;
import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_BATCH_SIZE;
import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_ITERATION_WAIT;

@Extension(NAME)
public class DataPlaneSelectorExtension implements ServiceExtension {

public static final String NAME = "Data Plane Selector core";

private static final int DEFAULT_CHECK_PERIOD = 60;

@Setting(value = "the iteration wait time in milliseconds in the data plane selector state machine.", defaultValue = DEFAULT_ITERATION_WAIT + "", type = "long")
private static final String DATA_PLANE_SELECTOR_STATE_MACHINE_ITERATION_WAIT_MILLIS = "edc.data.plane.selector.state-machine.iteration-wait-millis";

@Setting(value = "the batch size in the data plane selector state machine.", defaultValue = DEFAULT_BATCH_SIZE + "", type = "int")
private static final String DATA_PLANE_SELECTOR_STATE_MACHINE_BATCH_SIZE = "edc.data.plane.selector.state-machine.batch-size";

@Setting(value = "the check period for data plane availability, in seconds", defaultValue = DEFAULT_CHECK_PERIOD + "", type = "int")
private static final String DATA_PLANE_SELECTOR_CHECK_PERIOD = "edc.data.plane.selector.state-machine.check.period";

@Inject
private DataPlaneInstanceStore instanceStore;

@Inject
private TransactionContext transactionContext;

@Inject
private SelectionStrategyRegistry selectionStrategyRegistry;
@Inject
private DataPlaneClientFactory clientFactory;

private DataPlaneInstanceManager manager;
private DataPlaneSelectorManager manager;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
manager = DataPlaneInstanceManagerImpl.Builder.newInstance()
var config = context.getConfig();
var iterationWait = config.getLong(DATA_PLANE_SELECTOR_STATE_MACHINE_ITERATION_WAIT_MILLIS, DEFAULT_ITERATION_WAIT);
var checkPeriod = config.getInteger(DATA_PLANE_SELECTOR_CHECK_PERIOD, DEFAULT_CHECK_PERIOD);

var configuration = new DataPlaneSelectorManagerConfiguration(
new ExponentialWaitStrategy(iterationWait),
config.getInteger(DATA_PLANE_SELECTOR_STATE_MACHINE_BATCH_SIZE, DEFAULT_BATCH_SIZE),
Duration.ofSeconds(checkPeriod)
);

manager = DataPlaneSelectorManagerImpl.Builder.newInstance()
.clientFactory(clientFactory)
.store(instanceStore)
.monitor(context.getMonitor())
.configuration(configuration)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.selector;

import org.eclipse.edc.spi.retry.WaitStrategy;

import java.time.Duration;

public record DataPlaneSelectorManagerConfiguration(
WaitStrategy waitStrategy,
int batchSize,
Duration checkPeriod
) {
}

This file was deleted.

Loading

0 comments on commit ae4baf2

Please sign in to comment.