Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement get dataset dsp endpoint #3243

Merged
merged 2 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.edc.connector.contract.spi.offer.ContractDefinitionResolver;
import org.eclipse.edc.connector.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.agent.ParticipantAgent;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.asset.AssetPredicateConverter;
Expand All @@ -30,12 +29,11 @@
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.lang.Integer.MAX_VALUE;
import static java.util.stream.Collectors.toList;

public class DatasetResolverImpl implements DatasetResolver {

Expand All @@ -56,68 +54,47 @@ public DatasetResolverImpl(ContractDefinitionResolver contractDefinitionResolver
@Override
@NotNull
public Stream<Dataset> query(ParticipantAgent agent, QuerySpec querySpec) {
var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).collect(toList());
var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).toList();
var assetsQuery = QuerySpec.Builder.newInstance().offset(0).limit(MAX_VALUE).filter(querySpec.getFilterExpression()).build();
return assetIndex.queryAssets(assetsQuery)
.map(asset -> {
var offers = contractDefinitions.stream()
.filter(definition -> definition.getAssetsSelector().stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and)
.test(asset))
.map(contractDefinition -> createOffer(contractDefinition, asset.getId()))
.filter(Objects::nonNull)
.collect(toList());
return new ProtoDataset(asset, offers);
})
.filter(ProtoDataset::hasOffers)
.map(asset -> toDataset(contractDefinitions, asset))
.filter(Dataset::hasOffers)
.skip(querySpec.getOffset())
.limit(querySpec.getLimit())
.map(proto -> {
var asset = proto.asset;
var offers = proto.offers;
var distributions = distributionResolver.getDistributions(asset, null); // TODO: data addresses should be retrieved
var datasetBuilder = Dataset.Builder.newInstance()
.distributions(distributions)
.properties(asset.getProperties());

offers.forEach(offer -> datasetBuilder.offer(offer.contractId, offer.policy.withTarget(asset.getId())));

return datasetBuilder.build();
});
}

private Offer createOffer(ContractDefinition definition, String assetId) {
var policyDefinition = policyDefinitionStore.findById(definition.getContractPolicyId());
if (policyDefinition == null) {
return null;
}
var contractId = ContractId.create(definition.getId(), assetId);
return new Offer(contractId.toString(), policyDefinition.getPolicy());
.limit(querySpec.getLimit());
}

private static class Offer {
private final String contractId;
private final Policy policy;

Offer(String contractId, Policy policy) {
this.contractId = contractId;
this.policy = policy;
}
@Override
public Dataset getById(ParticipantAgent agent, String id) {
var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).toList();
return Optional.of(id)
.map(assetIndex::findById)
.map(asset -> toDataset(contractDefinitions, asset))
.orElse(null);
}

private static class ProtoDataset {
private final Asset asset;
private final List<Offer> offers;

private ProtoDataset(Asset asset, List<Offer> offers) {
this.asset = asset;
this.offers = offers;
}
private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset asset) {

var distributions = distributionResolver.getDistributions(asset, null); // TODO: data addresses should be retrieved
var datasetBuilder = Dataset.Builder.newInstance()
.id(asset.getId())
.distributions(distributions)
.properties(asset.getProperties());

contractDefinitions.stream()
.filter(definition -> definition.getAssetsSelector().stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and)
.test(asset)
)
.forEach(contractDefinition -> {
var policyDefinition = policyDefinitionStore.findById(contractDefinition.getContractPolicyId());
if (policyDefinition != null) {
var contractId = ContractId.create(contractDefinition.getId(), asset.getId());
datasetBuilder.offer(contractId.toString(), policyDefinition.getPolicy().withTarget(asset.getId()));
}
});

boolean hasOffers() {
return offers.size() > 0;
}
return datasetBuilder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ class DatasetResolverImplPerformanceTest {

private final Clock clock = Clock.systemUTC();

@NotNull
private static PolicyDefinition.Builder createPolicyDefinition(String id) {
return PolicyDefinition.Builder.newInstance().id(id).policy(Policy.Builder.newInstance().build());
}

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
Expand Down Expand Up @@ -100,6 +95,11 @@ void fewDefinitionsSelectAllAssets(DatasetResolver datasetResolver, ContractDefi
assertThat(lastPageDatasets).hasSize(100);
}

@NotNull
private PolicyDefinition.Builder createPolicyDefinition(String id) {
return PolicyDefinition.Builder.newInstance().id(id).policy(Policy.Builder.newInstance().build());
}

private Stream<Dataset> queryDatasetsIn(DatasetResolver datasetResolver, QuerySpec querySpec, Duration duration) {
var start = clock.instant();
var datasets = datasetResolver.query(new ParticipantAgent(emptyMap(), emptyMap()), querySpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,9 @@
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
Expand Down Expand Up @@ -87,7 +84,7 @@ void query_shouldReturnOneDatasetPerAsset() {
var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none());

assertThat(datasets).isNotNull().hasSize(1).first().satisfies(dataset -> {
assertThat(dataset.getId()).matches(isUuid());
assertThat(dataset.getId()).isEqualTo("assetId");
assertThat(dataset.getDistributions()).hasSize(1).first().isEqualTo(distribution);
assertThat(dataset.getOffers()).hasSize(1).allSatisfy((id, policy) -> {
assertThat(ContractId.parseId(id)).isSucceeded().extracting(ContractId::definitionPart).asString().isEqualTo("definitionId");
Expand Down Expand Up @@ -117,14 +114,14 @@ void query_shouldReturnOneDataset_whenMultipleDefinitionsOnSameAsset() {
contractDefinitionBuilder("definition1").contractPolicyId("policy1").build(),
contractDefinitionBuilder("definition2").contractPolicyId("policy2").build()
));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> Stream.of(createAsset("id").build()));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> Stream.of(createAsset("assetId").build()));
when(policyStore.findById("policy1")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy1).build());
when(policyStore.findById("policy2")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy2).build());

var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none());

assertThat(datasets).hasSize(1).first().satisfies(dataset -> {
assertThat(dataset.getId()).matches(isUuid());
assertThat(dataset.getId()).isEqualTo("assetId");
assertThat(dataset.getOffers()).hasSize(2)
.anySatisfy((id, policy) -> {
assertThat(ContractId.parseId(id)).isSucceeded().extracting(ContractId::definitionPart).asString().isEqualTo("definition1");
Expand Down Expand Up @@ -162,7 +159,7 @@ void query_shouldFilterAssetsByPassedCriteria() {
void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_contained() {
var contractDefinition = contractDefinitionBuilder("definitionId").contractPolicyId("contractPolicyId").build();
var contractPolicy = Policy.Builder.newInstance().build();
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList());
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList();
when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream());
when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
Expand All @@ -177,7 +174,7 @@ void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_contained()
void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_overflowing() {
var contractDefinition = contractDefinitionBuilder("definitionId").contractPolicyId("contractPolicyId").build();
var contractPolicy = Policy.Builder.newInstance().build();
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList());
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList();
when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream());
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
Expand All @@ -190,9 +187,9 @@ void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_overflowing(

@Test
void query_shouldLimitDataset_whenMultipleDefinitionAndMultipleAssets_across() {
var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).collect(toList());
var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).toList();
var contractPolicy = Policy.Builder.newInstance().build();
var assets = range(0, 20).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList());
var assets = range(0, 20).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList();
when(contractDefinitionResolver.definitionsFor(any())).thenAnswer(it -> contractDefinitions.stream());
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream());
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
Expand All @@ -205,9 +202,9 @@ void query_shouldLimitDataset_whenMultipleDefinitionAndMultipleAssets_across() {

@Test
void query_shouldLimitDataset_whenMultipleDefinitionsWithSameAssets() {
var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).collect(toList());
var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).toList();
var contractPolicy = Policy.Builder.newInstance().build();
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList());
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList();
when(contractDefinitionResolver.definitionsFor(any())).thenAnswer(it -> contractDefinitions.stream());
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream());
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
Expand All @@ -216,12 +213,52 @@ void query_shouldLimitDataset_whenMultipleDefinitionsWithSameAssets() {
var datasets = datasetResolver.query(createParticipantAgent(), querySpec);

assertThat(datasets).hasSize(2)
.allSatisfy(dataset -> {
assertThat(dataset.getOffers()).hasSize(2);
})
.allSatisfy(dataset -> assertThat(dataset.getOffers()).hasSize(2))
.map(getId()).containsExactly("6", "7");
}

@Test
void getById_shouldReturnDataset() {
var policy1 = Policy.Builder.newInstance().type(SET).build();
var policy2 = Policy.Builder.newInstance().type(OFFER).build();
when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(
contractDefinitionBuilder("definition1").contractPolicyId("policy1").build(),
contractDefinitionBuilder("definition2").contractPolicyId("policy2").build()
));
when(assetIndex.findById(any())).thenReturn(createAsset("datasetId").build());
when(policyStore.findById("policy1")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy1).build());
when(policyStore.findById("policy2")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy2).build());
var participantAgent = createParticipantAgent();

var dataset = datasetResolver.getById(participantAgent, "datasetId");

assertThat(dataset).isNotNull();
assertThat(dataset.getId()).isEqualTo("datasetId");
assertThat(dataset.getOffers()).hasSize(2)
.anySatisfy((id, policy) -> {
assertThat(ContractId.parseId(id)).isSucceeded().extracting(ContractId::definitionPart).isEqualTo("definition1");
assertThat(policy.getType()).isEqualTo(SET);
})
.anySatisfy((id, policy) -> {
assertThat(ContractId.parseId(id)).isSucceeded().extracting(ContractId::definitionPart).isEqualTo("definition2");
assertThat(policy.getType()).isEqualTo(OFFER);
});
verify(assetIndex).findById("datasetId");
verify(contractDefinitionResolver).definitionsFor(participantAgent);
}

@Test
void getById_shouldReturnNull_whenAssetNotFound() {
when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(
contractDefinitionBuilder("definition1").contractPolicyId("policy1").build()
));
when(assetIndex.findById(any())).thenReturn(null);
var participantAgent = createParticipantAgent();

var dataset = datasetResolver.getById(participantAgent, "datasetId");

assertThat(dataset).isNull();
}

private ContractDefinition.Builder contractDefinitionBuilder(String id) {
return ContractDefinition.Builder.newInstance()
Expand All @@ -247,16 +284,4 @@ private ThrowingExtractor<Dataset, Object, RuntimeException> getId() {
return it -> it.getProperty(Asset.PROPERTY_ID);
}

@NotNull
private Predicate<String> isUuid() {
return it -> {
try {
UUID.fromString(it);
return true;
} catch (Exception e) {
return false;
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import org.eclipse.edc.catalog.spi.Catalog;
import org.eclipse.edc.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.catalog.spi.Dataset;
import org.eclipse.edc.catalog.spi.DatasetResolver;
import org.eclipse.edc.connector.spi.catalog.CatalogProtocolService;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.agent.ParticipantAgentService;
import org.eclipse.edc.spi.iam.ClaimToken;
import org.jetbrains.annotations.NotNull;

import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;

Expand All @@ -47,8 +49,8 @@ public CatalogProtocolServiceImpl(DatasetResolver datasetResolver,

@Override
@NotNull
public ServiceResult<Catalog> getCatalog(CatalogRequestMessage message, ClaimToken token) {
var agent = participantAgentService.createFor(token);
public ServiceResult<Catalog> getCatalog(CatalogRequestMessage message, ClaimToken claimToken) {
var agent = participantAgentService.createFor(claimToken);

try (var datasets = datasetResolver.query(agent, message.getQuerySpec())) {
var dataServices = dataServiceRegistry.getDataServices();
Expand All @@ -62,4 +64,17 @@ public ServiceResult<Catalog> getCatalog(CatalogRequestMessage message, ClaimTok
return ServiceResult.success(catalog);
}
}

@Override
public @NotNull ServiceResult<Dataset> getDataset(String datasetId, ClaimToken claimToken) {
var agent = participantAgentService.createFor(claimToken);

var dataset = datasetResolver.getById(agent, datasetId);

if (dataset == null) {
return ServiceResult.notFound(format("Dataset %s does not exist", datasetId));
}

return ServiceResult.success(dataset);
}
}
Loading