Skip to content

Commit

Permalink
added complete impl of refresh service
Browse files Browse the repository at this point in the history
  • Loading branch information
adrikagupta committed May 2, 2024
1 parent 6495c92 commit eb92835
Show file tree
Hide file tree
Showing 11 changed files with 250 additions and 188 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;
import java.util.List;

public class ListSourceControlMetadataResponse {
private final List<SourceControlMetadataRecord> apps;
private final String nextPageToken;
private final Long lastRefreshTime;

public ListSourceControlMetadataResponse(List<SourceControlMetadataRecord> apps,
String nextPageToken, Long lastRefreshTime) {
this.apps = apps;
this.nextPageToken = nextPageToken;
this.lastRefreshTime = lastRefreshTime;
}

public List<SourceControlMetadataRecord> getApps() {
return apps;
}

public String getNextPageToken() {
return nextPageToken;
}

public Long getLastRefreshTime() {
return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ForbiddenException;
Expand Down Expand Up @@ -58,7 +59,11 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import javafx.util.Pair;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -155,27 +160,29 @@ public void listAllApplications(FullHttpRequest request, HttpResponder responder
@QueryParam("filter") String filter) throws Exception {
checkSourceControlFeatureFlag();
validateNamespaceId(namespaceId);
JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
//jsonListResponder.send(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
Pair<Boolean, Long> limitReachedAndLastRefreshPair = new Pair<>(false, null);
try {
limitReachedAndLastRefreshPair = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
apps.add(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = limitReachedAndLastRefreshPair.getKey() || record == null ? null :
record.getName();
Long lastRefreshTime = limitReachedAndLastRefreshPair.getValue();
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ public boolean scanSourceControlMetadata(ScanSourceControlMetadataRequest reques

currentLimit -= txBatchSize;
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.cdap.cdap.api.artifact.ArtifactSummary;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.api.metrics.MetricsContext;
import io.cdap.cdap.api.security.store.SecureStore;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.app.store.Store;
import io.cdap.cdap.common.ForbiddenException;
import io.cdap.cdap.common.NamespaceNotFoundException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.RepositoryNotFoundException;
Expand All @@ -33,6 +35,9 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants.Metrics.SourceControlManagement;
import io.cdap.cdap.common.conf.Constants.Metrics.Tag;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.namespace.NamespaceAdmin;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
Expand Down Expand Up @@ -64,14 +69,11 @@
import io.cdap.cdap.sourcecontrol.RepositoryManager;
import io.cdap.cdap.sourcecontrol.SourceControlConfig;
import io.cdap.cdap.sourcecontrol.SourceControlException;
import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository;
import io.cdap.cdap.sourcecontrol.operationrunner.PullAppOperationRequest;
import io.cdap.cdap.sourcecontrol.operationrunner.PullAppResponse;
import io.cdap.cdap.sourcecontrol.operationrunner.PushAppMeta;
import io.cdap.cdap.sourcecontrol.operationrunner.PushAppOperationRequest;
import io.cdap.cdap.sourcecontrol.operationrunner.PushAppsResponse;
import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp;
import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse;
import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.TableNotFoundException;
Expand All @@ -81,34 +83,43 @@
import io.cdap.cdap.store.RepositoryTable;
import java.io.IOException;
import java.time.Clock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javafx.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.AbstractIdleService;

/**
* Service that manages source control for repositories and applications. It exposes repository CRUD
* apis and source control tasks that do pull/pull/list applications in linked repository.
*/
public class SourceControlManagementService {
public class SourceControlManagementService extends AbstractIdleService {

private final AccessEnforcer accessEnforcer;
private final AuthenticationContext authenticationContext;
private final TransactionRunner transactionRunner;
private final CConfiguration cConf;
private final SecureStore secureStore;
private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService;
private final SourceControlOperationRunner sourceControlOperationRunner;
private final ApplicationLifecycleService appLifecycleService;
private final Store store;
private HashMap<NamespaceId, SourceControlMetadataRefreshService> refreshSchedulers = new HashMap<>();
private final OperationLifecycleManager operationLifecycleManager;
private final MetricsCollectionService metricsCollectionService;
private final Clock clock;
private final NamespaceAdmin namespaceAdmin;
private final FeatureFlagsProvider featureFlagsProvider;
private static final Logger LOG = LoggerFactory.getLogger(SourceControlManagementService.class);


/**
/**
* Constructor for SourceControlManagementService with all params injected via guice.
*/
@Inject
Expand All @@ -122,12 +133,12 @@ public SourceControlManagementService(CConfiguration cConf,
Store store,
OperationLifecycleManager operationLifecycleManager,
MetricsCollectionService metricsCollectionService,
SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
NamespaceAdmin namespaceAdmin) {
this (cConf, secureStore, transactionRunner,
accessEnforcer, authenticationContext,
sourceControlOperationRunner, applicationLifecycleService,
store, operationLifecycleManager, metricsCollectionService, Clock.systemUTC(),
sourceControlMetadataRefreshService);
namespaceAdmin);
}

@VisibleForTesting
Expand All @@ -141,7 +152,7 @@ public SourceControlManagementService(CConfiguration cConf,
Store store,
OperationLifecycleManager operationLifecycleManager,
MetricsCollectionService metricsCollectionService,
Clock clock, SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
Clock clock, NamespaceAdmin namespaceAdmin) {
this.cConf = cConf;
this.secureStore = secureStore;
this.transactionRunner = transactionRunner;
Expand All @@ -153,7 +164,8 @@ public SourceControlManagementService(CConfiguration cConf,
this.operationLifecycleManager = operationLifecycleManager;
this.metricsCollectionService = metricsCollectionService;
this.clock = clock;
this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService;
this.namespaceAdmin = namespaceAdmin;
this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
}

private RepositoryTable getRepositoryTable(StructuredTableContext context)
Expand Down Expand Up @@ -192,7 +204,7 @@ public RepositoryMeta setRepository(NamespaceId namespace, RepositoryConfig repo

RepositoryTable repoTable = getRepositoryTable(context);
repoTable.create(namespace, repository);
sourceControlMetadataRefreshService.runRefreshService(true, namespace);
addRefreshService(namespace);
return repoTable.get(namespace);
}, NamespaceNotFoundException.class);
}
Expand All @@ -210,6 +222,8 @@ public void deleteRepository(NamespaceId namespace) {
RepositoryTable repoTable = getRepositoryTable(context);
repoTable.delete(namespace);
});

removeRefreshService(namespace);
}

/**
Expand Down Expand Up @@ -401,33 +415,11 @@ private PullAppResponse<?> pullAndValidateApplication(ApplicationReference appRe
* @throws AuthenticationConfigException If an authentication configuration error occurs.
* @throws IOException If an I/O error occurs during the scanning process.
*/
public boolean scanRepoMetadata(ScanSourceControlMetadataRequest request, int txBatchSize,
public Pair<Boolean, Long> scanRepoMetadata(ScanSourceControlMetadataRequest request, int txBatchSize,
Consumer<SourceControlMetadataRecord> consumer) throws IOException {
NamespaceId namespaceId = new NamespaceId(request.getNamespace());
// accessEnforcer.enforce(namespaceId, authenticationContext.getPrincipal(),
// NamespacePermission.READ_REPOSITORY);
// RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig();
// // TODO(CDAP-20993): List API is used here for testing. It will be moved to a separate background job in the next PR
// RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list(
// new NamespaceRepository(namespaceId, repoConfig));
// LOG.debug("Successfully received apps in namespace {} from repository : response: {}",
// namespaceId,
// repositoryAppsResponse);
// // Cleaning up the repo source control metadata table
// HashSet<String> repoFileNames = new HashSet<>();
// for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) {
// repoFileNames.add(repoApp.getName());
// }
// TransactionRunners.run(transactionRunner, context -> {
// getRepoSourceControlMetadataStore(context).cleanupRepoSourceControlMeta(
// namespaceId.getNamespace(), repoFileNames);
// });
// // Updating the namespace and repo source control metadata table
// for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) {
// store.updateSourceControlMeta(
// new ApplicationReference(request.getNamespace(),
// repoApp.getName()), repoApp.getFileHash());
// }
// Get last refresh time of metadata refresh service of given namespace
Long lastRefreshTime = refreshSchedulers.get(namespaceId) == null ? null :refreshSchedulers.get(namespaceId).getLastRefreshTime();
// Getting repo files
String lastKey = request.getScanAfter();
int currentLimit = request.getLimit();
Expand All @@ -447,16 +439,24 @@ public boolean scanRepoMetadata(ScanSourceControlMetadataRequest request, int tx
}, IOException.class);

if (count < maxLimit) {
return false;
return new Pair<Boolean, Long>(false, lastRefreshTime);
}
currentLimit -= txBatchSize;
}
// Triggering source control metadata refresh service
try {
sourceControlMetadataRefreshService.runRefreshService(false, namespaceId);
checkSourceControlMetadataAutoRefreshFlag();
if (refreshSchedulers.containsKey(namespaceId)) {
refreshSchedulers.get(namespaceId).runRefreshService(false);
}
else{
addRefreshService(namespaceId);
}
} catch (Exception e){
// log it
LOG.error("Failed to refresh source control metadata for namespace"
+ namespaceId.getNamespace(), e);
}
return true;
return new Pair<Boolean, Long>(true, lastRefreshTime);
}

/**
Expand Down Expand Up @@ -513,4 +513,50 @@ private MetricsContext getMetricContext(NamespaceId namespace) {
return metricsCollectionService.getContext(
ImmutableMap.of(Tag.NAMESPACE, namespace.getNamespace()));
}

@Override
protected void startUp() throws Exception {
LOG.info("Starting SourceControlManagementService");

List<NamespaceId> namespaceIds = namespaceAdmin.list().stream()
.map(meta -> meta.getNamespaceId()).collect(Collectors.toList());
for (NamespaceId namespace : namespaceIds) {
refreshSchedulers.put(namespace, createRefreshService(namespace));
refreshSchedulers.get(namespace).start();
}
}

@Override
protected void shutDown() throws Exception {
LOG.info("Shutting down SourceControlManagementService");

for (SourceControlMetadataRefreshService service : refreshSchedulers.values()) {
service.stop();
}
}

private SourceControlMetadataRefreshService createRefreshService(NamespaceId namespace) {
return new SourceControlMetadataRefreshService(cConf, accessEnforcer, authenticationContext,
transactionRunner,
sourceControlOperationRunner, store,
namespace);
}

private void addRefreshService(NamespaceId namespaceId) {
refreshSchedulers.put(namespaceId, createRefreshService(namespaceId));
refreshSchedulers.get(namespaceId).start();
refreshSchedulers.get(namespaceId).runOneIteration();
}

private void removeRefreshService(NamespaceId namespaceId) {
if(refreshSchedulers.containsKey(namespaceId)) {
refreshSchedulers.get(namespaceId).stop();
refreshSchedulers.remove(namespaceId);
}
}
private void checkSourceControlMetadataAutoRefreshFlag() throws ForbiddenException {
if (!Feature.SOURCE_CONTROL_METADATA_AUTO_REFRESH.isEnabled(featureFlagsProvider)) {
throw new ForbiddenException("Source Control Metadata Auto Refresh feature is not enabled.");
}
}
}
Loading

0 comments on commit eb92835

Please sign in to comment.