Skip to content

Commit

Permalink
feat: add data-plane-instance heartbeat (#4230)
Browse files Browse the repository at this point in the history
* define e2e tests

* Implement inmemory and sql stateful stores

* Implement DataPlaneInstanceManager
  • Loading branch information
ndr-brt authored Jun 6, 2024
1 parent fc10475 commit 703502f
Show file tree
Hide file tree
Showing 61 changed files with 1,404 additions and 278 deletions.
1 change: 1 addition & 0 deletions core/common/connector-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ plugins {
}

dependencies {
api(project(":spi:common:auth-spi"))
api(project(":spi:common:core-spi"))
api(project(":spi:common:http-spi"))
api(project(":spi:common:keys-spi"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dev.failsafe.RetryPolicy;
import okhttp3.EventListener;
import okhttp3.OkHttpClient;
import org.eclipse.edc.api.auth.spi.ControlClientAuthenticationProvider;
import org.eclipse.edc.connector.core.agent.NoOpParticipantIdMapper;
import org.eclipse.edc.connector.core.base.OkHttpClientConfiguration;
import org.eclipse.edc.connector.core.base.OkHttpClientFactory;
Expand All @@ -37,6 +38,7 @@
import org.eclipse.edc.transaction.spi.NoopTransactionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.util.Collections;
import java.util.concurrent.Executors;

/**
Expand Down Expand Up @@ -125,6 +127,11 @@ public EdcHttpClient edcHttpClient(ServiceExtensionContext context) {
);
}

@Provider(isDefault = true)
public ControlClientAuthenticationProvider controlClientAuthenticationProvider() {
return Collections::emptyMap;
}

@Provider
public OkHttpClient okHttpClient(ServiceExtensionContext context) {
var configuration = OkHttpClientConfiguration.Builder.newInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.controlplane.asset.spi.event.AssetDeleted;
import org.eclipse.edc.connector.controlplane.asset.spi.event.AssetEvent;
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.spi.event.EventRouter;
Expand All @@ -44,13 +45,14 @@
@ExtendWith(EdcExtension.class)
public class AssetEventDispatchTest {

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.controlplane.contract.spi.event.contractdefinition.ContractDefinitionEvent;
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.services.spi.contractdefinition.ContractDefinitionService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.spi.event.EventRouter;
Expand Down Expand Up @@ -49,6 +50,7 @@ void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.connector.controlplane.services.spi.contractnegotiation.ContractNegotiationProtocolService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.policy.model.Policy;
Expand Down Expand Up @@ -69,7 +70,7 @@
class ContractNegotiationEventDispatchTest {
private static final String CONSUMER = "consumer";

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();
private final IdentityService identityService = mock();
private final ClaimToken token = ClaimToken.Builder.newInstance().claim(ParticipantAgentService.DEFAULT_IDENTITY_CLAIM_KEY, CONSUMER).build();

Expand All @@ -85,9 +86,10 @@ void setUp(EdcExtension extension) {
"edc.negotiation.provider.send.retry.limit", "0"
));
extension.registerServiceMock(NegotiationWaitStrategy.class, () -> 1);
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, identityService);
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.connector.controlplane.policy.spi.event.PolicyDefinitionEvent;
import org.eclipse.edc.connector.controlplane.policy.spi.event.PolicyDefinitionUpdated;
import org.eclipse.edc.connector.controlplane.services.spi.policydefinition.PolicyDefinitionService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.policy.model.Policy;
Expand Down Expand Up @@ -47,9 +48,10 @@ public class PolicyDefinitionEventDispatchTest {

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.edc.connector.controlplane.services.secret;

import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.secret.spi.event.SecretCreated;
import org.eclipse.edc.connector.secret.spi.event.SecretDeleted;
Expand Down Expand Up @@ -41,13 +42,14 @@
@ExtendWith(EdcExtension.class)
public class SecretEventDispatchTest {

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferProcessAck;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.connector.core.event.EventExecutorServiceContainer;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
Expand Down Expand Up @@ -86,7 +87,7 @@
public class TransferProcessEventDispatchTest {

public static final Duration TIMEOUT = Duration.ofSeconds(30);
private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();
private final IdentityService identityService = mock();

@NotNull
Expand All @@ -112,9 +113,10 @@ void setUp(EdcExtension extension) {
extension.registerServiceMock(EventExecutorServiceContainer.class, new EventExecutorServiceContainer(newSingleThreadExecutor()));
extension.registerServiceMock(IdentityService.class, identityService);
extension.registerServiceMock(ProtocolWebhook.class, () -> "http://dummy");
extension.registerServiceMock(PolicyArchive.class, mock(PolicyArchive.class));
extension.registerServiceMock(ContractNegotiationStore.class, mock(ContractNegotiationStore.class));
extension.registerServiceMock(ParticipantAgentService.class, mock(ParticipantAgentService.class));
extension.registerServiceMock(PolicyArchive.class, mock());
extension.registerServiceMock(ContractNegotiationStore.class, mock());
extension.registerServiceMock(ParticipantAgentService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
var dataAddressValidatorRegistry = mock(DataAddressValidatorRegistry.class);
when(dataAddressValidatorRegistry.validateSource(any())).thenReturn(ValidationResult.success());
when(dataAddressValidatorRegistry.validateDestination(any())).thenReturn(ValidationResult.success());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public String name() {
}

@Provider
public DataFlowManager dataFlowManager() {
return new DataFlowManagerImpl();
public DataFlowManager dataFlowManager(ServiceExtensionContext context) {
return new DataFlowManagerImpl(context.getMonitor());
}

@Provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.StatusResult;
import org.jetbrains.annotations.NotNull;

Expand All @@ -41,6 +42,11 @@
public class DataFlowManagerImpl implements DataFlowManager {

private final List<PrioritizedDataFlowController> controllers = new ArrayList<>();
private final Monitor monitor;

public DataFlowManagerImpl(Monitor monitor) {
this.monitor = monitor;
}

@Override
public void register(DataFlowController controller) {
Expand All @@ -58,7 +64,9 @@ public void register(int priority, DataFlowController controller) {
try {
return chooseControllerAndApply(transferProcess, controller -> controller.start(transferProcess, policy));
} catch (Exception e) {
return StatusResult.failure(FATAL_ERROR, runtimeException(transferProcess.getId(), e.getLocalizedMessage()));
var message = runtimeException(transferProcess.getId(), e.getMessage());
monitor.severe(message, e);
return StatusResult.failure(FATAL_ERROR, message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

class DataFlowManagerImplTest {

private final DataFlowManagerImpl manager = new DataFlowManagerImpl();
private final DataFlowManagerImpl manager = new DataFlowManagerImpl(mock());

@Nested
class Initiate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ plugins {

dependencies {
api(project(":spi:data-plane-selector:data-plane-selector-spi"))
implementation(project(":spi:common:transaction-spi"))
api(project(":spi:common:transaction-spi"))

implementation(project(":core:common:lib:query-lib"))
implementation(project(":core:common:lib:state-machine-lib"))
implementation(project(":core:common:lib:store-lib"))
implementation(project(":core:common:lib:util-lib"))

testImplementation(project(":core:common:junit"))
testImplementation(testFixtures(project(":spi:data-plane-selector:data-plane-selector-spi")))
testImplementation(project(":core:common:junit"))

testImplementation(libs.awaitility)
}


Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import org.eclipse.edc.connector.dataplane.selector.store.InMemoryDataPlaneInstanceStore;
import org.eclipse.edc.connector.dataplane.selector.strategy.DefaultSelectionStrategyRegistry;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.system.ServiceExtension;

import java.time.Clock;

/**
* Provides default service implementations for fallback
* Omitted {@link Extension} since this module contains the extension {@link DataPlaneSelectorExtension}
Expand All @@ -31,14 +35,20 @@ public class DataPlaneSelectorDefaultServicesExtension implements ServiceExtensi

public static final String NAME = "Data Plane Selector Default Services";

@Inject
private Clock clock;

@Inject
private CriterionOperatorRegistry criterionOperatorRegistry;

@Override
public String name() {
return NAME;
}

@Provider(isDefault = true)
public DataPlaneInstanceStore instanceStore() {
return new InMemoryDataPlaneInstanceStore();
return new InMemoryDataPlaneInstanceStore(clock, criterionOperatorRegistry);
}

@Provider(isDefault = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,91 @@

package org.eclipse.edc.connector.dataplane.selector;

import org.eclipse.edc.connector.dataplane.selector.manager.DataPlaneSelectorManagerImpl;
import org.eclipse.edc.connector.dataplane.selector.service.EmbeddedDataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneSelectorManager;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.dataplane.selector.spi.strategy.SelectionStrategyRegistry;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;

@Extension(value = "DataPlane core selector")
import java.time.Duration;

import static org.eclipse.edc.connector.dataplane.selector.DataPlaneSelectorExtension.NAME;
import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_BATCH_SIZE;
import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_ITERATION_WAIT;

@Extension(NAME)
public class DataPlaneSelectorExtension implements ServiceExtension {

public static final String NAME = "Data Plane Selector core";

private static final int DEFAULT_CHECK_PERIOD = 60;

@Setting(value = "the iteration wait time in milliseconds in the data plane selector state machine.", defaultValue = DEFAULT_ITERATION_WAIT + "", type = "long")
private static final String DATA_PLANE_SELECTOR_STATE_MACHINE_ITERATION_WAIT_MILLIS = "edc.data.plane.selector.state-machine.iteration-wait-millis";

@Setting(value = "the batch size in the data plane selector state machine.", defaultValue = DEFAULT_BATCH_SIZE + "", type = "int")
private static final String DATA_PLANE_SELECTOR_STATE_MACHINE_BATCH_SIZE = "edc.data.plane.selector.state-machine.batch-size";

@Setting(value = "the check period for data plane availability, in seconds", defaultValue = DEFAULT_CHECK_PERIOD + "", type = "int")
private static final String DATA_PLANE_SELECTOR_CHECK_PERIOD = "edc.data.plane.selector.state-machine.check.period";

@Inject
private DataPlaneInstanceStore instanceStore;

@Inject
private TransactionContext transactionContext;

@Inject
private SelectionStrategyRegistry selectionStrategyRegistry;
@Inject
private DataPlaneClientFactory clientFactory;

private DataPlaneSelectorManager manager;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
var config = context.getConfig();
var iterationWait = config.getLong(DATA_PLANE_SELECTOR_STATE_MACHINE_ITERATION_WAIT_MILLIS, DEFAULT_ITERATION_WAIT);
var checkPeriod = config.getInteger(DATA_PLANE_SELECTOR_CHECK_PERIOD, DEFAULT_CHECK_PERIOD);

var configuration = new DataPlaneSelectorManagerConfiguration(
new ExponentialWaitStrategy(iterationWait),
config.getInteger(DATA_PLANE_SELECTOR_STATE_MACHINE_BATCH_SIZE, DEFAULT_BATCH_SIZE),
Duration.ofSeconds(checkPeriod)
);

manager = DataPlaneSelectorManagerImpl.Builder.newInstance()
.clientFactory(clientFactory)
.store(instanceStore)
.monitor(context.getMonitor())
.configuration(configuration)
.build();
}

@Override
public void start() {
manager.start();
}

@Override
public void shutdown() {
if (manager != null) {
manager.stop();
}
}

@Provider
public DataPlaneSelectorService dataPlaneSelectorService() {
Expand Down
Loading

0 comments on commit 703502f

Please sign in to comment.