Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add data-plane-instance heartbeat #4230

Merged
merged 3 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/common/connector-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ plugins {
}

dependencies {
api(project(":spi:common:auth-spi"))
api(project(":spi:common:core-spi"))
api(project(":spi:common:http-spi"))
api(project(":spi:common:keys-spi"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dev.failsafe.RetryPolicy;
import okhttp3.EventListener;
import okhttp3.OkHttpClient;
import org.eclipse.edc.api.auth.spi.ControlClientAuthenticationProvider;
import org.eclipse.edc.connector.core.agent.NoOpParticipantIdMapper;
import org.eclipse.edc.connector.core.base.OkHttpClientConfiguration;
import org.eclipse.edc.connector.core.base.OkHttpClientFactory;
Expand All @@ -37,6 +38,7 @@
import org.eclipse.edc.transaction.spi.NoopTransactionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.util.Collections;
import java.util.concurrent.Executors;

/**
Expand Down Expand Up @@ -125,6 +127,11 @@ public EdcHttpClient edcHttpClient(ServiceExtensionContext context) {
);
}

@Provider(isDefault = true)
public ControlClientAuthenticationProvider controlClientAuthenticationProvider() {
return Collections::emptyMap;
}

@Provider
public OkHttpClient okHttpClient(ServiceExtensionContext context) {
var configuration = OkHttpClientConfiguration.Builder.newInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.controlplane.asset.spi.event.AssetDeleted;
import org.eclipse.edc.connector.controlplane.asset.spi.event.AssetEvent;
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.spi.event.EventRouter;
Expand All @@ -44,13 +45,14 @@
@ExtendWith(EdcExtension.class)
public class AssetEventDispatchTest {

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.controlplane.contract.spi.event.contractdefinition.ContractDefinitionEvent;
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.services.spi.contractdefinition.ContractDefinitionService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.spi.event.EventRouter;
Expand Down Expand Up @@ -49,6 +50,7 @@ void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.connector.controlplane.services.spi.contractnegotiation.ContractNegotiationProtocolService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.policy.model.Policy;
Expand Down Expand Up @@ -69,7 +70,7 @@
class ContractNegotiationEventDispatchTest {
private static final String CONSUMER = "consumer";

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();
private final IdentityService identityService = mock();
private final ClaimToken token = ClaimToken.Builder.newInstance().claim(ParticipantAgentService.DEFAULT_IDENTITY_CLAIM_KEY, CONSUMER).build();

Expand All @@ -85,9 +86,10 @@ void setUp(EdcExtension extension) {
"edc.negotiation.provider.send.retry.limit", "0"
));
extension.registerServiceMock(NegotiationWaitStrategy.class, () -> 1);
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, identityService);
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.connector.controlplane.policy.spi.event.PolicyDefinitionEvent;
import org.eclipse.edc.connector.controlplane.policy.spi.event.PolicyDefinitionUpdated;
import org.eclipse.edc.connector.controlplane.services.spi.policydefinition.PolicyDefinitionService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.policy.model.Policy;
Expand Down Expand Up @@ -47,9 +48,10 @@ public class PolicyDefinitionEventDispatchTest {

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.edc.connector.controlplane.services.secret;

import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.secret.spi.event.SecretCreated;
import org.eclipse.edc.connector.secret.spi.event.SecretDeleted;
Expand Down Expand Up @@ -41,13 +42,14 @@
@ExtendWith(EdcExtension.class)
public class SecretEventDispatchTest {

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class));
extension.registerServiceMock(ProtocolWebhook.class, mock());
extension.registerServiceMock(DataPlaneInstanceStore.class, mock());
extension.registerServiceMock(IdentityService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
extension.setConfiguration(Map.of(
"web.http.port", String.valueOf(getFreePort()),
"web.http.path", "/api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferProcessAck;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.connector.core.event.EventExecutorServiceContainer;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.junit.extensions.EdcExtension;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
Expand Down Expand Up @@ -86,7 +87,7 @@
public class TransferProcessEventDispatchTest {

public static final Duration TIMEOUT = Duration.ofSeconds(30);
private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final EventSubscriber eventSubscriber = mock();
private final IdentityService identityService = mock();

@NotNull
Expand All @@ -112,9 +113,10 @@ void setUp(EdcExtension extension) {
extension.registerServiceMock(EventExecutorServiceContainer.class, new EventExecutorServiceContainer(newSingleThreadExecutor()));
extension.registerServiceMock(IdentityService.class, identityService);
extension.registerServiceMock(ProtocolWebhook.class, () -> "http://dummy");
extension.registerServiceMock(PolicyArchive.class, mock(PolicyArchive.class));
extension.registerServiceMock(ContractNegotiationStore.class, mock(ContractNegotiationStore.class));
extension.registerServiceMock(ParticipantAgentService.class, mock(ParticipantAgentService.class));
extension.registerServiceMock(PolicyArchive.class, mock());
extension.registerServiceMock(ContractNegotiationStore.class, mock());
extension.registerServiceMock(ParticipantAgentService.class, mock());
extension.registerServiceMock(DataPlaneClientFactory.class, mock());
var dataAddressValidatorRegistry = mock(DataAddressValidatorRegistry.class);
when(dataAddressValidatorRegistry.validateSource(any())).thenReturn(ValidationResult.success());
when(dataAddressValidatorRegistry.validateDestination(any())).thenReturn(ValidationResult.success());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public String name() {
}

@Provider
public DataFlowManager dataFlowManager() {
return new DataFlowManagerImpl();
public DataFlowManager dataFlowManager(ServiceExtensionContext context) {
return new DataFlowManagerImpl(context.getMonitor());
}

@Provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.StatusResult;
import org.jetbrains.annotations.NotNull;

Expand All @@ -41,6 +42,11 @@
public class DataFlowManagerImpl implements DataFlowManager {

private final List<PrioritizedDataFlowController> controllers = new ArrayList<>();
private final Monitor monitor;

public DataFlowManagerImpl(Monitor monitor) {
this.monitor = monitor;
}

@Override
public void register(DataFlowController controller) {
Expand All @@ -58,7 +64,9 @@ public void register(int priority, DataFlowController controller) {
try {
return chooseControllerAndApply(transferProcess, controller -> controller.start(transferProcess, policy));
} catch (Exception e) {
return StatusResult.failure(FATAL_ERROR, runtimeException(transferProcess.getId(), e.getLocalizedMessage()));
var message = runtimeException(transferProcess.getId(), e.getMessage());
monitor.severe(message, e);
return StatusResult.failure(FATAL_ERROR, message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

class DataFlowManagerImplTest {

private final DataFlowManagerImpl manager = new DataFlowManagerImpl();
private final DataFlowManagerImpl manager = new DataFlowManagerImpl(mock());

@Nested
class Initiate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ plugins {

dependencies {
api(project(":spi:data-plane-selector:data-plane-selector-spi"))
implementation(project(":spi:common:transaction-spi"))
api(project(":spi:common:transaction-spi"))

implementation(project(":core:common:lib:query-lib"))
implementation(project(":core:common:lib:state-machine-lib"))
implementation(project(":core:common:lib:store-lib"))
implementation(project(":core:common:lib:util-lib"))

testImplementation(project(":core:common:junit"))
testImplementation(testFixtures(project(":spi:data-plane-selector:data-plane-selector-spi")))
testImplementation(project(":core:common:junit"))

testImplementation(libs.awaitility)
}


Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import org.eclipse.edc.connector.dataplane.selector.store.InMemoryDataPlaneInstanceStore;
import org.eclipse.edc.connector.dataplane.selector.strategy.DefaultSelectionStrategyRegistry;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.system.ServiceExtension;

import java.time.Clock;

/**
* Provides default service implementations for fallback
* Omitted {@link Extension} since this module contains the extension {@link DataPlaneSelectorExtension}
Expand All @@ -31,14 +35,20 @@ public class DataPlaneSelectorDefaultServicesExtension implements ServiceExtensi

public static final String NAME = "Data Plane Selector Default Services";

@Inject
private Clock clock;

@Inject
private CriterionOperatorRegistry criterionOperatorRegistry;

@Override
public String name() {
return NAME;
}

@Provider(isDefault = true)
public DataPlaneInstanceStore instanceStore() {
return new InMemoryDataPlaneInstanceStore();
return new InMemoryDataPlaneInstanceStore(clock, criterionOperatorRegistry);
}

@Provider(isDefault = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,91 @@

package org.eclipse.edc.connector.dataplane.selector;

import org.eclipse.edc.connector.dataplane.selector.manager.DataPlaneSelectorManagerImpl;
import org.eclipse.edc.connector.dataplane.selector.service.EmbeddedDataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.manager.DataPlaneSelectorManager;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.dataplane.selector.spi.strategy.SelectionStrategyRegistry;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;

@Extension(value = "DataPlane core selector")
import java.time.Duration;

import static org.eclipse.edc.connector.dataplane.selector.DataPlaneSelectorExtension.NAME;
import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_BATCH_SIZE;
import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_ITERATION_WAIT;

@Extension(NAME)
public class DataPlaneSelectorExtension implements ServiceExtension {

public static final String NAME = "Data Plane Selector core";

private static final int DEFAULT_CHECK_PERIOD = 60;

@Setting(value = "the iteration wait time in milliseconds in the data plane selector state machine.", defaultValue = DEFAULT_ITERATION_WAIT + "", type = "long")
private static final String DATA_PLANE_SELECTOR_STATE_MACHINE_ITERATION_WAIT_MILLIS = "edc.data.plane.selector.state-machine.iteration-wait-millis";

@Setting(value = "the batch size in the data plane selector state machine.", defaultValue = DEFAULT_BATCH_SIZE + "", type = "int")
private static final String DATA_PLANE_SELECTOR_STATE_MACHINE_BATCH_SIZE = "edc.data.plane.selector.state-machine.batch-size";

@Setting(value = "the check period for data plane availability, in seconds", defaultValue = DEFAULT_CHECK_PERIOD + "", type = "int")
private static final String DATA_PLANE_SELECTOR_CHECK_PERIOD = "edc.data.plane.selector.state-machine.check.period";

@Inject
private DataPlaneInstanceStore instanceStore;

@Inject
private TransactionContext transactionContext;

@Inject
private SelectionStrategyRegistry selectionStrategyRegistry;
@Inject
private DataPlaneClientFactory clientFactory;

private DataPlaneSelectorManager manager;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
var config = context.getConfig();
var iterationWait = config.getLong(DATA_PLANE_SELECTOR_STATE_MACHINE_ITERATION_WAIT_MILLIS, DEFAULT_ITERATION_WAIT);
var checkPeriod = config.getInteger(DATA_PLANE_SELECTOR_CHECK_PERIOD, DEFAULT_CHECK_PERIOD);

var configuration = new DataPlaneSelectorManagerConfiguration(
new ExponentialWaitStrategy(iterationWait),
config.getInteger(DATA_PLANE_SELECTOR_STATE_MACHINE_BATCH_SIZE, DEFAULT_BATCH_SIZE),
Duration.ofSeconds(checkPeriod)
);

manager = DataPlaneSelectorManagerImpl.Builder.newInstance()
.clientFactory(clientFactory)
.store(instanceStore)
.monitor(context.getMonitor())
.configuration(configuration)
.build();
}

@Override
public void start() {
manager.start();
}

@Override
public void shutdown() {
if (manager != null) {
manager.stop();
}
}

@Provider
public DataPlaneSelectorService dataPlaneSelectorService() {
Expand Down
Loading
Loading