Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement Asset/Dataset specialization pt. 2 #4301

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/control-plane/control-plane-catalog/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ dependencies {
api(project(":spi:control-plane:transfer-spi"))
api(project(":spi:control-plane:asset-spi"))

testImplementation(project(":tests:junit-base"));
implementation(project(":spi:common:data-address:data-address-http-data-spi"))
paullatzelsperger marked this conversation as resolved.
Show resolved Hide resolved
testImplementation(project(":tests:junit-base"))

testImplementation(project(":core:common:connector-core"))
testImplementation(project(":core:control-plane:control-plane-core"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@

import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex;
import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataService;
import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset;
import org.eclipse.edc.connector.controlplane.catalog.spi.DatasetResolver;
import org.eclipse.edc.connector.controlplane.catalog.spi.DistributionResolver;
import org.eclipse.edc.connector.controlplane.contract.spi.ContractOfferId;
import org.eclipse.edc.connector.controlplane.contract.spi.offer.ContractDefinitionResolver;
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema;
import org.eclipse.edc.policy.model.PolicyType;
import org.eclipse.edc.spi.agent.ParticipantAgent;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.query.QuerySpec;
import org.jetbrains.annotations.NotNull;

import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
Expand Down Expand Up @@ -75,10 +79,23 @@ public Dataset getById(ParticipantAgent agent, String id) {
.orElse(null);
}

private Dataset.Builder buildDataset(Asset asset) {
if (!asset.isCatalog()) {
return Dataset.Builder.newInstance();
}

return Catalog.Builder.newInstance()
.dataService(DataService.Builder.newInstance()
.id(Base64.getUrlEncoder().encodeToString(asset.getId().getBytes()))
.endpointDescription(asset.getDescription())
.endpointUrl(asset.getDataAddress().getStringProperty(HttpDataAddressSchema.BASE_URL, null))
.build());
}

private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset asset) {

var distributions = distributionResolver.getDistributions(asset);
var datasetBuilder = Dataset.Builder.newInstance()
var datasetBuilder = buildDataset(asset)
.id(asset.getId())
.distributions(distributions)
.properties(asset.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package org.eclipse.edc.connector.controlplane.catalog;

import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataService;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.connector.controlplane.catalog.spi.Distribution;
import org.eclipse.edc.connector.controlplane.catalog.spi.DistributionResolver;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;

import java.util.Base64;
import java.util.List;

public class DefaultDistributionResolver implements DistributionResolver {
Expand All @@ -35,6 +37,14 @@ public DefaultDistributionResolver(DataServiceRegistry dataServiceRegistry, Data

@Override
public List<Distribution> getDistributions(Asset asset) {
if (asset.isCatalog()) {
return List.of(Distribution.Builder.newInstance()
.format(asset.getDataAddress().getType())
.dataService(DataService.Builder.newInstance()
.id(Base64.getUrlEncoder().encodeToString(asset.getId().getBytes()))
.build())
.build());
}
return dataFlowManager.transferTypesFor(asset).stream().map(this::createDistribution).toList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.assertj.core.api.iterable.ThrowingExtractor;
import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex;
import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataService;
import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset;
import org.eclipse.edc.connector.controlplane.catalog.spi.DatasetResolver;
Expand All @@ -28,12 +29,14 @@
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.query.CriterionOperatorRegistryImpl;
import org.eclipse.edc.spi.agent.ParticipantAgent;
import org.eclipse.edc.spi.message.Range;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -219,6 +222,37 @@ void query_shouldLimitDataset_whenMultipleDefinitionsWithSameAssets() {
.map(getId()).containsExactly("6", "7");
}

@Test
void query_shouldReturnCatalogWithinCatalog_whenAssetIsCatalogAsset() {
var contractDefinition = contractDefinitionBuilder("definitionId").contractPolicyId("contractPolicyId").build();
var contractPolicy = Policy.Builder.newInstance().build();
var distribution = Distribution.Builder.newInstance().dataService(DataService.Builder.newInstance()
.endpointDescription("test-asset-desc")
.endpointUrl("https://foo.bar/baz")
.build())
.format(HttpDataAddressSchema.HTTP_DATA_TYPE).build();

when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(Stream.of(createAsset("assetId")
.property(Asset.PROPERTY_IS_CATALOG, true)
.dataAddress(DataAddress.Builder.newInstance().type(HttpDataAddressSchema.HTTP_DATA_TYPE).build())
.build()));
when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
when(distributionResolver.getDistributions(isA(Asset.class))).thenReturn(List.of(distribution));

var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none());

assertThat(datasets).isNotNull().hasSize(1).first().satisfies(dataset -> {
assertThat(dataset).isInstanceOf(Catalog.class);
assertThat(dataset.getId()).isEqualTo("assetId");
assertThat(dataset.getOffers()).hasSize(1).allSatisfy((id, policy) -> {
assertThat(ContractOfferId.parseId(id)).isSucceeded().extracting(ContractOfferId::definitionPart).asString().isEqualTo("definitionId");
assertThat(policy.getType()).isEqualTo(OFFER);
assertThat(policy.getTarget()).isEqualTo(null);
});
});
}

@Test
void getById_shouldReturnDataset() {
var policy1 = Policy.Builder.newInstance().inheritsFrom("inherits1").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.controlplane.catalog.spi.DataService;
import org.eclipse.edc.connector.controlplane.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -57,4 +58,28 @@ void shouldReturnDistributionsForEveryTransferType() {
assertThat(distribution.getDataService()).isSameAs(dataService);
});
}

@Test
void shouldReturnDistribution_whenAssetIsCatalog() {
when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService));
when(dataFlowManager.transferTypesFor(any())).thenReturn(Set.of("type1", "type2"));

var dataAddress = DataAddress.Builder.newInstance()
.type("HttpData")
.property(HttpDataAddressSchema.BASE_URL, "http://quizzqua.zz/buzz")
.build();
var asset = Asset.Builder.newInstance()
.dataAddress(dataAddress)
.property(Asset.PROPERTY_IS_CATALOG, true)
.description("test description")
.build();

var distributions = resolver.getDistributions(asset);

assertThat(distributions).hasSize(1)
.anySatisfy(distribution -> {
assertThat(distribution.getFormat()).isEqualTo("HttpData");
assertThat(distribution.getDataService().getId()).isNotNull();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.jetbrains.annotations.Nullable;

import static jakarta.json.stream.JsonCollectors.toJsonArray;
import static java.util.Optional.ofNullable;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_CATALOG_TYPE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DATASET_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DATA_SERVICE_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DISTRIBUTION_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DSPACE_PROPERTY_PARTICIPANT_ID;

/**
Expand Down Expand Up @@ -57,13 +59,19 @@ public JsonObjectFromCatalogTransformer(JsonBuilderFactory jsonFactory, ObjectMa
.map(service -> context.transform(service, JsonObject.class))
.collect(toJsonArray());

var distributions = catalog.getDistributions().stream()
.map(distro -> context.transform(distro, JsonObject.class))
.collect(toJsonArray());

var objectBuilder = jsonFactory.createObjectBuilder()
.add(ID, catalog.getId())
.add(TYPE, DCAT_CATALOG_TYPE)
.add(DSPACE_PROPERTY_PARTICIPANT_ID, participantIdMapper.toIri(catalog.getParticipantId()))
.add(DCAT_DATASET_ATTRIBUTE, datasets)
.add(DCAT_DISTRIBUTION_ATTRIBUTE, distributions)
.add(DCAT_DATA_SERVICE_ATTRIBUTE, dataServices);

ofNullable(catalog.getParticipantId()).ifPresent(pid -> objectBuilder.add(DSPACE_PROPERTY_PARTICIPANT_ID, participantIdMapper.toIri(pid)));

transformProperties(catalog.getProperties(), objectBuilder, mapper, context);

return objectBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,16 @@
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.UUID.randomUUID;

/**
* Entity representing a Catalog
*/
@JsonDeserialize(builder = Catalog.Builder.class)
public class Catalog {
private String id;
private List<Dataset> datasets = new ArrayList<>();
private List<DataService> dataServices;
private Map<String, Object> properties;
private String participantId;

public String getId() {
return id;
}
public class Catalog extends Dataset {
protected final List<Dataset> datasets = new ArrayList<>();
protected List<DataService> dataServices = new ArrayList<>();
protected String participantId;

public List<Dataset> getDatasets() {
return datasets;
Expand All @@ -48,78 +38,47 @@ public List<DataService> getDataServices() {
return dataServices;
}

public Map<String, Object> getProperties() {
return properties;
}

public String getParticipantId() {
return participantId;
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder {
private final Catalog catalog;
public static class Builder extends Dataset.Builder<Catalog, Catalog.Builder> {

private Builder() {
catalog = new Catalog();
super(new Catalog());
}

public static Builder newInstance() {
return new Builder();
}

public Builder id(String id) {
catalog.id = id;
return this;
}

public Builder datasets(List<Dataset> datasets) {
catalog.datasets.addAll(datasets);
dataset.datasets.addAll(datasets);
return this;
}

public Builder dataset(Dataset dataset) {
catalog.datasets.add(dataset);
this.dataset.datasets.add(dataset);
return this;
}

public Builder dataServices(List<DataService> dataServices) {
catalog.dataServices = dataServices;
this.dataset.dataServices = dataServices;
return this;
}

public Builder dataService(DataService dataService) {
if (catalog.dataServices == null) {
catalog.dataServices = new ArrayList<>();
if (this.dataset.dataServices == null) {
this.dataset.dataServices = new ArrayList<>();
}
catalog.dataServices.add(dataService);
return this;
}

public Builder properties(Map<String, Object> properties) {
catalog.properties = properties;
return this;
}

public Builder property(String key, Object value) {
if (catalog.properties == null) {
catalog.properties = new HashMap<>();
}
catalog.properties.put(key, value);
this.dataset.dataServices.add(dataService);
return this;
}

public Builder participantId(String participantId) {
catalog.participantId = participantId;
this.dataset.participantId = participantId;
return this;
}

public Catalog build() {
if (catalog.id == null) {
catalog.id = randomUUID().toString();
}

return catalog;
}
}
}
Loading
Loading