Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dataset endpoint management api #3342

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.eclipse.edc.connector.service.catalog;

import org.eclipse.edc.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.catalog.spi.DatasetRequestMessage;
import org.eclipse.edc.connector.spi.catalog.CatalogService;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.query.QuerySpec;
Expand All @@ -32,7 +33,7 @@ public CatalogServiceImpl(RemoteMessageDispatcherRegistry dispatcher) {
}

@Override
public CompletableFuture<StatusResult<byte[]>> request(String providerUrl, String protocol, QuerySpec querySpec) {
public CompletableFuture<StatusResult<byte[]>> requestCatalog(String providerUrl, String protocol, QuerySpec querySpec) {
var request = CatalogRequestMessage.Builder.newInstance()
.protocol(protocol)
.counterPartyAddress(providerUrl)
Expand All @@ -41,4 +42,15 @@ public CompletableFuture<StatusResult<byte[]>> request(String providerUrl, Strin

return dispatcher.dispatch(byte[].class, request);
}

@Override
public CompletableFuture<StatusResult<byte[]>> requestDataset(String id, String counterPartyAddress, String protocol) {
var request = DatasetRequestMessage.Builder.newInstance()
.datasetId(id)
.protocol(protocol)
.counterPartyAddress(counterPartyAddress)
.build();

return dispatcher.dispatch(byte[].class, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.edc.connector.service.catalog;

import org.eclipse.edc.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.catalog.spi.DatasetRequestMessage;
import org.eclipse.edc.connector.spi.catalog.CatalogService;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.query.QuerySpec;
Expand All @@ -38,14 +39,26 @@ class CatalogServiceImplTest {
private final CatalogService service = new CatalogServiceImpl(dispatcher);

@Test
void request_shouldDispatchRequestAndReturnResult() {
void requestCatalog_shouldDispatchRequestAndReturnResult() {
when(dispatcher.dispatch(eq(byte[].class), any())).thenReturn(completedFuture(StatusResult.success("content".getBytes())));

var result = service.request("http://provider/url", "protocol", QuerySpec.none());
var result = service.requestCatalog("http://provider/url", "protocol", QuerySpec.none());

assertThat(result).succeedsWithin(5, SECONDS).satisfies(statusResult -> {
assertThat(statusResult).isSucceeded().isEqualTo("content".getBytes());
});
verify(dispatcher).dispatch(eq(byte[].class), isA(CatalogRequestMessage.class));
}

@Test
void requestDataset_shouldDispatchRequestAndReturnResult() {
when(dispatcher.dispatch(eq(byte[].class), any())).thenReturn(completedFuture(StatusResult.success("content".getBytes())));

var result = service.requestDataset("datasetId", "http://provider/url", "protocol");

assertThat(result).succeedsWithin(5, SECONDS).satisfies(statusResult -> {
assertThat(statusResult).isSucceeded().isEqualTo("content".getBytes());
});
verify(dispatcher).dispatch(eq(byte[].class), isA(DatasetRequestMessage.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public interface CatalogApiPaths {

String BASE_PATH = "/catalog";
String CATALOG_REQUEST = "/request";

String DATASET_REQUEST = "/datasets";

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@

package org.eclipse.edc.protocol.dsp.catalog.dispatcher;

import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate.CatalogRequestHttpDelegate;
import org.eclipse.edc.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.catalog.spi.DatasetRequestMessage;
import org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate.CatalogRequestHttpRawDelegate;
import org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate.DatasetRequestHttpRawDelegate;
import org.eclipse.edc.protocol.dsp.dispatcher.GetDspHttpRequestFactory;
import org.eclipse.edc.protocol.dsp.dispatcher.PostDspHttpRequestFactory;
import org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpRemoteMessageDispatcher;
import org.eclipse.edc.protocol.dsp.spi.serialization.JsonLdRemoteMessageSerializer;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;

import static org.eclipse.edc.spi.CoreConstants.JSON_LD;
import static org.eclipse.edc.protocol.dsp.catalog.dispatcher.CatalogApiPaths.BASE_PATH;
import static org.eclipse.edc.protocol.dsp.catalog.dispatcher.CatalogApiPaths.CATALOG_REQUEST;
import static org.eclipse.edc.protocol.dsp.catalog.dispatcher.CatalogApiPaths.DATASET_REQUEST;

/**
* Creates and registers the HTTP dispatcher delegate for sending a catalog request as defined in
Expand All @@ -41,12 +44,6 @@ public class DspCatalogHttpDispatcherExtension implements ServiceExtension {
private DspHttpRemoteMessageDispatcher messageDispatcher;
@Inject
private JsonLdRemoteMessageSerializer remoteMessageSerializer;
@Inject
private TypeManager typeManager;
@Inject
private TypeTransformerRegistry transformerRegistry;
@Inject
private JsonLd jsonLdService;

@Override
public String name() {
Expand All @@ -55,9 +52,16 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
var mapper = typeManager.getMapper(JSON_LD);
messageDispatcher.registerDelegate(new CatalogRequestHttpDelegate(remoteMessageSerializer, mapper, transformerRegistry, jsonLdService));
messageDispatcher.registerDelegate(new CatalogRequestHttpRawDelegate(remoteMessageSerializer));
messageDispatcher.registerMessage(
CatalogRequestMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + CATALOG_REQUEST),
new CatalogRequestHttpRawDelegate()
);
messageDispatcher.registerMessage(
DatasetRequestMessage.class,
new GetDspHttpRequestFactory<>(m -> BASE_PATH + DATASET_REQUEST + "/" + m.getDatasetId()),
new DatasetRequestHttpRawDelegate()
);
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,22 @@
package org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate;

import jakarta.json.JsonObject;
import okhttp3.Request;
import okhttp3.Response;
import org.eclipse.edc.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpDispatcherDelegate;
import org.eclipse.edc.protocol.dsp.spi.serialization.JsonLdRemoteMessageSerializer;
import org.eclipse.edc.spi.EdcException;

import java.io.IOException;
import java.util.function.Function;

import static org.eclipse.edc.protocol.dsp.catalog.dispatcher.CatalogApiPaths.BASE_PATH;
import static org.eclipse.edc.protocol.dsp.catalog.dispatcher.CatalogApiPaths.CATALOG_REQUEST;

/**
* Delegate for dispatching catalog requests as defined in the
* <a href="https://docs.internationaldataspaces.org/dataspace-protocol/catalog/catalog.binding.https">dataspace protocol specification</a>
*/
public class CatalogRequestHttpRawDelegate extends DspHttpDispatcherDelegate<CatalogRequestMessage, byte[]> {

public CatalogRequestHttpRawDelegate(JsonLdRemoteMessageSerializer serializer) {
super(serializer);
}

@Override
public Class<CatalogRequestMessage> getMessageType() {
return CatalogRequestMessage.class;
}

/**
* Sends a catalog request. The request body is constructed as defined in the dataspace protocol
* implementation. The request is sent to the remote component using the path from the
* specification.
*
* @param message the message
* @return the built okhttp request
*/
@Override
public Request buildRequest(CatalogRequestMessage message) {
return buildRequest(message, BASE_PATH + CATALOG_REQUEST);
public CatalogRequestHttpRawDelegate() {
super();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.protocol.dsp.catalog.dispatcher.delegate;

import jakarta.json.JsonObject;
import okhttp3.Response;
import org.eclipse.edc.catalog.spi.DatasetRequestMessage;
import org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpDispatcherDelegate;
import org.eclipse.edc.spi.EdcException;

import java.io.IOException;
import java.util.function.Function;

/**
* Delegate for dispatching catalog requests as defined in the
* <a href="https://docs.internationaldataspaces.org/dataspace-protocol/catalog/catalog.binding.https">dataspace protocol specification</a>
*/
public class DatasetRequestHttpRawDelegate extends DspHttpDispatcherDelegate<DatasetRequestMessage, byte[]> {

public DatasetRequestHttpRawDelegate() {
super();
}

/**
* Parses the response to a byte[]. It cannot return a {@link java.io.InputStream} because the response gets closed
* by the {@link org.eclipse.edc.spi.http.EdcHttpClient}
*
* @return a function that transforms the response body to a {@link JsonObject}.
*/
@Override
public Function<Response, byte[]> parseResponse() {
return response -> {
try {
return response.body().bytes();

} catch (NullPointerException e) {
throw new EdcException("Failed to read response body, as body was null.", e);
} catch (IOException e) {
throw new EdcException("Failed to read response body", e);
}
};
}
}
Loading
Loading