Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add findById method on DataPlaneSelectorService #4225

Merged
merged 1 commit into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public ServiceResult<DataPlaneInstance> select(DataAddress source, String transf
});
}


@Override
public ServiceResult<Void> addInstance(DataPlaneInstance instance) {
return transactionContext.execute(() -> {
Expand All @@ -86,4 +85,15 @@ public ServiceResult<Void> addInstance(DataPlaneInstance instance) {
public ServiceResult<Void> delete(String instanceId) {
return transactionContext.execute(() -> ServiceResult.from(store.deleteById(instanceId))).mapEmpty();
}

@Override
public ServiceResult<DataPlaneInstance> findById(String id) {
return transactionContext.execute(() -> {
var instance = store.findById(id);
if (instance == null) {
return ServiceResult.notFound("Data Plane instance with id %s not found".formatted(id));
}
return ServiceResult.success(instance);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.junit.jupiter.api.Test;

import java.util.UUID;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.stream.IntStream.range;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.spi.result.ServiceFailure.Reason.BAD_REQUEST;
import static org.eclipse.edc.spi.result.ServiceFailure.Reason.NOT_FOUND;
Expand All @@ -44,39 +44,47 @@ public class EmbeddedDataPlaneSelectorServiceTest {
private final SelectionStrategyRegistry selectionStrategyRegistry = mock();
private final DataPlaneSelectorService service = new EmbeddedDataPlaneSelectorService(store, selectionStrategyRegistry, new NoopTransactionContext());

@Test
void select_shouldUseChosenSelector() {
var instances = IntStream.range(0, 10).mapToObj(i -> createInstanceMock("instance" + i, "srcTestType", "destTestType")).toList();
when(store.getAll()).thenReturn(instances.stream());
SelectionStrategy selectionStrategy = mock();
when(selectionStrategy.apply(any())).thenAnswer(it -> instances.get(0));
when(selectionStrategyRegistry.find(any())).thenReturn(selectionStrategy);

var result = service.select(createAddress("srcTestType"), "transferType", "strategy");
@Nested
class Select {

assertThat(result).isSucceeded().extracting(DataPlaneInstance::getId).isEqualTo("instance0");
verify(selectionStrategyRegistry).find("strategy");
}
@Test
void select_shouldUseChosenSelector() {
var instances = range(0, 10)
.mapToObj(i -> createInstanceBuilder("instance" + i).build())
.toList();
when(store.getAll()).thenReturn(instances.stream());
SelectionStrategy selectionStrategy = mock();
when(selectionStrategy.apply(any())).thenAnswer(it -> instances.get(0));
when(selectionStrategyRegistry.find(any())).thenReturn(selectionStrategy);

var result = service.select(createAddress("srcTestType"), "transferType", "strategy");

assertThat(result).isSucceeded().extracting(DataPlaneInstance::getId).isEqualTo("instance0");
verify(selectionStrategyRegistry).find("strategy");
}

@Test
void select_shouldReturnBadRequest_whenStrategyNotFound() {
var instances = IntStream.range(0, 10).mapToObj(i -> createInstanceMock("instance" + i, "srcTestType", "destTestType")).toList();
when(store.getAll()).thenReturn(instances.stream());
when(selectionStrategyRegistry.find(any())).thenReturn(null);
@Test
void select_shouldReturnBadRequest_whenStrategyNotFound() {
var instances = range(0, 10)
.mapToObj(i -> createInstanceBuilder("instance" + i).build())
.toList();
when(store.getAll()).thenReturn(instances.stream());
when(selectionStrategyRegistry.find(any())).thenReturn(null);

var result = service.select(createAddress("srcTestType"), "transferType", "strategy");
var result = service.select(createAddress("srcTestType"), "transferType", "strategy");

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(BAD_REQUEST);
}
assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(BAD_REQUEST);
}

@Test
void select_shouldReturnNotFound_whenInstanceNotFound() {
when(store.getAll()).thenReturn(Stream.empty());
when(selectionStrategyRegistry.find(any())).thenReturn(mock());
@Test
void select_shouldReturnNotFound_whenInstanceNotFound() {
when(store.getAll()).thenReturn(Stream.empty());
when(selectionStrategyRegistry.find(any())).thenReturn(mock());

var result = service.select(createAddress("srcTestType"), "transferType", "strategy");
var result = service.select(createAddress("srcTestType"), "transferType", "strategy");

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND);
assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND);
}
}

@Nested
Expand Down Expand Up @@ -105,11 +113,27 @@ void shouldReturnNotFound_whenInstanceIsNotFound() {

}

private DataPlaneInstance createInstanceMock(String id, String srcType, String destType) {
return createInstanceBuilder(id)
.allowedSourceType(srcType)
.allowedDestType(destType)
.build();
@Nested
class FindById {

@Test
void shouldReturnInstance() {
var instance = createInstanceBuilder("instanceId").build();
when(store.findById(any())).thenReturn(instance);

var result = service.findById("instanceId");

assertThat(result).isSucceeded().isSameAs(instance);
}

@Test
void shouldFail_whenInstanceDoesNotExist() {
when(store.findById(any())).thenReturn(null);

var result = service.findById("any");

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND);
}
}

private DataPlaneInstance.Builder createInstanceBuilder(String id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ public boolean canHandle(TransferProcess transferProcess) {
.dataPlaneId(dataPlaneInstance.getId())
.build()
);

}

@Override
Expand Down Expand Up @@ -150,17 +149,10 @@ public Set<String> transferTypesFor(Asset asset) {
}

private StatusResult<DataPlaneClient> getClientForDataplane(String id) {
var result = selectorClient.getAll();
if (result.failed()) {
return StatusResult.failure(FATAL_ERROR, result.getFailureDetail());
}

return result.getContent().stream()
.filter(instance -> instance.getId().equals(id))
.findFirst()
return selectorClient.findById(id)
.map(clientFactory::createClient)
.map(StatusResult::success)
.orElseGet(() -> StatusResult.failure(FATAL_ERROR, "No data-plane found with id %s".formatted(id)));
.orElse(f -> StatusResult.failure(FATAL_ERROR, "No data-plane found with id %s. %s".formatted(id, f.getFailureDetail())));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,14 @@ class Terminate {
@Test
void shouldCallTerminateOnTheRightDataPlane() {
var dataPlaneInstance = dataPlaneInstanceBuilder().id("dataPlaneId").build();
var anotherDataPlane = dataPlaneInstanceBuilder().id("anotherId").build();
var transferProcess = transferProcessBuilder()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("dataPlaneId")
.build();
when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance, anotherDataPlane)));
when(selectorService.findById(any())).thenReturn(ServiceResult.success(dataPlaneInstance));

var result = flowController.terminate(transferProcess);

Expand All @@ -241,35 +240,20 @@ void shouldCallTerminateOnTheRightDataPlane() {
}

@Test
void shouldFail_withInvalidDataPlaneId() {
var dataPlaneInstance = createDataPlaneInstance();
void shouldFail_whenDataPlaneNotFound() {
var transferProcess = transferProcessBuilder()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("invalid")
.build();
when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance)));
when(selectorService.findById(any())).thenReturn(ServiceResult.notFound("not found"));

var result = flowController.terminate(transferProcess);

assertThat(result).isFailed().detail().contains("Failed to select the data plane for terminating the transfer process");
}

@Test
void shouldFail_whenCannotGetDataplaneInstances() {
var transferProcess = transferProcessBuilder()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("invalid")
.build();
when(selectorService.getAll()).thenReturn(ServiceResult.unexpected("error"));

var result = flowController.terminate(transferProcess);

assertThat(result).isFailed();
}
}

@Nested
Expand All @@ -285,26 +269,7 @@ void shouldCallTerminate() {
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
var dataPlaneInstance = dataPlaneInstanceBuilder().id("dataPlaneId").build();
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance)));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
}

@Test
void shouldCallSuspendOnTheRightDataPlane() {
var dataPlaneInstance = dataPlaneInstanceBuilder().id("dataPlaneId").build();
var anotherDataPlane = dataPlaneInstanceBuilder().id("anotherDataPlaneId").build();
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance, anotherDataPlane)));
when(selectorService.findById(any())).thenReturn(ServiceResult.success(dataPlaneInstance));

var result = flowController.suspend(transferProcess);

Expand All @@ -314,35 +279,21 @@ void shouldCallSuspendOnTheRightDataPlane() {
}

@Test
void shouldFail_withInvalidDataPlaneId() {
var dataPlaneInstance = createDataPlaneInstance();
void shouldFail_whenDataPlaneDoesNotExist() {
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("invalid")
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(dataPlaneInstance)));
when(selectorService.findById(any())).thenReturn(ServiceResult.notFound("not found"));

var result = flowController.suspend(transferProcess);

assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process");
}

@Test
void shouldFail_whenCannotGetDataplaneInstances() {
var transferProcess = transferProcessBuilder()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("invalid")
.build();
when(selectorService.getAll()).thenReturn(ServiceResult.unexpected("error"));

var result = flowController.suspend(transferProcess);

assertThat(result).isFailed();
}
}

@Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ public RemoteDataPlaneSelectorService(EdcHttpClient httpClient, String url, Obje

@Override
public ServiceResult<List<DataPlaneInstance>> getAll() {
var builder = new Request.Builder().get().url(url);
authenticationProvider.authenticationHeaders().forEach(builder::header);
var requestBuilder = new Request.Builder().get().url(url);

return request(builder.build())
return request(requestBuilder)
.compose(this::toJsonArray)
.map(it -> it.stream()
.map(j -> typeTransformerRegistry.transform(j, DataPlaneInstance.class))
Expand All @@ -95,10 +94,9 @@ public ServiceResult<DataPlaneInstance> select(DataAddress source, String transf

var body = RequestBody.create(jsonObject.toString(), TYPE_JSON);

var builder = new Request.Builder().post(body).url(url + SELECT_PATH);
authenticationProvider.authenticationHeaders().forEach(builder::header);
var requestBuilder = new Request.Builder().post(body).url(url + SELECT_PATH);

return request(builder.build()).compose(this::toJsonObject)
return request(requestBuilder).compose(this::toJsonObject)
.map(it -> typeTransformerRegistry.transform(it, DataPlaneInstance.class))
.compose(ServiceResult::from);
}
Expand All @@ -115,22 +113,30 @@ public ServiceResult<Void> addInstance(DataPlaneInstance instance) {
.build();
var body = RequestBody.create(requestBody.toString(), TYPE_JSON);

var builder = new Request.Builder().post(body).url(url);
authenticationProvider.authenticationHeaders().forEach(builder::header);
var requestBuilder = new Request.Builder().post(body).url(url);

return request(builder.build()).mapEmpty();
return request(requestBuilder).mapEmpty();
}

@Override
public ServiceResult<Void> delete(String instanceId) {
var request = new Request.Builder().delete().url(url + "/" + instanceId).build();
var requestBuilder = new Request.Builder().delete().url(url + "/" + instanceId);

return request(request).mapEmpty();
return request(requestBuilder).mapEmpty();
}

private <R> ServiceResult<String> request(Request request) {
@Override
public ServiceResult<DataPlaneInstance> findById(String id) {
var requestBuilder = new Request.Builder().get().url(url + "/" + id);

return request(requestBuilder).compose(this::toJsonObject)
.map(it -> typeTransformerRegistry.transform(it, DataPlaneInstance.class).getContent());
}

private <R> ServiceResult<String> request(Request.Builder requestBuilder) {
authenticationProvider.authenticationHeaders().forEach(requestBuilder::header);
try (
var response = httpClient.execute(request);
var response = httpClient.execute(requestBuilder.build());
var responseBody = response.body();
) {
var bodyAsString = responseBody == null ? null : responseBody.string();
Expand Down
Loading
Loading