Skip to content

Commit

Permalink
feat: implement Asset/Dataset specialization pt. 2 (#4301)
Browse files Browse the repository at this point in the history
* let catalog extend dataset, update distribution resolver

* update dataset resolver + test

* add e2e test (wip)

* added e2e test

* renamed method
  • Loading branch information
paullatzelsperger authored Jun 25, 2024
1 parent e60d30b commit 8956e78
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 92 deletions.
3 changes: 2 additions & 1 deletion core/control-plane/control-plane-catalog/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ dependencies {
api(project(":spi:control-plane:transfer-spi"))
api(project(":spi:control-plane:asset-spi"))

testImplementation(project(":tests:junit-base"));
implementation(project(":spi:common:data-address:data-address-http-data-spi"))
testImplementation(project(":tests:junit-base"))

testImplementation(project(":core:common:connector-core"))
testImplementation(project(":core:control-plane:control-plane-core"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@

import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex;
import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataService;
import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset;
import org.eclipse.edc.connector.controlplane.catalog.spi.DatasetResolver;
import org.eclipse.edc.connector.controlplane.catalog.spi.DistributionResolver;
import org.eclipse.edc.connector.controlplane.contract.spi.ContractOfferId;
import org.eclipse.edc.connector.controlplane.contract.spi.offer.ContractDefinitionResolver;
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema;
import org.eclipse.edc.policy.model.PolicyType;
import org.eclipse.edc.spi.agent.ParticipantAgent;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.query.QuerySpec;
import org.jetbrains.annotations.NotNull;

import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
Expand Down Expand Up @@ -75,10 +79,23 @@ public Dataset getById(ParticipantAgent agent, String id) {
.orElse(null);
}

private Dataset.Builder buildDataset(Asset asset) {
if (!asset.isCatalog()) {
return Dataset.Builder.newInstance();
}

return Catalog.Builder.newInstance()
.dataService(DataService.Builder.newInstance()
.id(Base64.getUrlEncoder().encodeToString(asset.getId().getBytes()))
.endpointDescription(asset.getDescription())
.endpointUrl(asset.getDataAddress().getStringProperty(HttpDataAddressSchema.BASE_URL, null))
.build());
}

private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset asset) {

var distributions = distributionResolver.getDistributions(asset);
var datasetBuilder = Dataset.Builder.newInstance()
var datasetBuilder = buildDataset(asset)
.id(asset.getId())
.distributions(distributions)
.properties(asset.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package org.eclipse.edc.connector.controlplane.catalog;

import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataService;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.connector.controlplane.catalog.spi.Distribution;
import org.eclipse.edc.connector.controlplane.catalog.spi.DistributionResolver;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;

import java.util.Base64;
import java.util.List;

public class DefaultDistributionResolver implements DistributionResolver {
Expand All @@ -35,6 +37,14 @@ public DefaultDistributionResolver(DataServiceRegistry dataServiceRegistry, Data

@Override
public List<Distribution> getDistributions(Asset asset) {
if (asset.isCatalog()) {
return List.of(Distribution.Builder.newInstance()
.format(asset.getDataAddress().getType())
.dataService(DataService.Builder.newInstance()
.id(Base64.getUrlEncoder().encodeToString(asset.getId().getBytes()))
.build())
.build());
}
return dataFlowManager.transferTypesFor(asset).stream().map(this::createDistribution).toList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.assertj.core.api.iterable.ThrowingExtractor;
import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex;
import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataService;
import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset;
import org.eclipse.edc.connector.controlplane.catalog.spi.DatasetResolver;
Expand All @@ -28,12 +29,14 @@
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.query.CriterionOperatorRegistryImpl;
import org.eclipse.edc.spi.agent.ParticipantAgent;
import org.eclipse.edc.spi.message.Range;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -219,6 +222,37 @@ void query_shouldLimitDataset_whenMultipleDefinitionsWithSameAssets() {
.map(getId()).containsExactly("6", "7");
}

@Test
void query_shouldReturnCatalogWithinCatalog_whenAssetIsCatalogAsset() {
var contractDefinition = contractDefinitionBuilder("definitionId").contractPolicyId("contractPolicyId").build();
var contractPolicy = Policy.Builder.newInstance().build();
var distribution = Distribution.Builder.newInstance().dataService(DataService.Builder.newInstance()
.endpointDescription("test-asset-desc")
.endpointUrl("https://foo.bar/baz")
.build())
.format(HttpDataAddressSchema.HTTP_DATA_TYPE).build();

when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(Stream.of(createAsset("assetId")
.property(Asset.PROPERTY_IS_CATALOG, true)
.dataAddress(DataAddress.Builder.newInstance().type(HttpDataAddressSchema.HTTP_DATA_TYPE).build())
.build()));
when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
when(distributionResolver.getDistributions(isA(Asset.class))).thenReturn(List.of(distribution));

var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none());

assertThat(datasets).isNotNull().hasSize(1).first().satisfies(dataset -> {
assertThat(dataset).isInstanceOf(Catalog.class);
assertThat(dataset.getId()).isEqualTo("assetId");
assertThat(dataset.getOffers()).hasSize(1).allSatisfy((id, policy) -> {
assertThat(ContractOfferId.parseId(id)).isSucceeded().extracting(ContractOfferId::definitionPart).asString().isEqualTo("definitionId");
assertThat(policy.getType()).isEqualTo(OFFER);
assertThat(policy.getTarget()).isEqualTo(null);
});
});
}

@Test
void getById_shouldReturnDataset() {
var policy1 = Policy.Builder.newInstance().inheritsFrom("inherits1").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.controlplane.catalog.spi.DataService;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -57,4 +58,28 @@ void shouldReturnDistributionsForEveryTransferType() {
assertThat(distribution.getDataService()).isSameAs(dataService);
});
}

@Test
void shouldReturnDistribution_whenAssetIsCatalog() {
when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService));
when(dataFlowManager.transferTypesFor(any())).thenReturn(Set.of("type1", "type2"));

var dataAddress = DataAddress.Builder.newInstance()
.type("HttpData")
.property(HttpDataAddressSchema.BASE_URL, "http://quizzqua.zz/buzz")
.build();
var asset = Asset.Builder.newInstance()
.dataAddress(dataAddress)
.property(Asset.PROPERTY_IS_CATALOG, true)
.description("test description")
.build();

var distributions = resolver.getDistributions(asset);

assertThat(distributions).hasSize(1)
.anySatisfy(distribution -> {
assertThat(distribution.getFormat()).isEqualTo("HttpData");
assertThat(distribution.getDataService().getId()).isNotNull();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.jetbrains.annotations.Nullable;

import static jakarta.json.stream.JsonCollectors.toJsonArray;
import static java.util.Optional.ofNullable;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_CATALOG_TYPE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DATASET_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DATA_SERVICE_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DISTRIBUTION_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DSPACE_PROPERTY_PARTICIPANT_ID;

/**
Expand Down Expand Up @@ -57,13 +59,19 @@ public JsonObjectFromCatalogTransformer(JsonBuilderFactory jsonFactory, ObjectMa
.map(service -> context.transform(service, JsonObject.class))
.collect(toJsonArray());

var distributions = catalog.getDistributions().stream()
.map(distro -> context.transform(distro, JsonObject.class))
.collect(toJsonArray());

var objectBuilder = jsonFactory.createObjectBuilder()
.add(ID, catalog.getId())
.add(TYPE, DCAT_CATALOG_TYPE)
.add(DSPACE_PROPERTY_PARTICIPANT_ID, participantIdMapper.toIri(catalog.getParticipantId()))
.add(DCAT_DATASET_ATTRIBUTE, datasets)
.add(DCAT_DISTRIBUTION_ATTRIBUTE, distributions)
.add(DCAT_DATA_SERVICE_ATTRIBUTE, dataServices);

ofNullable(catalog.getParticipantId()).ifPresent(pid -> objectBuilder.add(DSPACE_PROPERTY_PARTICIPANT_ID, participantIdMapper.toIri(pid)));

transformProperties(catalog.getProperties(), objectBuilder, mapper, context);

return objectBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,16 @@
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.UUID.randomUUID;

/**
* Entity representing a Catalog
*/
@JsonDeserialize(builder = Catalog.Builder.class)
public class Catalog {
private String id;
private List<Dataset> datasets = new ArrayList<>();
private List<DataService> dataServices;
private Map<String, Object> properties;
private String participantId;

public String getId() {
return id;
}
public class Catalog extends Dataset {
protected final List<Dataset> datasets = new ArrayList<>();
protected List<DataService> dataServices = new ArrayList<>();
protected String participantId;

public List<Dataset> getDatasets() {
return datasets;
Expand All @@ -48,78 +38,47 @@ public List<DataService> getDataServices() {
return dataServices;
}

public Map<String, Object> getProperties() {
return properties;
}

public String getParticipantId() {
return participantId;
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder {
private final Catalog catalog;
public static class Builder extends Dataset.Builder<Catalog, Catalog.Builder> {

private Builder() {
catalog = new Catalog();
super(new Catalog());
}

public static Builder newInstance() {
return new Builder();
}

public Builder id(String id) {
catalog.id = id;
return this;
}

public Builder datasets(List<Dataset> datasets) {
catalog.datasets.addAll(datasets);
dataset.datasets.addAll(datasets);
return this;
}

public Builder dataset(Dataset dataset) {
catalog.datasets.add(dataset);
this.dataset.datasets.add(dataset);
return this;
}

public Builder dataServices(List<DataService> dataServices) {
catalog.dataServices = dataServices;
this.dataset.dataServices = dataServices;
return this;
}

public Builder dataService(DataService dataService) {
if (catalog.dataServices == null) {
catalog.dataServices = new ArrayList<>();
if (this.dataset.dataServices == null) {
this.dataset.dataServices = new ArrayList<>();
}
catalog.dataServices.add(dataService);
return this;
}

public Builder properties(Map<String, Object> properties) {
catalog.properties = properties;
return this;
}

public Builder property(String key, Object value) {
if (catalog.properties == null) {
catalog.properties = new HashMap<>();
}
catalog.properties.put(key, value);
this.dataset.dataServices.add(dataService);
return this;
}

public Builder participantId(String participantId) {
catalog.participantId = participantId;
this.dataset.participantId = participantId;
return this;
}

public Catalog build() {
if (catalog.id == null) {
catalog.id = randomUUID().toString();
}

return catalog;
}
}
}
Loading

0 comments on commit 8956e78

Please sign in to comment.