Skip to content

Commit

Permalink
[HWORKS-1104] Explicit storage connector provenance (#1524)
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed May 2, 2024
1 parent d5bf5e6 commit 827bec9
Show file tree
Hide file tree
Showing 16 changed files with 283 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,14 @@ def make_sample_query(project, featurestore_id, featuregroup_suffix: "")
query
end

def get_storage_connector_links(project_id, name, feature_store_id: nil, feature_store_project_id: nil,
upstreamLvls: 1, downstreamLvls: 1, expected_status: 200)
feature_store_project_id = project_id if feature_store_project_id.nil?
feature_store_id = get_featurestore(project_id, fs_project_id: feature_store_project_id)["featurestoreId"] if feature_store_id.nil?
artifactPath = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{feature_store_id}/storageconnectors/#{name}"
get_featurestore_provenance_explicit_links(artifactPath, upstreamLvls: upstreamLvls, downstreamLvls: downstreamLvls, expected_status: expected_status)
end

def get_feature_group_links(project_id, id, feature_store_id: nil, feature_store_project_id: nil,
upstreamLvls: 1, downstreamLvls: 1, expected_status: 200)
feature_store_project_id = project_id if feature_store_project_id.nil?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.hops.hopsworks.api.filter.NoCacheResponse;
import io.hops.hopsworks.api.auth.key.ApiKeyRequired;
import io.hops.hopsworks.api.jwt.JWTHelper;
import io.hops.hopsworks.api.provenance.StorageConnectorProvenanceResource;
import io.hops.hopsworks.common.api.ResourceRequest;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
Expand Down Expand Up @@ -51,6 +52,7 @@
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.ws.rs.BeanParam;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -97,6 +99,8 @@ public class FeaturestoreStorageConnectorService {
private StorageConnectorUtil storageConnectorUtil;
@EJB
private ConnectionChecker connectionChecker;
@Inject
private StorageConnectorProvenanceResource provenanceResource;

private Project project;
private Featurestore featurestore;
Expand Down Expand Up @@ -166,7 +170,7 @@ public Response getStorageConnector(@Context SecurityContext sc,
Users user = jWTHelper.getUserPrincipal(sc);
verifyStorageConnectorName(connectorName);
FeaturestoreStorageConnectorDTO featurestoreStorageConnectorDTO =
storageConnectorController.getConnectorWithName(user, project, featurestore, connectorName);
storageConnectorController.getConnectorDTOWithName(user, project, featurestore, connectorName);
GenericEntity<FeaturestoreStorageConnectorDTO> featurestoreStorageConnectorDTOGenericEntity =
new GenericEntity<FeaturestoreStorageConnectorDTO>(featurestoreStorageConnectorDTO) {
};
Expand Down Expand Up @@ -259,7 +263,7 @@ public Response updateStorageConnector(@ApiParam(value = "Name of the storage co
storageConnectorController.updateStorageConnector(user, project, featurestore,
featurestoreStorageConnectorInputDTO, connectorName);
FeaturestoreStorageConnectorDTO featurestoreStorageConnectorDTO =
storageConnectorController.getConnectorWithName(user, project, featurestore, connectorName);
storageConnectorController.getConnectorDTOWithName(user, project, featurestore, connectorName);
GenericEntity<FeaturestoreStorageConnectorDTO> featurestoreStorageConnectorDTOGenericEntity =
new GenericEntity<FeaturestoreStorageConnectorDTO>(featurestoreStorageConnectorDTO) {};
return noCacheResponse.getNoCacheResponseBuilder(Response.Status.OK)
Expand Down Expand Up @@ -406,8 +410,18 @@ public Response testConnection(@Context SecurityContext sc,
.entity(resultDto).build();
}

@Path("/{connectorName}/provenance")
public StorageConnectorProvenanceResource provenance(
@ApiParam(value = "Name of the storage connector", required = true)
@PathParam("connectorName") String connectorName) {
this.provenanceResource.setProject(project);
this.provenanceResource.setFeatureStore(featurestore);
this.provenanceResource.setConnectorName(connectorName);
return this.provenanceResource;
}

private ResourceRequest makeResourceRequest(StorageConnectorBeanParam param) {
ResourceRequest resourceRequest = new ResourceRequest(ResourceRequest.Name.STORAGECONNECTOR);
ResourceRequest resourceRequest = new ResourceRequest(ResourceRequest.Name.STORAGECONNECTORS);
resourceRequest.setOffset(param.getPagination().getOffset());
resourceRequest.setLimit(param.getPagination().getLimit());
resourceRequest.setSort(param.getParsedSortBy());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* This file is part of Hopsworks
* Copyright (C) 2024, Hopsworks AB. All rights reserved
*
* Hopsworks is free software: you can redistribute it and/or modify it under the terms of
* the GNU Affero General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along with this program.
* If not, see <https://www.gnu.org/licenses/>.
*/
package io.hops.hopsworks.api.provenance;

import io.hops.hopsworks.api.auth.key.ApiKeyRequired;
import io.hops.hopsworks.api.filter.AllowedProjectRoles;
import io.hops.hopsworks.api.filter.Audience;
import io.hops.hopsworks.api.jwt.JWTHelper;
import io.hops.hopsworks.api.provenance.explicit.ExplicitProvenanceExpansionBeanParam;
import io.hops.hopsworks.api.provenance.explicit.ProvExplicitLinksBuilder;
import io.hops.hopsworks.api.provenance.explicit.dto.ProvExplicitLinkDTO;
import io.hops.hopsworks.api.provenance.ops.ProvLinksBeanParams;
import io.hops.hopsworks.common.api.ResourceRequest;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController;
import io.hops.hopsworks.common.provenance.explicit.ProvExplicitControllerIface;
import io.hops.hopsworks.common.provenance.explicit.ProvExplicitLink;
import io.hops.hopsworks.common.provenance.ops.dto.ProvLinksDTO;
import io.hops.hopsworks.exceptions.DatasetException;
import io.hops.hopsworks.exceptions.FeatureStoreMetadataException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.MetadataException;
import io.hops.hopsworks.exceptions.ModelRegistryException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.jwt.annotation.JWTRequired;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.security.apiKey.ApiScope;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

import javax.ejb.EJB;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.BeanParam;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import java.io.IOException;

@RequestScoped
@TransactionAttribute(TransactionAttributeType.NEVER)
@Api(value = "Storage Connector Explicit Provenance Resource")
public class StorageConnectorProvenanceResource {
@EJB
private JWTHelper jwtHelper;
@Inject
private ProvExplicitControllerIface provCtrl;
@EJB
private ProvExplicitLinksBuilder linksBuilder;
@EJB
private FeaturestoreStorageConnectorController storageConnectorController;
private Project project;
private Featurestore featureStore;
private String connectorName;

public void setProject(Project project) {
this.project = project;
}

public void setFeatureStore(Featurestore featureStore) {
this.featureStore = featureStore;
}

public void setConnectorName(String connectorName) {
this.connectorName = connectorName;
}

@GET
@Path("links")
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.MODELREGISTRY},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiOperation(value = "Links Provenance query endpoint",
response = ProvLinksDTO.class)
public Response getLinks(
@BeanParam ProvLinksBeanParams params,
@BeanParam LinksPagination pagination,
@BeanParam ExplicitProvenanceExpansionBeanParam explicitProvenanceExpansionBeanParam,
@Context UriInfo uriInfo,
@Context HttpServletRequest req,
@Context SecurityContext sc)
throws GenericException, FeaturestoreException, DatasetException, ServiceException, MetadataException,
FeatureStoreMetadataException, IOException, ModelRegistryException {
Users user = jwtHelper.getUserPrincipal(sc);
ResourceRequest resourceRequest = new ResourceRequest(ResourceRequest.Name.PROVENANCE);
resourceRequest.setExpansions(explicitProvenanceExpansionBeanParam.getResources());
FeaturestoreConnector connector =
storageConnectorController.getConnectorWithName(user, project, featureStore, connectorName);
ProvExplicitLink<FeaturestoreConnector> provenance = provCtrl.storageConnectorLinks(project, connector,
pagination.getUpstreamLvls(), pagination.getDownstreamLvls());
ProvExplicitLinkDTO<?> result = linksBuilder.build(uriInfo, resourceRequest, project, user, provenance);
return Response.ok().entity(result).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.hops.hopsworks.common.featurestore.featuregroup.ondemand.OnDemandFeaturegroupDTO;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO;
import io.hops.hopsworks.common.featurestore.featureview.FeatureViewDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.featurestore.trainingdatasets.TrainingDatasetDTO;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.provenance.explicit.ProvArtifact;
Expand All @@ -46,6 +48,7 @@
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featureview.FeatureView;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
import io.hops.hopsworks.persistence.entity.featurestore.trainingdataset.TrainingDataset;
import io.hops.hopsworks.persistence.entity.models.version.ModelVersion;
import io.hops.hopsworks.persistence.entity.project.Project;
Expand All @@ -71,6 +74,8 @@ public class ProvExplicitLinksBuilder {
@EJB
private FeaturestoreUtils featurestoreUtils;
@EJB
private FeaturestoreStorageConnectorController featurestoreStorageConnectorController;
@EJB
private FeaturegroupController featuregroupController;
@EJB
private FeatureViewBuilder featureViewBuilder;
Expand All @@ -83,14 +88,22 @@ public ProvExplicitLinksBuilder() {}

//test
public ProvExplicitLinksBuilder(FeaturestoreUtils featurestoreUtils,
FeaturestoreStorageConnectorController featurestoreStorageConnectorController,
FeaturegroupController featuregroupController,
FeatureViewBuilder featureViewBuilder,
TrainingDatasetDTOBuilder trainingDatasetBuilder) {
this.featurestoreUtils = featurestoreUtils;
this.featurestoreStorageConnectorController = featurestoreStorageConnectorController;
this.featuregroupController = featuregroupController;
this.featureViewBuilder = featureViewBuilder;
this.trainingDatasetBuilder = trainingDatasetBuilder;
}

public UriBuilder storageConnectorURI(UriInfo uriInfo, Project accessProject, FeaturestoreConnector connector) {
return featurestoreUtils.storageConnectorByNameURI(uriInfo.getBaseUriBuilder(), accessProject, connector)
.path(ResourceRequest.Name.PROVENANCE.toString().toLowerCase())
.path(ResourceRequest.Name.LINKS.toString().toLowerCase());
}

public UriBuilder featureGroupURI(UriInfo uriInfo, Project accessProject, Featuregroup featureGroup) {
return featurestoreUtils.featureGroupByIdURI(uriInfo.getBaseUriBuilder(), accessProject, featureGroup)
Expand Down Expand Up @@ -146,7 +159,16 @@ public ProvExplicitLinkDTO<?> build(UriInfo uriInfo, ResourceRequest resourceReq
boolean expandLink = resourceRequest != null && resourceRequest.contains(ResourceRequest.Name.PROVENANCE);
boolean expandArtifact = resourceRequest != null
&& resourceRequest.contains(ResourceRequest.Name.PROVENANCE_ARTIFACTS);
if (links.getNode() instanceof Featuregroup) {
if (links.getNode() instanceof FeaturestoreConnector) {
if (expandLink) {
return storageConnectorLink(uriInfo, accessProject, user, expandArtifact, links);
} else {
ProvExplicitLinkDTO<?> linksDTO = new ProvExplicitLinkDTO<>();
FeaturestoreConnector connector = (FeaturestoreConnector) links.getNode();
linksDTO.setHref(storageConnectorURI(uriInfo, accessProject, connector).build());
return linksDTO;
}
} else if (links.getNode() instanceof Featuregroup) {
if (expandLink) {
return featureGroupLink(uriInfo, accessProject, user, expandArtifact, links);
} else {
Expand Down Expand Up @@ -185,6 +207,35 @@ public ProvExplicitLinkDTO<?> build(UriInfo uriInfo, ResourceRequest resourceReq
}
return null;
}

private ProvExplicitLinkDTO storageConnectorLink(UriInfo uriInfo, Project accessProject, Users user,
boolean expandArtifact, ProvExplicitLink links)
throws FeaturestoreException, ServiceException, MetadataException, FeatureStoreMetadataException,
DatasetException, IOException, GenericException, ModelRegistryException {
ProvExplicitLinkDTO<FeaturestoreStorageConnectorDTO> linksDTO = new ProvExplicitLinkDTO<>();
RestDTO artifactDTO;
if(links.isDeleted()) {
ProvArtifact artifact = (ProvArtifact) links.getNode();
artifactDTO = new ProvArtifactDTO(artifact.getId(), artifact.getProject(),
artifact.getName(), artifact.getVersion());
} else {
FeaturestoreConnector connector = (FeaturestoreConnector) links.getNode();
linksDTO.setHref(storageConnectorURI(uriInfo, accessProject, connector).build());
if (links.isAccessible() && expandArtifact) {
artifactDTO = featurestoreStorageConnectorController.convertToConnectorDTO(user, accessProject, connector);
} else {
ProvArtifact artifact = ProvArtifact.fromFeaturestoreConnector(connector);
artifactDTO = new ProvArtifactDTO(artifact.getId(), artifact.getProject(),
artifact.getName(), artifact.getVersion());
}
URI href =
featurestoreUtils.storageConnectorByNameURI(uriInfo.getBaseUriBuilder(), accessProject, connector).build();
artifactDTO.setHref(href);
}
linksDTO.setNode(buildNodeDTO(links, artifactDTO));
traverseLinks(uriInfo, accessProject, user, linksDTO, expandArtifact, links);
return linksDTO;
}

private ProvExplicitLinkDTO featureGroupLink(UriInfo uriInfo, Project accessProject, Users user,
boolean expandArtifact, ProvExplicitLink links)
Expand Down Expand Up @@ -347,6 +398,8 @@ private ProvExplicitLinkDTO<?> traverseLinksInt(UriInfo uriInfo, Project accessP
throws FeaturestoreException, ServiceException, DatasetException, MetadataException,
FeatureStoreMetadataException, IOException, GenericException, ModelRegistryException {
switch(link.getArtifactType()) {
case STORAGE_CONNECTOR:
return storageConnectorLink(uriInfo, accessProject, user, expandArtifact, link);
case FEATURE_GROUP:
return featureGroupLink(uriInfo, accessProject, user, expandArtifact, link);
case FEATURE_VIEW:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public enum Name {
SCHEDULE,
ENVIRONMENT_HISTORY,
FEATURE_MONITORING,
STORAGECONNECTOR;
STORAGECONNECTORS;

public static Name fromString(String name) {
return valueOf(name.toUpperCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

package io.hops.hopsworks.common.featurestore.featuregroup;

import com.google.common.collect.Lists;
import io.hops.hopsworks.common.dao.AbstractFacade;
import io.hops.hopsworks.common.dao.QueryParam;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;

import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
Expand All @@ -30,6 +34,7 @@
import javax.persistence.Query;
import javax.persistence.TypedQuery;
import javax.validation.ConstraintViolationException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
Expand All @@ -45,6 +50,9 @@ public class FeaturegroupFacade extends AbstractFacade<Featuregroup> {
private static final Logger LOGGER = Logger.getLogger(FeaturegroupFacade.class.getName());
@PersistenceContext(unitName = "kthfsPU")
private EntityManager em;

@EJB
private Settings settings;

/**
* Class constructor, invoke parent class and initialize Hive Queries
Expand Down Expand Up @@ -177,7 +185,25 @@ public Long countByFeaturestore(Featurestore featurestore) {
.setParameter("featurestore", featurestore)
.getSingleResult();
}


public List<Featuregroup> findByStorageConnectors(List<FeaturestoreConnector> storageConnectors) {
if (storageConnectors.size() > settings.getSQLMaxSelectIn()) {
List<Featuregroup> result = new ArrayList<>();
for(List<FeaturestoreConnector> partition : Lists.partition(storageConnectors, settings.getSQLMaxSelectIn())) {
TypedQuery<Featuregroup> query =
em.createNamedQuery("Featuregroup.findByStorageConnectors", Featuregroup.class);
query.setParameter("storageConnectors", partition);
result.addAll(query.getResultList());
}
return result;
} else {
TypedQuery<Featuregroup> query =
em.createNamedQuery("Featuregroup.findByStorageConnectors", Featuregroup.class);
query.setParameter("storageConnectors", storageConnectors);
return query.getResultList();
}
}

/**
* Transaction to create a new featuregroup in the database
*
Expand Down
Loading

0 comments on commit 827bec9

Please sign in to comment.