Skip to content

Commit

Permalink
fix: unbind public endpoint generator from source address type (#4315)
Browse files Browse the repository at this point in the history
* fix: unbind public endpoint generator from source address type

* pass transfer type

* refactor

* remove flow type extractor

* Trigger build
  • Loading branch information
ndr-brt committed Jul 2, 2024
1 parent 44786b5 commit 7abbdad
Show file tree
Hide file tree
Showing 37 changed files with 382 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.eclipse.edc.connector.controlplane.services.transferprocess.TransferProcessServiceImpl;
import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser;
import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable;
import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.secret.spi.observe.SecretObservableImpl;
Expand Down Expand Up @@ -171,7 +171,7 @@ public class ControlPlaneServicesExtension implements ServiceExtension {
private DataFlowManager dataFlowManager;

@Inject
private FlowTypeExtractor flowTypeExtractor;
private TransferTypeParser transferTypeParser;

@Override
public String name() {
Expand Down Expand Up @@ -237,7 +237,7 @@ public PolicyDefinitionService policyDefinitionService() {
@Provider
public TransferProcessService transferProcessService() {
return new TransferProcessServiceImpl(transferProcessStore, transferProcessManager, transactionContext,
dataAddressValidator, commandHandlerRegistry, flowTypeExtractor, contractNegotiationStore);
dataAddressValidator, commandHandlerRegistry, transferTypeParser, contractNegotiationStore);
}

@Provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.eclipse.edc.connector.controlplane.services.query.QueryValidator;
import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser;
import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DeprovisionedResource;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.ProvisionResponse;
Expand Down Expand Up @@ -61,19 +61,19 @@ public class TransferProcessServiceImpl implements TransferProcessService {
private final QueryValidator queryValidator;
private final DataAddressValidatorRegistry dataAddressValidator;
private final CommandHandlerRegistry commandHandlerRegistry;
private final FlowTypeExtractor flowTypeExtractor;
private final TransferTypeParser transferTypeParser;
private final ContractNegotiationStore contractNegotiationStore;

public TransferProcessServiceImpl(TransferProcessStore transferProcessStore, TransferProcessManager manager,
TransactionContext transactionContext, DataAddressValidatorRegistry dataAddressValidator,
CommandHandlerRegistry commandHandlerRegistry, FlowTypeExtractor flowTypeExtractor,
CommandHandlerRegistry commandHandlerRegistry, TransferTypeParser transferTypeParser,
ContractNegotiationStore contractNegotiationStore) {
this.transferProcessStore = transferProcessStore;
this.manager = manager;
this.transactionContext = transactionContext;
this.dataAddressValidator = dataAddressValidator;
this.commandHandlerRegistry = commandHandlerRegistry;
this.flowTypeExtractor = flowTypeExtractor;
this.transferTypeParser = transferTypeParser;
this.contractNegotiationStore = contractNegotiationStore;
queryValidator = new QueryValidator(TransferProcess.class, getSubtypes());
}
Expand Down Expand Up @@ -127,12 +127,17 @@ public ServiceResult<List<TransferProcess>> search(QuerySpec query) {

@Override
public @NotNull ServiceResult<TransferProcess> initiateTransfer(TransferRequest request) {
var transferTypeParse = transferTypeParser.parse(request.getTransferType());
if (transferTypeParse.failed()) {
return ServiceResult.badRequest("Property transferType not valid: " + transferTypeParse.getFailureDetail());
}

var agreement = contractNegotiationStore.findContractAgreement(request.getContractId());
if (agreement == null) {
return ServiceResult.badRequest("Contract agreement with id %s not found".formatted(request.getContractId()));
}

var flowType = flowTypeExtractor.extract(request.getTransferType()).getContent();
var flowType = transferTypeParse.getContent().flowType();

if (flowType == FlowType.PUSH) {
if (request.getDataDestination() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ private TransferRequest createTransferRequest() {
.protocol("test")
.counterPartyAddress("http://an/address")
.contractId("contractId")
.transferType("DestinationType-PUSH")
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.eclipse.edc.connector.controlplane.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser;
import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates;
Expand All @@ -34,9 +34,11 @@
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.result.ServiceFailure;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
import org.eclipse.edc.spi.types.domain.transfer.TransferType;
import org.eclipse.edc.transaction.spi.NoopTransactionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.edc.validator.spi.DataAddressValidatorRegistry;
Expand Down Expand Up @@ -79,11 +81,11 @@ class TransferProcessServiceImplTest {
private final TransactionContext transactionContext = spy(new NoopTransactionContext());
private final DataAddressValidatorRegistry dataAddressValidator = mock();
private final CommandHandlerRegistry commandHandlerRegistry = mock();
private final FlowTypeExtractor flowTypeExtractor = mock();
private final TransferTypeParser transferTypeParser = mock();
private final ContractNegotiationStore contractNegotiationStore = mock();

private final TransferProcessService service = new TransferProcessServiceImpl(store, manager, transactionContext,
dataAddressValidator, commandHandlerRegistry, flowTypeExtractor, contractNegotiationStore);
dataAddressValidator, commandHandlerRegistry, transferTypeParser, contractNegotiationStore);

@Test
void findById_whenFound() {
Expand Down Expand Up @@ -150,7 +152,7 @@ void shouldInitiateTransfer() {
var transferProcess = transferProcess();
when(contractNegotiationStore.findContractAgreement(transferRequest.getContractId()))
.thenReturn(createContractAgreement(transferProcess.getContractId(), "assetId"));
when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH));
when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("DestinationType", FlowType.PUSH)));
when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.success());
when(manager.initiateConsumerRequest(transferRequest)).thenReturn(StatusResult.success(transferProcess));

Expand All @@ -160,9 +162,22 @@ void shouldInitiateTransfer() {
verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class));
}

@Test
void shouldFail_whenTransferTypeIsNotValid() {
when(transferTypeParser.parse(any())).thenReturn(Result.failure("cannot parse"));

var result = service.initiateTransfer(transferRequest());

assertThat(result).isFailed()
.extracting(ServiceFailure::getReason)
.isEqualTo(BAD_REQUEST);
assertThat(result.getFailureDetail()).contains("cannot parse");
verifyNoInteractions(manager);
}

@Test
void shouldFail_whenContractAgreementNotFound() {
when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH));
when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("DestinationType", FlowType.PUSH)));
when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.failure(violation("invalid data address", "path")));

var result = service.initiateTransfer(transferRequest());
Expand All @@ -179,7 +194,7 @@ void shouldFail_whenDestinationIsNotValid() {
var transferRequest = transferRequest();
when(contractNegotiationStore.findContractAgreement(transferRequest.getContractId()))
.thenReturn(createContractAgreement(transferRequest.getContractId(), "assetId"));
when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH));
when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("DestinationType", FlowType.PUSH)));
when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.failure(violation("invalid data address", "path")));

var result = service.initiateTransfer(transferRequest);
Expand All @@ -198,7 +213,7 @@ void shouldFail_whenDataDestinationNotPassedAndFlowTypeIsPush() {
.build();
when(contractNegotiationStore.findContractAgreement(transferRequest.getContractId()))
.thenReturn(createContractAgreement(transferRequest.getContractId(), "assetId"));
when(flowTypeExtractor.extract(any())).thenReturn(StatusResult.success(FlowType.PUSH));
when(transferTypeParser.parse(any())).thenReturn(Result.success(new TransferType("DestinationType", FlowType.PUSH)));

var result = service.initiateTransfer(transferRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
package org.eclipse.edc.connector.controlplane.transfer;

import org.eclipse.edc.connector.controlplane.transfer.flow.DataFlowManagerImpl;
import org.eclipse.edc.connector.controlplane.transfer.flow.FlowTypeExtractorImpl;
import org.eclipse.edc.connector.controlplane.transfer.flow.TransferTypeParserImpl;
import org.eclipse.edc.connector.controlplane.transfer.observe.TransferProcessObservableImpl;
import org.eclipse.edc.connector.controlplane.transfer.provision.ProvisionManagerImpl;
import org.eclipse.edc.connector.controlplane.transfer.provision.ResourceManifestGeneratorImpl;
import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessPendingGuard;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser;
import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable;
import org.eclipse.edc.connector.controlplane.transfer.spi.provision.ProvisionManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.provision.ResourceManifestGenerator;
Expand Down Expand Up @@ -71,8 +71,8 @@ public TransferProcessPendingGuard pendingGuard() {
}

@Provider
public FlowTypeExtractor flowTypeExtractor() {
return new FlowTypeExtractorImpl();
public TransferTypeParser transferTypeParser() {
return new TransferTypeParserImpl();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,30 @@

package org.eclipse.edc.connector.controlplane.transfer.flow;

import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.spi.response.ResponseStatus;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
import org.eclipse.edc.spi.types.domain.transfer.TransferType;

import java.util.Optional;

public class FlowTypeExtractorImpl implements FlowTypeExtractor {
public class TransferTypeParserImpl implements TransferTypeParser {

@Override
public StatusResult<FlowType> extract(String transferType) {
return Optional.ofNullable(transferType)
public Result<TransferType> parse(String transferType) {
Optional<Result<TransferType>> parsed = Optional.ofNullable(transferType)
.map(type -> type.split("-"))
.filter(tokens -> tokens.length == 2)
.map(tokens -> parseFlowType(tokens[1]))
.orElse(StatusResult.failure(ResponseStatus.FATAL_ERROR, "Failed to extract flow type from transferType %s".formatted(transferType)));
.map(tokens -> parseFlowType(tokens[1]).map(flowType -> new TransferType(tokens[0], flowType)));

return parsed.orElse(Result.failure("Failed to extract flow type from transferType %s".formatted(transferType)));
}

private StatusResult<FlowType> parseFlowType(String flowType) {
private Result<FlowType> parseFlowType(String flowType) {
try {
return StatusResult.success(FlowType.valueOf(flowType));
return Result.success(FlowType.valueOf(flowType));
} catch (Exception e) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, "Unknown flow type %s".formatted(flowType));
return Result.failure("Unknown flow type %s".formatted(flowType));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,50 @@

package org.eclipse.edc.connector.controlplane.transfer.flow;

import org.eclipse.edc.connector.controlplane.transfer.spi.flow.FlowTypeExtractor;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL;
import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH;

class FlowTypeExtractorImplTest {
class TransferTypeParserImplTest {

private final FlowTypeExtractor extractor = new FlowTypeExtractorImpl();
private final TransferTypeParser parser = new TransferTypeParserImpl();

@Test
void shouldExtractPull() {
var result = extractor.extract("Any-PULL");
var result = parser.parse("DestinationType-PULL");

assertThat(result).isSucceeded().isEqualTo(PULL);
assertThat(result).isSucceeded().satisfies(type -> {
assertThat(type.destinationType()).isEqualTo("DestinationType");
assertThat(type.flowType()).isEqualTo(PULL);
});
}

@Test
void shouldExtractPush() {
var result = extractor.extract("Any-PUSH");
var result = parser.parse("DestinationType-PUSH");

assertThat(result).isSucceeded().isEqualTo(PUSH);
assertThat(result).isSucceeded().satisfies(type -> {
assertThat(type.destinationType()).isEqualTo("DestinationType");
assertThat(type.flowType()).isEqualTo(PUSH);
});
}

@Test
void shouldReturnFatalError_whenTypeIsUnknown() {
var result = extractor.extract("Any-NOT_KNOWN");
var result = parser.parse("Any-NOT_KNOWN");

assertThat(result).isFailed();
}

@Test
void shouldReturnFatalError_whenFormatIsNotCorrect() {
var result = extractor.extract("not_correct");
var result = parser.parse("not_correct");

assertThat(result).isFailed();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.eclipse.edc.spi.types.domain.DataAddress;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
Expand All @@ -30,14 +28,14 @@ class PublicEndpointGeneratorServiceImpl implements PublicEndpointGeneratorServi
private final Map<String, Function<DataAddress, Endpoint>> generatorFunctions = new ConcurrentHashMap<>();

@Override
public Result<Endpoint> generateFor(DataAddress sourceDataAddress) {
Objects.requireNonNull(sourceDataAddress);
Objects.requireNonNull(sourceDataAddress.getType());

return Optional.ofNullable(generatorFunctions.get(sourceDataAddress.getType()))
.map(function -> function.apply(sourceDataAddress))
.map(Result::success)
.orElseGet(() -> Result.failure("No Endpoint generator function registered for source data type '%s'".formatted(sourceDataAddress.getType())));
public Result<Endpoint> generateFor(String destinationType, DataAddress sourceDataAddress) {
var function = generatorFunctions.get(destinationType);
if (function == null) {
return Result.failure("No Endpoint generator function registered for transfer type destination '%s'".formatted(destinationType));
}

var endpoint = function.apply(sourceDataAddress);
return Result.success(endpoint);
}

@Override
Expand Down
Loading

0 comments on commit 7abbdad

Please sign in to comment.