Skip to content

Commit

Permalink
feat: implement get dataset dsp endpoint (#3243)
Browse files Browse the repository at this point in the history
* feat: implement get dataset dsp endpoint

* PR remark
  • Loading branch information
ndr-brt authored Jun 29, 2023
1 parent 33a22bc commit 2999513
Show file tree
Hide file tree
Showing 15 changed files with 333 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.edc.connector.contract.spi.offer.ContractDefinitionResolver;
import org.eclipse.edc.connector.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.agent.ParticipantAgent;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.asset.AssetPredicateConverter;
Expand All @@ -30,12 +29,11 @@
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.lang.Integer.MAX_VALUE;
import static java.util.stream.Collectors.toList;

public class DatasetResolverImpl implements DatasetResolver {

Expand All @@ -56,68 +54,47 @@ public DatasetResolverImpl(ContractDefinitionResolver contractDefinitionResolver
@Override
@NotNull
public Stream<Dataset> query(ParticipantAgent agent, QuerySpec querySpec) {
var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).collect(toList());
var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).toList();
var assetsQuery = QuerySpec.Builder.newInstance().offset(0).limit(MAX_VALUE).filter(querySpec.getFilterExpression()).build();
return assetIndex.queryAssets(assetsQuery)
.map(asset -> {
var offers = contractDefinitions.stream()
.filter(definition -> definition.getAssetsSelector().stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and)
.test(asset))
.map(contractDefinition -> createOffer(contractDefinition, asset.getId()))
.filter(Objects::nonNull)
.collect(toList());
return new ProtoDataset(asset, offers);
})
.filter(ProtoDataset::hasOffers)
.map(asset -> toDataset(contractDefinitions, asset))
.filter(Dataset::hasOffers)
.skip(querySpec.getOffset())
.limit(querySpec.getLimit())
.map(proto -> {
var asset = proto.asset;
var offers = proto.offers;
var distributions = distributionResolver.getDistributions(asset, null); // TODO: data addresses should be retrieved
var datasetBuilder = Dataset.Builder.newInstance()
.distributions(distributions)
.properties(asset.getProperties());

offers.forEach(offer -> datasetBuilder.offer(offer.contractId, offer.policy.withTarget(asset.getId())));

return datasetBuilder.build();
});
}

private Offer createOffer(ContractDefinition definition, String assetId) {
var policyDefinition = policyDefinitionStore.findById(definition.getContractPolicyId());
if (policyDefinition == null) {
return null;
}
var contractId = ContractId.create(definition.getId(), assetId);
return new Offer(contractId.toString(), policyDefinition.getPolicy());
.limit(querySpec.getLimit());
}

private static class Offer {
private final String contractId;
private final Policy policy;

Offer(String contractId, Policy policy) {
this.contractId = contractId;
this.policy = policy;
}
@Override
public Dataset getById(ParticipantAgent agent, String id) {
var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).toList();
return Optional.of(id)
.map(assetIndex::findById)
.map(asset -> toDataset(contractDefinitions, asset))
.orElse(null);
}

private static class ProtoDataset {
private final Asset asset;
private final List<Offer> offers;

private ProtoDataset(Asset asset, List<Offer> offers) {
this.asset = asset;
this.offers = offers;
}
private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset asset) {

var distributions = distributionResolver.getDistributions(asset, null); // TODO: data addresses should be retrieved
var datasetBuilder = Dataset.Builder.newInstance()
.id(asset.getId())
.distributions(distributions)
.properties(asset.getProperties());

contractDefinitions.stream()
.filter(definition -> definition.getAssetsSelector().stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and)
.test(asset)
)
.forEach(contractDefinition -> {
var policyDefinition = policyDefinitionStore.findById(contractDefinition.getContractPolicyId());
if (policyDefinition != null) {
var contractId = ContractId.create(contractDefinition.getId(), asset.getId());
datasetBuilder.offer(contractId.toString(), policyDefinition.getPolicy().withTarget(asset.getId()));
}
});

boolean hasOffers() {
return offers.size() > 0;
}
return datasetBuilder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ class DatasetResolverImplPerformanceTest {

private final Clock clock = Clock.systemUTC();

@NotNull
private static PolicyDefinition.Builder createPolicyDefinition(String id) {
return PolicyDefinition.Builder.newInstance().id(id).policy(Policy.Builder.newInstance().build());
}

@BeforeEach
void setUp(EdcExtension extension) {
extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class));
Expand Down Expand Up @@ -100,6 +95,11 @@ void fewDefinitionsSelectAllAssets(DatasetResolver datasetResolver, ContractDefi
assertThat(lastPageDatasets).hasSize(100);
}

@NotNull
private PolicyDefinition.Builder createPolicyDefinition(String id) {
return PolicyDefinition.Builder.newInstance().id(id).policy(Policy.Builder.newInstance().build());
}

private Stream<Dataset> queryDatasetsIn(DatasetResolver datasetResolver, QuerySpec querySpec, Duration duration) {
var start = clock.instant();
var datasets = datasetResolver.query(new ParticipantAgent(emptyMap(), emptyMap()), querySpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,9 @@
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
Expand Down Expand Up @@ -87,7 +84,7 @@ void query_shouldReturnOneDatasetPerAsset() {
var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none());

assertThat(datasets).isNotNull().hasSize(1).first().satisfies(dataset -> {
assertThat(dataset.getId()).matches(isUuid());
assertThat(dataset.getId()).isEqualTo("assetId");
assertThat(dataset.getDistributions()).hasSize(1).first().isEqualTo(distribution);
assertThat(dataset.getOffers()).hasSize(1).allSatisfy((id, policy) -> {
assertThat(ContractId.parseId(id)).isSucceeded().extracting(ContractId::definitionPart).asString().isEqualTo("definitionId");
Expand Down Expand Up @@ -117,14 +114,14 @@ void query_shouldReturnOneDataset_whenMultipleDefinitionsOnSameAsset() {
contractDefinitionBuilder("definition1").contractPolicyId("policy1").build(),
contractDefinitionBuilder("definition2").contractPolicyId("policy2").build()
));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> Stream.of(createAsset("id").build()));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> Stream.of(createAsset("assetId").build()));
when(policyStore.findById("policy1")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy1).build());
when(policyStore.findById("policy2")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy2).build());

var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none());

assertThat(datasets).hasSize(1).first().satisfies(dataset -> {
assertThat(dataset.getId()).matches(isUuid());
assertThat(dataset.getId()).isEqualTo("assetId");
assertThat(dataset.getOffers()).hasSize(2)
.anySatisfy((id, policy) -> {
assertThat(ContractId.parseId(id)).isSucceeded().extracting(ContractId::definitionPart).asString().isEqualTo("definition1");
Expand Down Expand Up @@ -162,7 +159,7 @@ void query_shouldFilterAssetsByPassedCriteria() {
void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_contained() {
var contractDefinition = contractDefinitionBuilder("definitionId").contractPolicyId("contractPolicyId").build();
var contractPolicy = Policy.Builder.newInstance().build();
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList());
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList();
when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream());
when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
Expand All @@ -177,7 +174,7 @@ void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_contained()
void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_overflowing() {
var contractDefinition = contractDefinitionBuilder("definitionId").contractPolicyId("contractPolicyId").build();
var contractPolicy = Policy.Builder.newInstance().build();
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList());
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList();
when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream());
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
Expand All @@ -190,9 +187,9 @@ void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_overflowing(

@Test
void query_shouldLimitDataset_whenMultipleDefinitionAndMultipleAssets_across() {
var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).collect(toList());
var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).toList();
var contractPolicy = Policy.Builder.newInstance().build();
var assets = range(0, 20).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList());
var assets = range(0, 20).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList();
when(contractDefinitionResolver.definitionsFor(any())).thenAnswer(it -> contractDefinitions.stream());
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream());
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
Expand All @@ -205,9 +202,9 @@ void query_shouldLimitDataset_whenMultipleDefinitionAndMultipleAssets_across() {

@Test
void query_shouldLimitDataset_whenMultipleDefinitionsWithSameAssets() {
var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).collect(toList());
var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).toList();
var contractPolicy = Policy.Builder.newInstance().build();
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList());
var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList();
when(contractDefinitionResolver.definitionsFor(any())).thenAnswer(it -> contractDefinitions.stream());
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream());
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
Expand All @@ -216,12 +213,52 @@ void query_shouldLimitDataset_whenMultipleDefinitionsWithSameAssets() {
var datasets = datasetResolver.query(createParticipantAgent(), querySpec);

assertThat(datasets).hasSize(2)
.allSatisfy(dataset -> {
assertThat(dataset.getOffers()).hasSize(2);
})
.allSatisfy(dataset -> assertThat(dataset.getOffers()).hasSize(2))
.map(getId()).containsExactly("6", "7");
}

@Test
void getById_shouldReturnDataset() {
var policy1 = Policy.Builder.newInstance().type(SET).build();
var policy2 = Policy.Builder.newInstance().type(OFFER).build();
when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(
contractDefinitionBuilder("definition1").contractPolicyId("policy1").build(),
contractDefinitionBuilder("definition2").contractPolicyId("policy2").build()
));
when(assetIndex.findById(any())).thenReturn(createAsset("datasetId").build());
when(policyStore.findById("policy1")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy1).build());
when(policyStore.findById("policy2")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy2).build());
var participantAgent = createParticipantAgent();

var dataset = datasetResolver.getById(participantAgent, "datasetId");

assertThat(dataset).isNotNull();
assertThat(dataset.getId()).isEqualTo("datasetId");
assertThat(dataset.getOffers()).hasSize(2)
.anySatisfy((id, policy) -> {
assertThat(ContractId.parseId(id)).isSucceeded().extracting(ContractId::definitionPart).isEqualTo("definition1");
assertThat(policy.getType()).isEqualTo(SET);
})
.anySatisfy((id, policy) -> {
assertThat(ContractId.parseId(id)).isSucceeded().extracting(ContractId::definitionPart).isEqualTo("definition2");
assertThat(policy.getType()).isEqualTo(OFFER);
});
verify(assetIndex).findById("datasetId");
verify(contractDefinitionResolver).definitionsFor(participantAgent);
}

@Test
void getById_shouldReturnNull_whenAssetNotFound() {
when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(
contractDefinitionBuilder("definition1").contractPolicyId("policy1").build()
));
when(assetIndex.findById(any())).thenReturn(null);
var participantAgent = createParticipantAgent();

var dataset = datasetResolver.getById(participantAgent, "datasetId");

assertThat(dataset).isNull();
}

private ContractDefinition.Builder contractDefinitionBuilder(String id) {
return ContractDefinition.Builder.newInstance()
Expand All @@ -247,16 +284,4 @@ private ThrowingExtractor<Dataset, Object, RuntimeException> getId() {
return it -> it.getProperty(Asset.PROPERTY_ID);
}

@NotNull
private Predicate<String> isUuid() {
return it -> {
try {
UUID.fromString(it);
return true;
} catch (Exception e) {
return false;
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import org.eclipse.edc.catalog.spi.Catalog;
import org.eclipse.edc.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.catalog.spi.Dataset;
import org.eclipse.edc.catalog.spi.DatasetResolver;
import org.eclipse.edc.connector.spi.catalog.CatalogProtocolService;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.agent.ParticipantAgentService;
import org.eclipse.edc.spi.iam.ClaimToken;
import org.jetbrains.annotations.NotNull;

import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;

Expand All @@ -47,8 +49,8 @@ public CatalogProtocolServiceImpl(DatasetResolver datasetResolver,

@Override
@NotNull
public ServiceResult<Catalog> getCatalog(CatalogRequestMessage message, ClaimToken token) {
var agent = participantAgentService.createFor(token);
public ServiceResult<Catalog> getCatalog(CatalogRequestMessage message, ClaimToken claimToken) {
var agent = participantAgentService.createFor(claimToken);

try (var datasets = datasetResolver.query(agent, message.getQuerySpec())) {
var dataServices = dataServiceRegistry.getDataServices();
Expand All @@ -62,4 +64,17 @@ public ServiceResult<Catalog> getCatalog(CatalogRequestMessage message, ClaimTok
return ServiceResult.success(catalog);
}
}

@Override
public @NotNull ServiceResult<Dataset> getDataset(String datasetId, ClaimToken claimToken) {
var agent = participantAgentService.createFor(claimToken);

var dataset = datasetResolver.getById(agent, datasetId);

if (dataset == null) {
return ServiceResult.notFound(format("Dataset %s does not exist", datasetId));
}

return ServiceResult.success(dataset);
}
}
Loading

0 comments on commit 2999513

Please sign in to comment.