From 7abbdad7eaa91d8409b763d3c0df38c2a7533039 Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Tue, 2 Jul 2024 16:53:11 +0200 Subject: [PATCH] fix: unbind public endpoint generator from source address type (#4315) * fix: unbind public endpoint generator from source address type * pass transfer type * refactor * remove flow type extractor * Trigger build --- .../ControlPlaneServicesExtension.java | 6 +- .../TransferProcessServiceImpl.java | 15 +++-- .../TransferProcessEventDispatchTest.java | 1 + .../TransferProcessServiceImplTest.java | 29 +++++++-- ...ansferProcessDefaultServicesExtension.java | 8 +-- ...rImpl.java => TransferTypeParserImpl.java} | 24 +++---- ...t.java => TransferTypeParserImplTest.java} | 26 +++++--- .../PublicEndpointGeneratorServiceImpl.java | 18 +++-- .../DataPlaneAuthorizationServiceImpl.java | 36 +++++----- .../manager/DataPlaneManagerImpl.java | 65 ++++++++----------- ...ublicEndpointGeneratorServiceImplTest.java | 39 +++++++---- ...DataPlaneAuthorizationServiceImplTest.java | 12 ++-- .../manager/DataPlaneManagerImplTest.java | 7 +- .../src/main/resources/version.json | 6 +- .../iam/oauth2/daps/DapsIntegrationTest.java | 1 - .../TransferDataPlaneSignalingExtension.java | 6 +- .../DataPlaneSignalingFlowController.java | 18 ++--- .../DataPlaneSignalingFlowControllerTest.java | 26 ++++---- .../client/DataPlaneSignalingClientTest.java | 3 +- ...ctFromDataFlowStartMessageTransformer.java | 4 +- ...jectToDataFlowStartMessageTransformer.java | 22 +++++-- ...omDataFlowStartMessageTransformerTest.java | 29 +++++++-- ...ToDataFlowStartMessageTransformerTest.java | 29 ++++++++- .../sql/data-plane-store-sql/docs/schema.sql | 3 +- .../store/sql/SqlDataPlaneStore.java | 12 +++- .../schema/BaseSqlDataPlaneStatements.java | 2 + .../store/sql/schema/DataPlaneStatements.java | 4 ++ .../domain/transfer/DataFlowStartMessage.java | 36 ++++++++-- .../types/domain/transfer/TransferType.java | 23 +++++++ ...Extractor.java => TransferTypeParser.java} | 18 ++--- .../edc/connector/dataplane/spi/DataFlow.java | 16 ++--- .../iam/PublicEndpointGeneratorService.java | 8 ++- .../store/DataPlaneStoreTestBase.java | 3 +- .../edc/test/e2e/AbstractDataPlaneTest.java | 18 ++--- .../DataPlaneSignalingApiEndToEndTest.java | 35 +++++----- .../e2e/participant/DataPlaneParticipant.java | 1 - .../test/e2e/TransferEndToEndParticipant.java | 7 +- 37 files changed, 382 insertions(+), 234 deletions(-) rename core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/{FlowTypeExtractorImpl.java => TransferTypeParserImpl.java} (52%) rename core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/flow/{FlowTypeExtractorImplTest.java => TransferTypeParserImplTest.java} (58%) create mode 100644 spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/TransferType.java rename spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/flow/{FlowTypeExtractor.java => TransferTypeParser.java} (53%) diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/ControlPlaneServicesExtension.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/ControlPlaneServicesExtension.java index 64abdcbd4c6..c08459dc31e 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/ControlPlaneServicesExtension.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/ControlPlaneServicesExtension.java @@ -59,7 +59,7 @@ import org.eclipse.edc.connector.controlplane.services.transferprocess.TransferProcessServiceImpl; import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.secret.spi.observe.SecretObservableImpl; @@ -171,7 +171,7 @@ public class ControlPlaneServicesExtension implements ServiceExtension { private DataFlowManager dataFlowManager; @Inject - private FlowTypeExtractor flowTypeExtractor; + private TransferTypeParser transferTypeParser; @Override public String name() { @@ -237,7 +237,7 @@ public PolicyDefinitionService policyDefinitionService() { @Provider public TransferProcessService transferProcessService() { return new TransferProcessServiceImpl(transferProcessStore, transferProcessManager, transactionContext, - dataAddressValidator, commandHandlerRegistry, flowTypeExtractor, contractNegotiationStore); + dataAddressValidator, commandHandlerRegistry, transferTypeParser, contractNegotiationStore); } @Provider diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessServiceImpl.java index 98da1504214..e346f9a33aa 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessServiceImpl.java @@ -19,7 +19,7 @@ import org.eclipse.edc.connector.controlplane.services.query.QueryValidator; import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService; import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.controlplane.transfer.spi.types.DeprovisionedResource; import org.eclipse.edc.connector.controlplane.transfer.spi.types.ProvisionResponse; @@ -61,19 +61,19 @@ public class TransferProcessServiceImpl implements TransferProcessService { private final QueryValidator queryValidator; private final DataAddressValidatorRegistry dataAddressValidator; private final CommandHandlerRegistry commandHandlerRegistry; - private final FlowTypeExtractor flowTypeExtractor; + private final TransferTypeParser transferTypeParser; private final ContractNegotiationStore contractNegotiationStore; public TransferProcessServiceImpl(TransferProcessStore transferProcessStore, TransferProcessManager manager, TransactionContext transactionContext, DataAddressValidatorRegistry dataAddressValidator, - CommandHandlerRegistry commandHandlerRegistry, FlowTypeExtractor flowTypeExtractor, + CommandHandlerRegistry commandHandlerRegistry, TransferTypeParser transferTypeParser, ContractNegotiationStore contractNegotiationStore) { this.transferProcessStore = transferProcessStore; this.manager = manager; this.transactionContext = transactionContext; this.dataAddressValidator = dataAddressValidator; this.commandHandlerRegistry = commandHandlerRegistry; - this.flowTypeExtractor = flowTypeExtractor; + this.transferTypeParser = transferTypeParser; this.contractNegotiationStore = contractNegotiationStore; queryValidator = new QueryValidator(TransferProcess.class, getSubtypes()); } @@ -127,12 +127,17 @@ public ServiceResult> search(QuerySpec query) { @Override public @NotNull ServiceResult initiateTransfer(TransferRequest request) { + var transferTypeParse = transferTypeParser.parse(request.getTransferType()); + if (transferTypeParse.failed()) { + return ServiceResult.badRequest("Property transferType not valid: " + transferTypeParse.getFailureDetail()); + } + var agreement = contractNegotiationStore.findContractAgreement(request.getContractId()); if (agreement == null) { return ServiceResult.badRequest("Contract agreement with id %s not found".formatted(request.getContractId())); } - var flowType = flowTypeExtractor.extract(request.getTransferType()).getContent(); + var flowType = transferTypeParse.getContent().flowType(); if (flowType == FlowType.PUSH) { if (request.getDataDestination() == null) { diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java index cb2ad5db752..b71625cd4dd 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java @@ -254,6 +254,7 @@ private TransferRequest createTransferRequest() { .protocol("test") .counterPartyAddress("http://an/address") .contractId("contractId") + .transferType("DestinationType-PUSH") .build(); } diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessServiceImplTest.java index eecba81be73..7936a99dcde 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessServiceImplTest.java @@ -19,7 +19,7 @@ import org.eclipse.edc.connector.controlplane.contract.spi.types.agreement.ContractAgreement; import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService; import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; @@ -34,9 +34,11 @@ import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceFailure; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.eclipse.edc.transaction.spi.NoopTransactionContext; import org.eclipse.edc.transaction.spi.TransactionContext; import org.eclipse.edc.validator.spi.DataAddressValidatorRegistry; @@ -79,11 +81,11 @@ class TransferProcessServiceImplTest { private final TransactionContext transactionContext = spy(new NoopTransactionContext()); private final DataAddressValidatorRegistry dataAddressValidator = mock(); private final CommandHandlerRegistry commandHandlerRegistry = mock(); - private final FlowTypeExtractor flowTypeExtractor = mock(); + private final TransferTypeParser transferTypeParser = mock(); private final ContractNegotiationStore contractNegotiationStore = mock(); private final TransferProcessService service = new TransferProcessServiceImpl(store, manager, transactionContext, - dataAddressValidator, commandHandlerRegistry, flowTypeExtractor, contractNegotiationStore); + dataAddressValidator, commandHandlerRegistry, transferTypeParser, contractNegotiationStore); @Test void findById_whenFound() { @@ -150,7 +152,7 @@ void shouldInitiateTransfer() { var transferProcess = transferProcess(); when(contractNegotiationStore.findContractAgreement(transferRequest.getContractId())) .thenReturn(createContractAgreement(transferProcess.getContractId(), "assetId")); - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("DestinationType", FlowType.PUSH))); when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.success()); when(manager.initiateConsumerRequest(transferRequest)).thenReturn(StatusResult.success(transferProcess)); @@ -160,9 +162,22 @@ void shouldInitiateTransfer() { verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class)); } + @Test + void shouldFail_whenTransferTypeIsNotValid() { + when(transferTypeParser.parse(any())).thenReturn(Result.failure("cannot parse")); + + var result = service.initiateTransfer(transferRequest()); + + assertThat(result).isFailed() + .extracting(ServiceFailure::getReason) + .isEqualTo(BAD_REQUEST); + assertThat(result.getFailureDetail()).contains("cannot parse"); + verifyNoInteractions(manager); + } + @Test void shouldFail_whenContractAgreementNotFound() { - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("DestinationType", FlowType.PUSH))); when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.failure(violation("invalid data address", "path"))); var result = service.initiateTransfer(transferRequest()); @@ -179,7 +194,7 @@ void shouldFail_whenDestinationIsNotValid() { var transferRequest = transferRequest(); when(contractNegotiationStore.findContractAgreement(transferRequest.getContractId())) .thenReturn(createContractAgreement(transferRequest.getContractId(), "assetId")); - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("DestinationType", FlowType.PUSH))); when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.failure(violation("invalid data address", "path"))); var result = service.initiateTransfer(transferRequest); @@ -198,7 +213,7 @@ void shouldFail_whenDataDestinationNotPassedAndFlowTypeIsPush() { .build(); when(contractNegotiationStore.findContractAgreement(transferRequest.getContractId())) .thenReturn(createContractAgreement(transferRequest.getContractId(), "assetId")); - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("DestinationType", FlowType.PUSH))); var result = service.initiateTransfer(transferRequest); diff --git a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/TransferProcessDefaultServicesExtension.java b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/TransferProcessDefaultServicesExtension.java index 0de4c26df49..78ace47c641 100644 --- a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/TransferProcessDefaultServicesExtension.java +++ b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/TransferProcessDefaultServicesExtension.java @@ -15,13 +15,13 @@ package org.eclipse.edc.connector.controlplane.transfer; import org.eclipse.edc.connector.controlplane.transfer.flow.DataFlowManagerImpl; -import org.eclipse.edc.connector.controlplane.transfer.flow.FlowTypeExtractorImpl; +import org.eclipse.edc.connector.controlplane.transfer.flow.TransferTypeParserImpl; import org.eclipse.edc.connector.controlplane.transfer.observe.TransferProcessObservableImpl; import org.eclipse.edc.connector.controlplane.transfer.provision.ProvisionManagerImpl; import org.eclipse.edc.connector.controlplane.transfer.provision.ResourceManifestGeneratorImpl; import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessPendingGuard; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.connector.controlplane.transfer.spi.provision.ProvisionManager; import org.eclipse.edc.connector.controlplane.transfer.spi.provision.ResourceManifestGenerator; @@ -71,8 +71,8 @@ public TransferProcessPendingGuard pendingGuard() { } @Provider - public FlowTypeExtractor flowTypeExtractor() { - return new FlowTypeExtractorImpl(); + public TransferTypeParser transferTypeParser() { + return new TransferTypeParserImpl(); } } diff --git a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/FlowTypeExtractorImpl.java b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImpl.java similarity index 52% rename from core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/FlowTypeExtractorImpl.java rename to core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImpl.java index d30e721ec4f..b1561c9f9ea 100644 --- a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/FlowTypeExtractorImpl.java +++ b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImpl.java @@ -14,28 +14,30 @@ package org.eclipse.edc.connector.controlplane.transfer.flow; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; -import org.eclipse.edc.spi.response.ResponseStatus; -import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; +import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import java.util.Optional; -public class FlowTypeExtractorImpl implements FlowTypeExtractor { +public class TransferTypeParserImpl implements TransferTypeParser { + @Override - public StatusResult extract(String transferType) { - return Optional.ofNullable(transferType) + public Result parse(String transferType) { + Optional> parsed = Optional.ofNullable(transferType) .map(type -> type.split("-")) .filter(tokens -> tokens.length == 2) - .map(tokens -> parseFlowType(tokens[1])) - .orElse(StatusResult.failure(ResponseStatus.FATAL_ERROR, "Failed to extract flow type from transferType %s".formatted(transferType))); + .map(tokens -> parseFlowType(tokens[1]).map(flowType -> new TransferType(tokens[0], flowType))); + + return parsed.orElse(Result.failure("Failed to extract flow type from transferType %s".formatted(transferType))); } - private StatusResult parseFlowType(String flowType) { + private Result parseFlowType(String flowType) { try { - return StatusResult.success(FlowType.valueOf(flowType)); + return Result.success(FlowType.valueOf(flowType)); } catch (Exception e) { - return StatusResult.failure(ResponseStatus.FATAL_ERROR, "Unknown flow type %s".formatted(flowType)); + return Result.failure("Unknown flow type %s".formatted(flowType)); } } } diff --git a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/flow/FlowTypeExtractorImplTest.java b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImplTest.java similarity index 58% rename from core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/flow/FlowTypeExtractorImplTest.java rename to core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImplTest.java index 0de12d9cb84..9727f941b32 100644 --- a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/flow/FlowTypeExtractorImplTest.java +++ b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/flow/TransferTypeParserImplTest.java @@ -14,42 +14,50 @@ package org.eclipse.edc.connector.controlplane.transfer.flow; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL; import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH; -class FlowTypeExtractorImplTest { +class TransferTypeParserImplTest { - private final FlowTypeExtractor extractor = new FlowTypeExtractorImpl(); + private final TransferTypeParser parser = new TransferTypeParserImpl(); @Test void shouldExtractPull() { - var result = extractor.extract("Any-PULL"); + var result = parser.parse("DestinationType-PULL"); - assertThat(result).isSucceeded().isEqualTo(PULL); + assertThat(result).isSucceeded().satisfies(type -> { + assertThat(type.destinationType()).isEqualTo("DestinationType"); + assertThat(type.flowType()).isEqualTo(PULL); + }); } @Test void shouldExtractPush() { - var result = extractor.extract("Any-PUSH"); + var result = parser.parse("DestinationType-PUSH"); - assertThat(result).isSucceeded().isEqualTo(PUSH); + assertThat(result).isSucceeded().satisfies(type -> { + assertThat(type.destinationType()).isEqualTo("DestinationType"); + assertThat(type.flowType()).isEqualTo(PUSH); + }); } @Test void shouldReturnFatalError_whenTypeIsUnknown() { - var result = extractor.extract("Any-NOT_KNOWN"); + var result = parser.parse("Any-NOT_KNOWN"); assertThat(result).isFailed(); } @Test void shouldReturnFatalError_whenFormatIsNotCorrect() { - var result = extractor.extract("not_correct"); + var result = parser.parse("not_correct"); assertThat(result).isFailed(); } + } diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImpl.java index 8bfe756ccd8..bbd90f28c30 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImpl.java @@ -20,8 +20,6 @@ import org.eclipse.edc.spi.types.domain.DataAddress; import java.util.Map; -import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -30,14 +28,14 @@ class PublicEndpointGeneratorServiceImpl implements PublicEndpointGeneratorServi private final Map> generatorFunctions = new ConcurrentHashMap<>(); @Override - public Result generateFor(DataAddress sourceDataAddress) { - Objects.requireNonNull(sourceDataAddress); - Objects.requireNonNull(sourceDataAddress.getType()); - - return Optional.ofNullable(generatorFunctions.get(sourceDataAddress.getType())) - .map(function -> function.apply(sourceDataAddress)) - .map(Result::success) - .orElseGet(() -> Result.failure("No Endpoint generator function registered for source data type '%s'".formatted(sourceDataAddress.getType()))); + public Result generateFor(String destinationType, DataAddress sourceDataAddress) { + var function = generatorFunctions.get(destinationType); + if (function == null) { + return Result.failure("No Endpoint generator function registered for transfer type destination '%s'".formatted(destinationType)); + } + + var endpoint = function.apply(sourceDataAddress); + return Result.success(endpoint); } @Override diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java index ea3b246413e..87870f7d9e6 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java @@ -27,10 +27,10 @@ import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import java.time.Clock; +import java.util.HashMap; import java.util.Map; import java.util.UUID; -import static java.util.stream.Collectors.toMap; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; import static org.eclipse.edc.spi.result.Result.success; @@ -60,27 +60,27 @@ public DataPlaneAuthorizationServiceImpl(DataPlaneAccessTokenService accessToken @Override public Result createEndpointDataReference(DataFlowStartMessage message) { - var endpoint = endpointGenerator.generateFor(message.getSourceDataAddress()); - - var additionalProperties = message.getProperties().entrySet().stream().collect(toMap(Map.Entry::getKey, entry -> (Object) entry.getValue())); - - additionalProperties.put(PROPERTY_AGREEMENT_ID, message.getAgreementId()); - additionalProperties.put(PROPERTY_ASSET_ID, message.getAssetId()); - additionalProperties.put(PROPERTY_PROCESS_ID, message.getProcessId()); - additionalProperties.put(PROPERTY_FLOW_TYPE, message.getFlowType().toString()); - additionalProperties.put(PROPERTY_PARTICIPANT_ID, message.getParticipantId()); - - return endpoint.compose(e -> accessTokenService.obtainToken(createTokenParams(message), message.getSourceDataAddress(), additionalProperties)) - .compose(tokenRepresentation -> createDataAddress(tokenRepresentation, endpoint.getContent())); + return endpointGenerator.generateFor(message.getTransferType().destinationType(), message.getSourceDataAddress()) + .compose(endpoint -> { + var additionalProperties = new HashMap(message.getProperties()); + additionalProperties.put(PROPERTY_AGREEMENT_ID, message.getAgreementId()); + additionalProperties.put(PROPERTY_ASSET_ID, message.getAssetId()); + additionalProperties.put(PROPERTY_PROCESS_ID, message.getProcessId()); + additionalProperties.put(PROPERTY_FLOW_TYPE, message.getFlowType().toString()); + additionalProperties.put(PROPERTY_PARTICIPANT_ID, message.getParticipantId()); + + return accessTokenService.obtainToken(createTokenParams(message), message.getSourceDataAddress(), additionalProperties) + .compose(tokenRepresentation -> createDataAddress(tokenRepresentation, endpoint)); + } + ); } @Override public Result authorize(String token, Map requestData) { - var accessTokenDataResult = accessTokenService.resolve(token); - - return accessTokenDataResult - .compose(atd -> accessControlService.checkAccess(atd.claimToken(), atd.dataAddress(), requestData, atd.additionalProperties())) - .map(u -> accessTokenDataResult.getContent().dataAddress()); + return accessTokenService.resolve(token) + .compose(atd -> accessControlService.checkAccess(atd.claimToken(), atd.dataAddress(), requestData, atd.additionalProperties()) + .map(u -> atd.dataAddress()) + ); } @Override diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index df74944bba0..02449341fbf 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -26,7 +26,6 @@ import org.eclipse.edc.spi.query.Criterion; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; -import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; @@ -78,24 +77,21 @@ public Result validate(DataFlowStartMessage dataRequest) { @Override public Result start(DataFlowStartMessage startMessage) { - var dataFlowBuilder = dataFlowRequestBuilder(startMessage); + var dataFlowBuilder = DataFlow.Builder.newInstance() + .id(startMessage.getProcessId()) + .source(startMessage.getSourceDataAddress()) + .destination(startMessage.getDestinationDataAddress()) + .callbackAddress(startMessage.getCallbackAddress()) + .traceContext(telemetry.getCurrentTraceContext()) + .properties(startMessage.getProperties()) + .transferType(startMessage.getTransferType()); - var result = switch (startMessage.getFlowType()) { - case PULL -> handleStartPull(startMessage, dataFlowBuilder); - case PUSH -> handleStartPush(dataFlowBuilder); + var response = switch (startMessage.getFlowType()) { + case PULL -> handlePull(startMessage, dataFlowBuilder); + case PUSH -> handlePush(dataFlowBuilder); }; - if (result.failed()) { - return result.mapTo(); - } - - var response = DataFlowResponseMessage.Builder.newInstance() - .dataAddress(result.getContent().orElse(null)) - .build(); - - update(dataFlowBuilder.build()); - - return Result.success(response); + return response.onSuccess(m -> update(dataFlowBuilder.build())); } @Override @@ -144,7 +140,7 @@ private StatusResult stop(String dataFlowId, String reason) { var dataFlow = result.getContent(); - if (FlowType.PUSH.equals(dataFlow.getFlowType())) { + if (FlowType.PUSH.equals(dataFlow.getTransferType().flowType())) { var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest()); if (transferService == null) { @@ -169,31 +165,21 @@ private StatusResult stop(String dataFlowId, String reason) { return StatusResult.success(dataFlow); } - private Result> handleStartPush(DataFlow.Builder dataFlowBuilder) { - dataFlowBuilder.state(RECEIVED.code()); - return Result.success(Optional.empty()); + private Result handlePull(DataFlowStartMessage startMessage, DataFlow.Builder dataFlowBuilder) { + return authorizationService.createEndpointDataReference(startMessage) + .onSuccess(dataAddress -> dataFlowBuilder.state(STARTED.code())) + .onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail()))) + .map(dataAddress -> DataFlowResponseMessage.Builder.newInstance() + .dataAddress(dataAddress) + .build()); } - private Result> handleStartPull(DataFlowStartMessage startMessage, DataFlow.Builder dataFlowBuilder) { - var dataAddressResult = authorizationService.createEndpointDataReference(startMessage) - .onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail()))); - - if (dataAddressResult.failed()) { - return dataAddressResult.mapTo(); - } - dataFlowBuilder.state(STARTED.code()); - return Result.success(Optional.of(dataAddressResult.getContent())); - } + private Result handlePush(DataFlow.Builder dataFlowBuilder) { + dataFlowBuilder.state(RECEIVED.code()); - private DataFlow.Builder dataFlowRequestBuilder(DataFlowStartMessage startMessage) { - return DataFlow.Builder.newInstance() - .id(startMessage.getProcessId()) - .source(startMessage.getSourceDataAddress()) - .destination(startMessage.getDestinationDataAddress()) - .callbackAddress(startMessage.getCallbackAddress()) - .traceContext(telemetry.getCurrentTraceContext()) - .properties(startMessage.getProperties()) - .flowType(startMessage.getFlowType()); + return Result.success(DataFlowResponseMessage.Builder.newInstance() + .dataAddress(null) + .build()); } private boolean processReceived(DataFlow dataFlow) { @@ -281,6 +267,7 @@ public Builder self() { return this; } + @Override public DataPlaneManagerImpl build() { Objects.requireNonNull(manager.transferProcessClient); return manager; diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImplTest.java index 28894c9da9e..d7b722aebec 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/PublicEndpointGeneratorServiceImplTest.java @@ -15,7 +15,9 @@ package org.eclipse.edc.connector.dataplane.framework; import org.eclipse.edc.connector.dataplane.spi.Endpoint; +import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -24,23 +26,32 @@ class PublicEndpointGeneratorServiceImplTest { - private final PublicEndpointGeneratorServiceImpl generatorService = new PublicEndpointGeneratorServiceImpl(); + private final PublicEndpointGeneratorService generatorService = new PublicEndpointGeneratorServiceImpl(); - @Test - void generateFor() { - var endpoint = new Endpoint("fizz", "bar-type"); - generatorService.addGeneratorFunction("testtype", dataAddress -> endpoint); + @Nested + class GenerateFor { - assertThat(generatorService.generateFor(DataAddress.Builder.newInstance().type("testtype").build())).isSucceeded() - .isEqualTo(endpoint); - } + @Test + void shouldGenerateEndpointBasedOnDestinationType() { + var endpoint = new Endpoint("fizz", "bar-type"); + + generatorService.addGeneratorFunction("destinationType", dataAddress -> endpoint); + var sourceAddress = DataAddress.Builder.newInstance().type("testtype").build(); + + var result = generatorService.generateFor("destinationType", sourceAddress); + + assertThat(result).isSucceeded().isEqualTo(endpoint); + } + + @Test + void shouldFail_whenFunctionIsNotRegistered() { + var sourceAddress = DataAddress.Builder.newInstance().type("testtype").build(); + + var result = generatorService.generateFor("any", sourceAddress); + + assertThat(result).isFailed(); + } - @Test - void generateFor_noFunction() { - assertThat(generatorService.generateFor(DataAddress.Builder.newInstance().type("testtype").build())) - .isFailed() - .detail() - .isEqualTo("No Endpoint generator function registered for source data type 'testtype'"); } @Test diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java index 64403ebf60b..09b57ace811 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java @@ -25,6 +25,7 @@ import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; @@ -57,16 +58,17 @@ class DataPlaneAuthorizationServiceImplTest { private final DataPlaneAccessControlService accessControlService = mock(); private final DataPlaneAuthorizationServiceImpl authorizationService = new DataPlaneAuthorizationServiceImpl(accessTokenService, endpointGenerator, accessControlService, OWN_PARTICIPANT_ID, Clock.systemUTC()); - @BeforeEach void setup() { - when(endpointGenerator.generateFor(any())).thenReturn(Result.success(Endpoint.url("http://example.com"))); + when(endpointGenerator.generateFor(any(), any())).thenReturn(Result.success(Endpoint.url("http://example.com"))); } @Test void createEndpointDataReference() { when(accessTokenService.obtainToken(any(), any(), anyMap())).thenReturn(Result.success(TokenRepresentation.Builder.newInstance().token("footoken").build())); - var startMsg = createStartMessage().build(); + var startMsg = createStartMessage() + .transferType(new TransferType("DestinationType", FlowType.PULL)) + .build(); var result = authorizationService.createEndpointDataReference(startMsg); assertThat(result).isSucceeded() @@ -90,9 +92,9 @@ void createEndpointDataReference() { m.containsKey("asset_id") && m.containsKey("process_id") && m.containsKey("flow_type"))); + verify(endpointGenerator).generateFor("DestinationType", startMsg.getSourceDataAddress()); } - @Test void createEndpointDataReference_withAuthType() { when(accessTokenService.obtainToken(any(), any(), anyMap())).thenReturn(Result.success(TokenRepresentation.Builder.newInstance() @@ -185,7 +187,7 @@ void revoke_error() { private DataFlowStartMessage.Builder createStartMessage() { return DataFlowStartMessage.Builder.newInstance() .processId("test-processid") - .flowType(FlowType.PULL) + .transferType(new TransferType("DestinationType", FlowType.PULL)) .agreementId("test-agreementid") .participantId("test-participantid") .assetId("test-assetid") diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index 389f8c38edf..a55f1ff321e 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -30,6 +30,7 @@ import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -190,7 +191,7 @@ void terminate_shouldTerminateDataFlow() { @Test void terminate_shouldTerminatePullDataFlow() { - var dataFlow = dataFlowBuilder().state(RECEIVED.code()).id("dataFlowId").flowType(FlowType.PULL).build(); + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).id("dataFlowId").transferType(new TransferType("DestinationType", FlowType.PULL)).build(); when(store.findByIdAndLease(dataFlow.getId())).thenReturn(StoreResult.success(dataFlow)); when(authorizationService.revokeEndpointDataReference(dataFlow.getId(), null)).thenReturn(Result.success()); @@ -203,7 +204,7 @@ void terminate_shouldTerminatePullDataFlow() { @Test void terminate_shouldFailToTerminatePullDataFlow_whenRevocationFails() { - var dataFlow = dataFlowBuilder().state(RECEIVED.code()).id("dataFlowId").flowType(FlowType.PULL).build(); + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).id("dataFlowId").transferType(new TransferType("DestinationType", FlowType.PULL)).build(); when(store.findByIdAndLease(dataFlow.getId())).thenReturn(StoreResult.success(dataFlow)); when(authorizationService.revokeEndpointDataReference(dataFlow.getId(), null)).thenReturn(Result.failure("failure")); @@ -476,7 +477,7 @@ private DataFlow.Builder dataFlowBuilder() { .source(DataAddress.Builder.newInstance().type("source").build()) .destination(DataAddress.Builder.newInstance().type("destination").build()) .callbackAddress(URI.create("http://any")) - .flowType(FlowType.PUSH) + .transferType(new TransferType("DestinationType", FlowType.PUSH)) .properties(Map.of("key", "value")); } diff --git a/extensions/common/api/control-api-configuration/src/main/resources/version.json b/extensions/common/api/control-api-configuration/src/main/resources/version.json index 8de76426c2e..be91548a308 100644 --- a/extensions/common/api/control-api-configuration/src/main/resources/version.json +++ b/extensions/common/api/control-api-configuration/src/main/resources/version.json @@ -1,5 +1,5 @@ { - "version": "1.0.0", + "version": "1.0.1", "urlPath": "/v1", - "lastUpdated": "2024-05-21T09:12:44Z" -} \ No newline at end of file + "lastUpdated": "2024-06-02T14:30:00Z" +} diff --git a/extensions/common/iam/oauth2/oauth2-daps/src/test/java/org/eclipse/edc/iam/oauth2/daps/DapsIntegrationTest.java b/extensions/common/iam/oauth2/oauth2-daps/src/test/java/org/eclipse/edc/iam/oauth2/daps/DapsIntegrationTest.java index 67a8fba5215..12f4a29c6ee 100644 --- a/extensions/common/iam/oauth2/oauth2-daps/src/test/java/org/eclipse/edc/iam/oauth2/daps/DapsIntegrationTest.java +++ b/extensions/common/iam/oauth2/oauth2-daps/src/test/java/org/eclipse/edc/iam/oauth2/daps/DapsIntegrationTest.java @@ -74,7 +74,6 @@ void retrieveTokenAndValidate(IdentityService identityService) { @BeforeEach protected void before(EdcExtension extension) { - System.setProperty("edc.vault", "src/test/resources/empty-vault.properties"); System.setProperty("edc.keystore", "src/test/resources/keystore.p12"); System.setProperty("edc.keystore.password", CLIENT_KEYSTORE_PASSWORD); diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtension.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtension.java index 438fff94255..d9384a86eaf 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtension.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/TransferDataPlaneSignalingExtension.java @@ -17,7 +17,7 @@ import org.eclipse.edc.connector.controlplane.transfer.dataplane.flow.DataPlaneSignalingFlowController; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; import org.eclipse.edc.runtime.metamodel.annotation.Extension; @@ -58,13 +58,13 @@ public class TransferDataPlaneSignalingExtension implements ServiceExtension { private DataFlowPropertiesProvider propertiesProvider; @Inject - private FlowTypeExtractor flowTypeExtractor; + private TransferTypeParser transferTypeParser; @Override public void initialize(ServiceExtensionContext context) { var selectionStrategy = context.getSetting(DPF_SELECTOR_STRATEGY, DEFAULT_DATAPLANE_SELECTOR_STRATEGY); var controller = new DataPlaneSignalingFlowController(callbackUrl, selectorService, getPropertiesProvider(), - clientFactory, selectionStrategy, flowTypeExtractor); + clientFactory, selectionStrategy, transferTypeParser); dataFlowManager.register(controller); } diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowController.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowController.java index babd06f6977..8ff89a979d2 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowController.java @@ -17,7 +17,7 @@ import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; @@ -54,29 +54,29 @@ public class DataPlaneSignalingFlowController implements DataFlowController { private final DataPlaneClientFactory clientFactory; private final DataFlowPropertiesProvider propertiesProvider; private final String selectionStrategy; - private final FlowTypeExtractor flowTypeExtractor; + private final TransferTypeParser transferTypeParser; public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, DataFlowPropertiesProvider propertiesProvider, DataPlaneClientFactory clientFactory, - String selectionStrategy, FlowTypeExtractor flowTypeExtractor) { + String selectionStrategy, TransferTypeParser transferTypeParser) { this.callbackUrl = callbackUrl; this.selectorClient = selectorClient; this.propertiesProvider = propertiesProvider; this.clientFactory = clientFactory; this.selectionStrategy = selectionStrategy; - this.flowTypeExtractor = flowTypeExtractor; + this.transferTypeParser = transferTypeParser; } @Override public boolean canHandle(TransferProcess transferProcess) { - return flowTypeExtractor.extract(transferProcess.getTransferType()).succeeded(); + return transferTypeParser.parse(transferProcess.getTransferType()).succeeded(); } @Override public @NotNull StatusResult start(TransferProcess transferProcess, Policy policy) { - var flowType = flowTypeExtractor.extract(transferProcess.getTransferType()); - if (flowType.failed()) { - return StatusResult.failure(FATAL_ERROR, flowType.getFailureDetail()); + var transferTypeParse = transferTypeParser.parse(transferProcess.getTransferType()); + if (transferTypeParse.failed()) { + return StatusResult.failure(FATAL_ERROR, transferTypeParse.getFailureDetail()); } var propertiesResult = propertiesProvider.propertiesFor(transferProcess, policy); @@ -97,7 +97,7 @@ public boolean canHandle(TransferProcess transferProcess) { .participantId(policy.getAssignee()) .agreementId(transferProcess.getContractId()) .assetId(transferProcess.getAssetId()) - .flowType(flowType.getContent()) + .transferType(transferTypeParse.getContent()) .callbackAddress(callbackUrl != null ? callbackUrl.get() : null) .properties(propertiesResult.getContent()) .build(); diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java index d61d840e56a..f135fa1a05f 100644 --- a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/controlplane/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java @@ -16,7 +16,7 @@ import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowPropertiesProvider; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; @@ -26,11 +26,13 @@ import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceResult; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -57,17 +59,17 @@ public class DataPlaneSignalingFlowControllerTest { private final DataPlaneClientFactory dataPlaneClientFactory = mock(); private final DataPlaneSelectorService selectorService = mock(); private final DataFlowPropertiesProvider propertiesProvider = mock(); - private final FlowTypeExtractor flowTypeExtractor = mock(); + private final TransferTypeParser transferTypeParser = mock(); private final DataPlaneSignalingFlowController flowController = new DataPlaneSignalingFlowController( () -> URI.create("http://localhost"), selectorService, propertiesProvider, dataPlaneClientFactory, - "random", flowTypeExtractor); + "random", transferTypeParser); @Nested class CanHandle { @Test void shouldReturnTrue_whenFlowTypeIsValid() { - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PUSH))); var transferProcess = transferProcess("Custom", "Valid-PUSH"); var result = flowController.canHandle(transferProcess); @@ -77,7 +79,7 @@ void shouldReturnTrue_whenFlowTypeIsValid() { @Test void shouldReturnFalse_whenFlowTypeIsNotValid() { - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR)); + when(transferTypeParser.parse(any())).thenReturn(Result.failure("cannot parse")); var transferProcess = transferProcess("Custom", "Invalid-ANY"); var result = flowController.canHandle(transferProcess); @@ -90,7 +92,7 @@ void shouldReturnFalse_whenFlowTypeIsNotValid() { class InitiateFlow { @Test void transferSuccess() { - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); var source = testDataAddress(); var policy = Policy.Builder.newInstance().assignee("participantId").build(); var transferProcess = transferProcessBuilder() @@ -124,7 +126,7 @@ void transferSuccess() { @Test void transferSuccess_withReturnedDataAddress() { - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); var policy = Policy.Builder.newInstance().assignee("participantId").build(); var transferProcess = transferProcessBuilder() .transferType(HTTP_DATA_PULL) @@ -150,7 +152,7 @@ void transferSuccess_withReturnedDataAddress() { @Test void shouldFail_whenNoDataplaneSelected() { - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) .transferType(HTTP_DATA_PULL) @@ -166,7 +168,7 @@ void shouldFail_whenNoDataplaneSelected() { @Test void invalidTransferType() { - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, "error")); + when(transferTypeParser.parse(any())).thenReturn(Result.failure("cannot parse")); var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) .transferType("invalid") @@ -174,12 +176,12 @@ void invalidTransferType() { var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); - assertThat(result).isFailed().messages().containsOnly("error"); + assertThat(result).isFailed().detail().contains("cannot parse"); } @Test void returnFailedResult_whenPropertiesResolveFails() { - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); var errorMsg = "error"; var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) @@ -195,7 +197,7 @@ void returnFailedResult_whenPropertiesResolveFails() { @Test void returnFailedResultIfTransferFails() { - when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PULL)); + when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("Valid", FlowType.PULL))); var errorMsg = "error"; var transferProcess = transferProcessBuilder() .contentDataAddress(testDataAddress()) diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java index 94b79e06bf0..70136e57f41 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java @@ -38,6 +38,7 @@ import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.eclipse.edc.transform.TypeTransformerRegistryImpl; import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import org.eclipse.edc.transform.transformer.dspace.from.JsonObjectFromDataAddressDspaceTransformer; @@ -263,7 +264,7 @@ private DataFlowStartMessage createDataFlowRequest() { return DataFlowStartMessage.Builder.newInstance() .id("123") .processId("456") - .flowType(FlowType.PULL) + .transferType(new TransferType("DestinationType", FlowType.PULL)) .assetId("assetId") .agreementId("agreementId") .participantId("participantId") diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformer.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformer.java index f8e26cc22ee..464b4b50518 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformer.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformer.java @@ -33,6 +33,7 @@ import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID; import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PROPERTIES; import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE_DESTINATION; import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TYPE; /** @@ -54,7 +55,8 @@ public JsonObjectFromDataFlowStartMessageTransformer(JsonBuilderFactory jsonFact transformProperties(message.getProperties(), propertiesBuilder, mapper, context); var builder = jsonFactory.createObjectBuilder() .add(TYPE, EDC_DATA_FLOW_START_MESSAGE_TYPE) - .add(EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE, message.getFlowType().toString()) + .add(EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE_DESTINATION, message.getTransferType().destinationType()) + .add(EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE, message.getTransferType().flowType().toString()) .add(EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID, message.getAgreementId()) .add(DC_DATA_FLOW_START_MESSAGE_PROCESS_ID, message.getProcessId()) .add(EDC_DATA_FLOW_START_MESSAGE_DATASET_ID, message.getAssetId()) diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformer.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformer.java index f982a8d2548..7c0df0786ab 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformer.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformer.java @@ -20,6 +20,7 @@ import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.eclipse.edc.transform.spi.TransformerContext; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -37,6 +38,7 @@ import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID; import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PROPERTIES; import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE_DESTINATION; /** * Converts from a {@link JsonObject} in JSON-LD expanded form to a {@link DataFlowStartMessage}. @@ -50,7 +52,22 @@ public JsonObjectToDataFlowStartMessageTransformer() { @Override public @Nullable DataFlowStartMessage transform(@NotNull JsonObject object, @NotNull TransformerContext context) { var builder = Builder.newInstance(); + + var transferTypeDestination = object.get(EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE_DESTINATION); + var flowType = object.get(EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE); + if (transferTypeDestination != null && flowType != null) { + builder.transferType(new TransferType( + transformString(transferTypeDestination, context), + FlowType.valueOf(transformString(flowType, context)) + )); + } else { + context.problem().missingProperty().property("%s - %s".formatted(EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE_DESTINATION, EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE)) + .report(); + return null; + } + visitProperties(object, (s, jsonValue) -> transformProperties(s, jsonValue, builder, context)); + return builder.build(); } @@ -67,15 +84,10 @@ private void transformProperties(String key, JsonValue jsonValue, Builder builde } case EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID -> builder.participantId(transformString(jsonValue, context)); - case EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE -> - builder.flowType(FlowType.valueOf(transformString(jsonValue, context))); - case EDC_DATA_FLOW_START_MESSAGE_DESTINATION_DATA_ADDRESS -> builder.destinationDataAddress(transformObject(jsonValue, DataAddress.class, context)); - case EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS -> builder.sourceDataAddress(transformObject(jsonValue, DataAddress.class, context)); - default -> builder.property(key, transformString(jsonValue, context)); } } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformerTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformerTest.java index cb5ac002a2c..cf8a2d02d2d 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformerTest.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformerTest.java @@ -18,6 +18,7 @@ import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.eclipse.edc.transform.spi.TransformerContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,6 +37,7 @@ import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE; import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID; import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE_DESTINATION; import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TYPE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; @@ -44,7 +46,7 @@ class JsonObjectFromDataFlowStartMessageTransformerTest { - private final TransformerContext context = mock(TransformerContext.class); + private final TransformerContext context = mock(); private JsonObjectFromDataFlowStartMessageTransformer transformer; @BeforeEach @@ -55,13 +57,12 @@ void setUp() { @Test void transform() { - var message = DataFlowStartMessage.Builder.newInstance() .processId("processId") .assetId("assetId") .agreementId("agreementId") .participantId("participantId") - .flowType(FlowType.PUSH) + .transferType(new TransferType("HttpData", FlowType.PUSH)) .callbackAddress(URI.create("http://localhost")) .sourceDataAddress(DataAddress.Builder.newInstance().type("sourceType").build()) .destinationDataAddress(DataAddress.Builder.newInstance().type("destType").build()) @@ -70,17 +71,37 @@ void transform() { var jsonObject = transformer.transform(message, context); assertThat(jsonObject).isNotNull(); - assertThat(jsonObject.getJsonString(TYPE).getString()).isEqualTo(EDC_DATA_FLOW_START_MESSAGE_TYPE); assertThat(jsonObject.getJsonString(DC_DATA_FLOW_START_MESSAGE_PROCESS_ID).getString()).isEqualTo(message.getProcessId()); assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_DATASET_ID).getString()).isEqualTo(message.getAssetId()); assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID).getString()).isEqualTo(message.getAgreementId()); assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID).getString()).isEqualTo(message.getParticipantId()); assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_DESTINATION_CALLBACK_ADDRESS).getString()).isEqualTo(message.getCallbackAddress().toString()); + assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE_DESTINATION).getString()).isEqualTo("HttpData"); assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE).getString()).isEqualTo(message.getFlowType().toString()); assertThat(jsonObject.get(EDC_DATA_FLOW_START_MESSAGE_DESTINATION_DATA_ADDRESS)).isNotNull(); assertThat(jsonObject.get(EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS)).isNotNull(); + } + + @Test + void transform_whenTransferTypeIsNull() { + var message = DataFlowStartMessage.Builder.newInstance() + .processId("any") + .sourceDataAddress(DataAddress.Builder.newInstance().type("any").build()) + .agreementId("agreementId") + .participantId("participantId") + .assetId("assetId") + .callbackAddress(URI.create("http://localhost")) + .transferType(null) + .transferTypeDestination("DestinationType") + .flowType(FlowType.PULL) + .build(); + var jsonObject = transformer.transform(message, context); + + assertThat(jsonObject).isNotNull(); + assertThat(jsonObject.getString(EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE_DESTINATION)).isEqualTo("DestinationType"); + assertThat(jsonObject.getString(EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE)).isEqualTo("PULL"); } } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformerTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformerTest.java index 6daa9598b6a..b2d191589df 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformerTest.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformerTest.java @@ -20,6 +20,8 @@ import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; +import org.eclipse.edc.transform.spi.ProblemBuilder; import org.eclipse.edc.transform.spi.TransformerContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -37,6 +39,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class JsonObjectToDataFlowStartMessageTransformerTest { @@ -49,11 +52,11 @@ class JsonObjectToDataFlowStartMessageTransformerTest { void setUp() { transformer = new JsonObjectToDataFlowStartMessageTransformer(); when(context.transform(any(), eq(DataAddress.class))).thenReturn(DataAddress.Builder.newInstance().type("address-type").build()); + when(context.problem()).thenReturn(new ProblemBuilder(context)); } @Test void transform() { - var jsonObj = jsonFactory.createObjectBuilder() .add(CONTEXT, createContextBuilder().build()) .add(TYPE, DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TYPE) @@ -61,6 +64,7 @@ void transform() { .add("agreementId", "agreementId") .add("datasetId", "datasetId") .add("participantId", "participantId") + .add("transferTypeDestination", "transferTypeDestination") .add("flowType", "PULL") .add("sourceDataAddress", jsonFactory.createObjectBuilder().add("type", "address-type")) .add("destinationDataAddress", jsonFactory.createObjectBuilder().add("type", "address-type")) @@ -76,14 +80,33 @@ void transform() { assertThat(message.getAssetId()).isEqualTo("datasetId"); assertThat(message.getAgreementId()).isEqualTo("agreementId"); assertThat(message.getParticipantId()).isEqualTo("participantId"); - assertThat(message.getFlowType()).isEqualTo(FlowType.PULL); + assertThat(message.getTransferType()).isEqualTo(new TransferType("transferTypeDestination", FlowType.PULL)); assertThat(message.getDestinationDataAddress()).extracting(DataAddress::getType).isEqualTo("address-type"); assertThat(message.getSourceDataAddress()).extracting(DataAddress::getType).isEqualTo("address-type"); assertThat(message.getProperties()).containsEntry(EDC_NAMESPACE + "foo", "bar"); assertThat(message.getCallbackAddress()).isEqualTo(URI.create("http://localhost")); - } + @Test + void shouldFail_whenTransferTypeDataIsMissing() { + var jsonObj = jsonFactory.createObjectBuilder() + .add(CONTEXT, createContextBuilder().build()) + .add(TYPE, DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TYPE) + .add("processId", "processId") + .add("agreementId", "agreementId") + .add("datasetId", "datasetId") + .add("participantId", "participantId") + .add("sourceDataAddress", jsonFactory.createObjectBuilder().add("type", "address-type")) + .add("destinationDataAddress", jsonFactory.createObjectBuilder().add("type", "address-type")) + .add("properties", jsonFactory.createObjectBuilder().add("foo", "bar")) + .add("callbackAddress", "http://localhost") + .build(); + + var message = transformer.transform(getExpanded(jsonObj), context); + + assertThat(message).isNull(); + verify(context).reportProblem(any()); + } private JsonObjectBuilder createContextBuilder() { return jsonFactory.createObjectBuilder() diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql b/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql index 768320ac590..35ad86047d8 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql +++ b/extensions/data-plane/store/sql/data-plane-store-sql/docs/schema.sql @@ -31,7 +31,8 @@ CREATE TABLE IF NOT EXISTS edc_data_plane source JSON, destination JSON, properties JSON, - flow_type VARCHAR + flow_type VARCHAR, + transfer_type_destination VARCHAR ); COMMENT ON COLUMN edc_data_plane.trace_context IS 'Java Map serialized as JSON'; diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java index 75b802a4b54..1989571ce12 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlDataPlaneStore.java @@ -24,6 +24,7 @@ import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.eclipse.edc.sql.QueryExecutor; import org.eclipse.edc.sql.lease.SqlLeaseContextBuilder; import org.eclipse.edc.sql.store.AbstractSqlStore; @@ -149,7 +150,8 @@ private void insert(Connection connection, DataFlow dataFlow) { toJson(dataFlow.getSource()), toJson(dataFlow.getDestination()), toJson(dataFlow.getProperties()), - dataFlow.getFlowType().toString() + dataFlow.getTransferType().flowType().toString(), + dataFlow.getTransferType().destinationType() ); } @@ -166,7 +168,8 @@ private void update(Connection connection, DataFlow dataFlow) { toJson(dataFlow.getSource()), toJson(dataFlow.getDestination()), toJson(dataFlow.getProperties()), - dataFlow.getFlowType().toString(), + dataFlow.getTransferType().flowType().toString(), + dataFlow.getTransferType().destinationType(), dataFlow.getId()); } @@ -184,7 +187,10 @@ private DataFlow mapDataFlow(ResultSet resultSet) throws SQLException { .source(fromJson(resultSet.getString(statements.getSourceColumn()), DataAddress.class)) .destination(fromJson(resultSet.getString(statements.getDestinationColumn()), DataAddress.class)) .properties(fromJson(resultSet.getString(statements.getPropertiesColumn()), getTypeRef())) - .flowType(FlowType.valueOf(resultSet.getString(statements.getFlowTypeColumn()))) + .transferType(new TransferType( + resultSet.getString(statements.getTransferTypeDestinationColumn()), + FlowType.valueOf(resultSet.getString(statements.getFlowTypeColumn())) + )) .build(); } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java index 9c310727265..2dc0d94e662 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlDataPlaneStatements.java @@ -45,6 +45,7 @@ public String getInsertTemplate() { .jsonColumn(getDestinationColumn()) .jsonColumn(getPropertiesColumn()) .column(getFlowTypeColumn()) + .column(getTransferTypeDestinationColumn()) .insertInto(getDataPlaneTable()); } @@ -62,6 +63,7 @@ public String getUpdateTemplate() { .jsonColumn(getDestinationColumn()) .jsonColumn(getPropertiesColumn()) .column(getFlowTypeColumn()) + .column(getTransferTypeDestinationColumn()) .update(getDataPlaneTable(), getIdColumn()); } diff --git a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java index 51850bd3217..d9f849a9947 100644 --- a/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java +++ b/extensions/data-plane/store/sql/data-plane-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/DataPlaneStatements.java @@ -52,6 +52,10 @@ default String getFlowTypeColumn() { return "flow_type"; } + default String getTransferTypeDestinationColumn() { + return "transfer_type_destination"; + } + String getInsertTemplate(); String getUpdateTemplate(); diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java index 0ef9409b849..8d0d7a0a7db 100644 --- a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java @@ -37,11 +37,11 @@ @JsonDeserialize(builder = DataFlowStartMessage.Builder.class) public class DataFlowStartMessage implements Polymorphic, TraceCarrier { - public static final String DC_DATA_FLOW_START_MESSAGE_ID = EDC_NAMESPACE + "id"; public static final String DC_DATA_FLOW_START_MESSAGE_PROCESS_ID = EDC_NAMESPACE + "processId"; public static final String EDC_DATA_FLOW_START_MESSAGE_SIMPLE_TYPE = "DataFlowStartMessage"; public static final String EDC_DATA_FLOW_START_MESSAGE_TYPE = EDC_NAMESPACE + EDC_DATA_FLOW_START_MESSAGE_SIMPLE_TYPE; public static final String EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE = EDC_NAMESPACE + "flowType"; + public static final String EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE_DESTINATION = EDC_NAMESPACE + "transferTypeDestination"; public static final String EDC_DATA_FLOW_START_MESSAGE_DATASET_ID = EDC_NAMESPACE + "datasetId"; public static final String EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID = EDC_NAMESPACE + "participantId"; public static final String EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID = EDC_NAMESPACE + "agreementId"; @@ -59,11 +59,11 @@ public class DataFlowStartMessage implements Polymorphic, TraceCarrier { private DataAddress sourceDataAddress; private DataAddress destinationDataAddress; - private FlowType flowType = FlowType.PUSH; private URI callbackAddress; private Map properties = new HashMap<>(); private Map traceContext = Map.of(); // TODO: should this stay in the DataFlow class? + private TransferType transferType; private DataFlowStartMessage() { } @@ -100,7 +100,7 @@ public DataAddress getDestinationDataAddress() { * The {@link FlowType} for the request */ public FlowType getFlowType() { - return flowType; + return getTransferType().flowType(); } /** @@ -117,6 +117,14 @@ public String getAssetId() { return assetId; } + /** + * The transfer type associated to the request. + * + * @return the transfer type. + */ + public TransferType getTransferType() { + return transferType; + } /** * The participant id associated to the request @@ -157,6 +165,8 @@ public Builder toBuilder() { @JsonPOJOBuilder(withPrefix = "") public static class Builder { private final DataFlowStartMessage request; + private String transferTypeDestination; + private FlowType flowType; private Builder() { this(new DataFlowStartMessage()); @@ -200,8 +210,18 @@ public Builder destinationDataAddress(DataAddress destination) { return this; } + public Builder transferType(TransferType transferType) { + request.transferType = transferType; + return this; + } + + public Builder transferTypeDestination(String transferTypeDestination) { + this.transferTypeDestination = transferTypeDestination; + return this; + } + public Builder flowType(FlowType flowType) { - request.flowType = flowType; + this.flowType = flowType; return this; } @@ -244,10 +264,14 @@ public DataFlowStartMessage build() { if (request.id == null) { request.id = UUID.randomUUID().toString(); } + if (request.traceContext == null) { + request.traceContext = new HashMap<>(); + } Objects.requireNonNull(request.processId, "processId"); Objects.requireNonNull(request.sourceDataAddress, "sourceDataAddress"); - Objects.requireNonNull(request.traceContext, "traceContext"); - Objects.requireNonNull(request.flowType, "flowType"); + if (request.transferType == null) { + request.transferType = new TransferType(this.transferTypeDestination, this.flowType); + } return request; } diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/TransferType.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/TransferType.java new file mode 100644 index 00000000000..bd214763ea2 --- /dev/null +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/TransferType.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.spi.types.domain.transfer; + +/** + * Represent the transfer type. + * + * @param destinationType the destination data address type. + * @param flowType the flow type. + */ +public record TransferType(String destinationType, FlowType flowType) { } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/flow/FlowTypeExtractor.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/flow/TransferTypeParser.java similarity index 53% rename from spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/flow/FlowTypeExtractor.java rename to spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/flow/TransferTypeParser.java index 50f0fdfd94e..45a154043fb 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/flow/FlowTypeExtractor.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/flow/TransferTypeParser.java @@ -14,20 +14,16 @@ package org.eclipse.edc.connector.controlplane.transfer.spi.flow; -import org.eclipse.edc.spi.response.StatusResult; -import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; -/** - * Extract the {@link FlowType} from the transfer type - */ -@FunctionalInterface -public interface FlowTypeExtractor { +public interface TransferTypeParser { /** - * Return the {@link FlowType} associated to the transfer type. + * Parse the {@link TransferType}. * - * @param transferType the transfer type. - * @return the {@link FlowType}, failure if the operation failed. + * @param transferType the transfer type string representation. + * @return the {@link TransferType}, failure if the operation failed. */ - StatusResult extract(String transferType); + Result parse(String transferType); } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java index 0cc84537c9c..bb93049f4de 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java @@ -19,7 +19,7 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; -import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.jetbrains.annotations.Nullable; import java.net.URI; @@ -48,7 +48,7 @@ public class DataFlow extends StatefulEntity { private URI callbackAddress; private Map properties = new HashMap<>(); - private FlowType flowType = FlowType.PUSH; + private TransferType transferType; @Override public DataFlow copy() { @@ -57,7 +57,7 @@ public DataFlow copy() { .destination(destination) .callbackAddress(callbackAddress) .properties(properties) - .flowType(flowType); + .transferType(getTransferType()); return copy(builder); } @@ -83,8 +83,8 @@ public Map getProperties() { return Collections.unmodifiableMap(properties); } - public FlowType getFlowType() { - return flowType; + public TransferType getTransferType() { + return transferType; } public DataFlowStartMessage toRequest() { @@ -96,7 +96,7 @@ public DataFlowStartMessage toRequest() { .callbackAddress(getCallbackAddress()) .traceContext(traceContext) .properties(getProperties()) - .flowType(getFlowType()) + .transferType(getTransferType()) .build(); } @@ -175,8 +175,8 @@ public Builder callbackAddress(URI callbackAddress) { return this; } - public Builder flowType(FlowType flowType) { - entity.flowType = flowType; + public Builder transferType(TransferType transferType) { + entity.transferType = transferType; return this; } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/PublicEndpointGeneratorService.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/PublicEndpointGeneratorService.java index 34ea55889b8..d9bc4bf6c95 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/PublicEndpointGeneratorService.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/PublicEndpointGeneratorService.java @@ -26,13 +26,15 @@ * For example, for HTTP transfers this would likely return the internet-facing HTTP URL of the data plane ("public url"). */ public interface PublicEndpointGeneratorService { + /** - * Generates an endpoint for a particular resource (={@link DataAddress}). + * Generates an endpoint for a destination type and a particular {@link DataAddress} * - * @param sourceDataAddress The (private) resource identified by an internal {@link DataAddress}. + * @param destinationType the destination type + * @param sourceDataAddress the source data address. * @return The public {@link Endpoint} where the data is made available, or a failure if the endpoint could not be generated. */ - Result generateFor(DataAddress sourceDataAddress); + Result generateFor(String destinationType, DataAddress sourceDataAddress); /** * Adds a function that can generate a {@link Endpoint} for particular source data address. Typically, the source data address diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java index 5d8a4894de8..a7e0006b51d 100644 --- a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/testfixtures/store/DataPlaneStoreTestBase.java @@ -23,6 +23,7 @@ import org.eclipse.edc.spi.result.StoreFailure; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -70,8 +71,8 @@ private DataFlow createDataFlow(String id, DataFlowStates state) { .callbackAddress(URI.create("http://any")) .source(DataAddress.Builder.newInstance().type("src-type").build()) .destination(DataAddress.Builder.newInstance().type("dest-type").build()) - .flowType(FlowType.PUSH) .state(state.code()) + .transferType(new TransferType("transferType", FlowType.PUSH)) .build(); } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java index 1e0a12b9767..40542180bc5 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java @@ -14,24 +14,28 @@ package org.eclipse.edc.test.e2e; -import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.eclipse.edc.junit.extensions.EmbeddedRuntime; +import org.eclipse.edc.junit.extensions.RuntimeExtension; +import org.eclipse.edc.junit.extensions.RuntimePerClassExtension; import org.eclipse.edc.junit.testfixtures.TestUtils; import org.eclipse.edc.spi.security.Vault; import org.eclipse.edc.test.e2e.participant.DataPlaneParticipant; import org.junit.jupiter.api.extension.RegisterExtension; public abstract class AbstractDataPlaneTest { + protected static final DataPlaneParticipant DATAPLANE = DataPlaneParticipant.Builder.newInstance() .name("provider") .id("urn:connector:provider") .build(); + @RegisterExtension - protected static EdcRuntimeExtension runtime = - new EdcRuntimeExtension( - ":system-tests:e2e-dataplane-tests:runtimes:data-plane", + protected static RuntimeExtension runtime = + new RuntimePerClassExtension(new EmbeddedRuntime( "data-plane", - DATAPLANE.dataPlaneConfiguration() - ); + DATAPLANE.dataPlaneConfiguration(), + ":system-tests:e2e-dataplane-tests:runtimes:data-plane" + )); protected void seedVault() { var vault = runtime.getService(Vault.class); @@ -41,7 +45,5 @@ protected void seedVault() { var publicKey = TestUtils.getResourceFileContentAsString("certs/cert.pem"); vault.storeSecret("public-key", publicKey); - - vault.storeSecret("provision-oauth-secret", "supersecret"); } } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java index b1dbc6fe9d3..59300bbf0ad 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java @@ -33,11 +33,11 @@ import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.spi.types.domain.transfer.TransferType; import org.eclipse.edc.transform.TypeTransformerRegistryImpl; import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import org.eclipse.edc.transform.transformer.dspace.from.JsonObjectFromDataAddressDspaceTransformer; import org.eclipse.edc.transform.transformer.dspace.to.JsonObjectToDataAddressDspaceTransformer; -import org.hamcrest.Matchers; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -45,6 +45,7 @@ import java.net.URI; import java.util.Map; +import java.util.UUID; import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; @@ -78,8 +79,16 @@ void startTransfer() throws JsonProcessingException { seedVault(); var jsonLd = runtime.getService(JsonLd.class); - var processId = "test-processId"; - var flowMessage = createStartMessage(processId); + var processId = UUID.randomUUID().toString(); + var flowMessage = DataFlowStartMessage.Builder.newInstance() + .processId(processId) + .sourceDataAddress(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://foo.bar/").build()) + .transferType(new TransferType("HttpData", FlowType.PULL)) + .participantId("some-participantId") + .assetId("test-asset") + .callbackAddress(URI.create("https://foo.bar/callback")) + .agreementId("test-agreement") + .build(); var startMessage = registry.transform(flowMessage, JsonObject.class).orElseThrow(failTest()); var resultJson = DATAPLANE.getDataPlaneControlEndpoint() @@ -88,16 +97,15 @@ void startTransfer() throws JsonProcessingException { .body(startMessage) .post("/v1/dataflows") .then() - .body(Matchers.notNullValue()) + .log().ifValidationFails() .statusCode(200) - .log().ifError() + .body(notNullValue()) .extract().body().asString(); var dataFlowResponseMessage = jsonLd.expand(mapper.readValue(resultJson, JsonObject.class)) .compose(json -> registry.transform(json, DataFlowResponseMessage.class)) .orElseThrow(failTest()); - var dataAddress = dataFlowResponseMessage.getDataAddress(); // verify basic shape of the DSPACE data address (=EDR token) @@ -146,6 +154,7 @@ void terminate() { .id(dataFlowId) .source(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://foo.bar/").build()) .destination(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://fizz.buzz").build()) + .transferType(new TransferType("HttpData", FlowType.PUSH)) .traceContext(Map.of()) .state(DataFlowStates.STARTED.code()) .build(); @@ -173,20 +182,6 @@ void terminate() { } - - private DataFlowStartMessage createStartMessage(String processId) { - return DataFlowStartMessage.Builder.newInstance() - .processId(processId) - .sourceDataAddress(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://foo.bar/").build()) - .destinationDataAddress(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://fizz.buzz").build()) - .flowType(FlowType.PULL) - .participantId("some-participantId") - .assetId("test-asset") - .callbackAddress(URI.create("https://foo.bar/callback")) - .agreementId("test-agreement") - .build(); - } - @NotNull private Function failTest() { return f -> new AssertionError(f.getFailureDetail()); diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java index aa61a768fba..4d62af0fb8d 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java @@ -53,7 +53,6 @@ public Map dataPlaneConfiguration() { put("web.http.public.path", "/public"); put("web.http.control.port", String.valueOf(dataPlaneControl.getPort())); put("web.http.control.path", dataPlaneControl.getPath()); - put("edc.vault", resourceAbsolutePath(getName() + "-vault.properties")); put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); put("edc.keystore.password", "123456"); put("edc.dataplane.token.validation.endpoint", controlPlaneControl + "/token"); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java index f8e8717362e..58ff179f607 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java @@ -111,7 +111,6 @@ public Map controlPlaneConfiguration() { put("web.http.control.port", String.valueOf(controlPlaneControl.getPort())); put("web.http.control.path", controlPlaneControl.getPath()); put("edc.dsp.callback.address", protocolEndpoint.getUrl().toString()); - put("edc.vault", resourceAbsolutePath(getName() + "-vault.properties")); put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); put("edc.keystore.password", "123456"); put("edc.transfer.proxy.endpoint", dataPlanePublic.toString()); @@ -122,6 +121,10 @@ public Map controlPlaneConfiguration() { put("edc.negotiation.consumer.send.retry.base-delay.ms", "100"); put("edc.negotiation.provider.send.retry.base-delay.ms", "100"); + put("edc.negotiation.consumer.state-machine.iteration-wait-millis", "50"); + put("edc.negotiation.provider.state-machine.iteration-wait-millis", "50"); + put("edc.transfer.state-machine.iteration-wait-millis", "50"); + put("provisioner.http.entries.default.provisioner.type", "provider"); put("provisioner.http.entries.default.endpoint", "http://localhost:%d/provision".formatted(httpProvisionerPort)); put("provisioner.http.entries.default.data.address.type", "HttpProvision"); @@ -144,7 +147,6 @@ public Map dataPlaneConfiguration() { put("web.http.public.path", "/public"); put("web.http.control.port", String.valueOf(dataPlaneControl.getPort())); put("web.http.control.path", dataPlaneControl.getPath()); - put("edc.vault", resourceAbsolutePath(getName() + "-vault.properties")); put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); put("edc.keystore.password", "123456"); put("edc.dataplane.api.public.baseurl", dataPlanePublic + "/v2/"); @@ -152,6 +154,7 @@ public Map dataPlaneConfiguration() { put("edc.transfer.proxy.token.signer.privatekey.alias", "private-key"); put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key"); put("edc.dataplane.http.sink.partition.size", "1"); + put("edc.dataplane.state-machine.iteration-wait-millis", "50"); put("edc.dpf.selector.url", controlPlaneControl + "/v1/dataplanes"); } };