Skip to content

Commit

Permalink
added implementation for periodic refresh of soure control metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
adrikagupta committed Apr 29, 2024
1 parent bcccb19 commit c6c06ec
Show file tree
Hide file tree
Showing 10 changed files with 424 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ int scanAppSourceControlMetadata(ScanSourceControlMetadataRequest request,
int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request,
Consumer<SourceControlMetadataRecord> consumer);

void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash);
void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash, Long lastRefreshTime);

/**
* Returns a Map of {@link ApplicationMeta} for the given set of {@link ApplicationId}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

/**
* Represents the sync status of source control metadata.
* It is used in filtering.
*/
public enum SyncStatus {
SYNCED,
UNSYNCED
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ record -> {
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (NotFoundException e) {
responder.sendString(HttpResponseStatus.NOT_FOUND, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class SourceControlManagementService {
private final TransactionRunner transactionRunner;
private final CConfiguration cConf;
private final SecureStore secureStore;
private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService;
private final SourceControlOperationRunner sourceControlOperationRunner;
private final ApplicationLifecycleService appLifecycleService;
private final Store store;
Expand All @@ -120,11 +121,13 @@ public SourceControlManagementService(CConfiguration cConf,
ApplicationLifecycleService applicationLifecycleService,
Store store,
OperationLifecycleManager operationLifecycleManager,
MetricsCollectionService metricsCollectionService) {
MetricsCollectionService metricsCollectionService,
SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
this (cConf, secureStore, transactionRunner,
accessEnforcer, authenticationContext,
sourceControlOperationRunner, applicationLifecycleService,
store, operationLifecycleManager, metricsCollectionService, Clock.systemUTC());
store, operationLifecycleManager, metricsCollectionService, Clock.systemUTC(),
sourceControlMetadataRefreshService);
}

@VisibleForTesting
Expand All @@ -138,7 +141,7 @@ public SourceControlManagementService(CConfiguration cConf,
Store store,
OperationLifecycleManager operationLifecycleManager,
MetricsCollectionService metricsCollectionService,
Clock clock) {
Clock clock, SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
this.cConf = cConf;
this.secureStore = secureStore;
this.transactionRunner = transactionRunner;
Expand All @@ -150,6 +153,7 @@ public SourceControlManagementService(CConfiguration cConf,
this.operationLifecycleManager = operationLifecycleManager;
this.metricsCollectionService = metricsCollectionService;
this.clock = clock;
this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService;
}

private RepositoryTable getRepositoryTable(StructuredTableContext context)
Expand Down Expand Up @@ -188,6 +192,7 @@ public RepositoryMeta setRepository(NamespaceId namespace, RepositoryConfig repo

RepositoryTable repoTable = getRepositoryTable(context);
repoTable.create(namespace, repository);
sourceControlMetadataRefreshService.runRefreshService(true, namespace);
return repoTable.get(namespace);
}, NamespaceNotFoundException.class);
}
Expand Down Expand Up @@ -397,33 +402,32 @@ private PullAppResponse<?> pullAndValidateApplication(ApplicationReference appRe
* @throws IOException If an I/O error occurs during the scanning process.
*/
public boolean scanRepoMetadata(ScanSourceControlMetadataRequest request, int txBatchSize,
Consumer<SourceControlMetadataRecord> consumer) throws NotFoundException,
AuthenticationConfigException, IOException {
Consumer<SourceControlMetadataRecord> consumer) throws IOException {
NamespaceId namespaceId = new NamespaceId(request.getNamespace());
accessEnforcer.enforce(namespaceId, authenticationContext.getPrincipal(),
NamespacePermission.READ_REPOSITORY);
RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig();
// TODO(CDAP-20993): List API is used here for testing. It will be moved to a separate background job in the next PR
RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list(
new NamespaceRepository(namespaceId, repoConfig));
LOG.debug("Successfully received apps in namespace {} from repository : response: {}",
namespaceId,
repositoryAppsResponse);
// Cleaning up the repo source control metadata table
HashSet<String> repoFileNames = new HashSet<>();
for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) {
repoFileNames.add(repoApp.getName());
}
TransactionRunners.run(transactionRunner, context -> {
getRepoSourceControlMetadataStore(context).cleanupRepoSourceControlMeta(
namespaceId.getNamespace(), repoFileNames);
});
// Updating the namespace and repo source control metadata table
for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) {
store.updateSourceControlMeta(
new ApplicationReference(request.getNamespace(),
repoApp.getName()), repoApp.getFileHash());
}
// accessEnforcer.enforce(namespaceId, authenticationContext.getPrincipal(),
// NamespacePermission.READ_REPOSITORY);
// RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig();
// // TODO(CDAP-20993): List API is used here for testing. It will be moved to a separate background job in the next PR
// RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list(
// new NamespaceRepository(namespaceId, repoConfig));
// LOG.debug("Successfully received apps in namespace {} from repository : response: {}",
// namespaceId,
// repositoryAppsResponse);
// // Cleaning up the repo source control metadata table
// HashSet<String> repoFileNames = new HashSet<>();
// for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) {
// repoFileNames.add(repoApp.getName());
// }
// TransactionRunners.run(transactionRunner, context -> {
// getRepoSourceControlMetadataStore(context).cleanupRepoSourceControlMeta(
// namespaceId.getNamespace(), repoFileNames);
// });
// // Updating the namespace and repo source control metadata table
// for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) {
// store.updateSourceControlMeta(
// new ApplicationReference(request.getNamespace(),
// repoApp.getName()), repoApp.getFileHash());
// }
// Getting repo files
String lastKey = request.getScanAfter();
int currentLimit = request.getLimit();
Expand All @@ -447,6 +451,11 @@ public boolean scanRepoMetadata(ScanSourceControlMetadataRequest request, int tx
}
currentLimit -= txBatchSize;
}
try {
sourceControlMetadataRefreshService.runRefreshService(false, namespaceId);
} catch (Exception e){
// log it
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.services;

import com.google.common.util.concurrent.AbstractScheduledService;
import io.cdap.cdap.app.store.Store;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.RepositoryNotFoundException;
import io.cdap.cdap.common.namespace.NamespaceAdmin;
import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore;
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.security.NamespacePermission;
import io.cdap.cdap.proto.security.StandardPermission;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta;
import io.cdap.cdap.security.spi.authentication.AuthenticationContext;
import io.cdap.cdap.security.spi.authorization.AccessEnforcer;
import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository;
import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp;
import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse;
import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.TableNotFoundException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.cdap.store.RepositoryTable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceControlMetadataRefreshService extends AbstractScheduledService {

private List<NamespaceId> namespaces;
private HashMap<NamespaceId, Long> lastRefreshTimes;

private final AccessEnforcer accessEnforcer;
private final AuthenticationContext authenticationContext;

private final NamespaceAdmin namespaceAdmin;
private ScheduledExecutorService executor;
private final TransactionRunner transactionRunner;
private final SourceControlOperationRunner sourceControlOperationRunner;
private final Store store;
private static final Logger LOG = LoggerFactory.getLogger(
SourceControlMetadataRefreshService.class);

public SourceControlMetadataRefreshService(AccessEnforcer accessEnforcer,
AuthenticationContext authenticationContext, NamespaceAdmin namespaceAdmin,
TransactionRunner transactionRunner,
SourceControlOperationRunner sourceControlOperationRunner, Store store) {
this.accessEnforcer = accessEnforcer;
this.authenticationContext = authenticationContext;
this.namespaceAdmin = namespaceAdmin;
this.transactionRunner = transactionRunner;
this.sourceControlOperationRunner = sourceControlOperationRunner;
this.store = store;
}

private void setNamespaces(List<NamespaceId> namespaceIds) {
this.namespaces = namespaceIds;
}

private RepositorySourceControlMetadataStore getRepoSourceControlMetadataStore(
StructuredTableContext context) {
return RepositorySourceControlMetadataStore.create(context);
}

@Override
protected final ScheduledExecutorService executor() {
executor = Executors.newSingleThreadScheduledExecutor(
Threads.createDaemonThreadFactory("source-control-metadata-refresh-service"));
return executor;
}

private RepositoryTable getRepositoryTable(StructuredTableContext context)
throws TableNotFoundException {
return new RepositoryTable(context);
}


public RepositoryMeta getRepositoryMeta(NamespaceId namespace)
throws RepositoryNotFoundException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), StandardPermission.GET);

return TransactionRunners.run(transactionRunner, context -> {
RepositoryTable table = getRepositoryTable(context);
RepositoryMeta repoMeta = table.get(namespace);
if (repoMeta == null) {
throw new RepositoryNotFoundException(namespace);
}

return repoMeta;
}, RepositoryNotFoundException.class);
}

@Override
protected void runOneIteration() {
try {
if (namespaces == null) {
setNamespaces(namespaceAdmin.list().stream()
.map(meta -> meta.getNamespaceId()).collect(Collectors.toList()));

}

for (NamespaceId namespaceId : namespaces) {
RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig();

accessEnforcer.enforce(namespaceId, authenticationContext.getPrincipal(),
NamespacePermission.READ_REPOSITORY);
RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list(
new NamespaceRepository(namespaceId, repoConfig));

// Cleaning up the repo source control metadata table
HashSet<String> repoFileNames = new HashSet<>();
for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) {
repoFileNames.add(repoApp.getName());
}
TransactionRunners.run(transactionRunner, context -> {
getRepoSourceControlMetadataStore(context).cleanupRepoSourceControlMeta(
namespaceId.getNamespace(), repoFileNames);
});
// Updating the namespace and repo source control metadata table
for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) {
store.updateSourceControlMeta(
new ApplicationReference(namespaceId.getNamespace(), repoApp.getName()),
repoApp.getFileHash(),
lastRefreshTimes.get(namespaceId));
}
lastRefreshTimes.put(namespaceId, System.currentTimeMillis());
}

setNamespaces(null);
} catch (RepositoryNotFoundException e) {
// LOG
} catch (NotFoundException e) {
// LOG
} catch (Exception e) {
// LOG
}

}

public void runRefreshService(boolean forced, NamespaceId namespace) throws Exception {
if (!forced && (System.currentTimeMillis() - lastRefreshTimes.get(namespace)
< 10 * 60 * 1000)) {
return;
}
List<NamespaceId> namespaceList = new ArrayList<>();
namespaceList.add(namespace);
setNamespaces(namespaceList);
runOneIteration();
}

@Override
protected void startUp() throws Exception {
this.namespaces = namespaceAdmin.list().stream()
.map(meta -> meta.getNamespaceId()).collect(Collectors.toList());
namespaces.forEach(n -> lastRefreshTimes.put(n, 0L));
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(1, 60 * 60, TimeUnit.SECONDS);
}

@Override
protected void shutDown() throws Exception {
if (executor != null) {
executor.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -993,7 +994,7 @@ public ApplicationMeta getLatest(ApplicationReference appRef) {
}

@Override
public void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash) {
public void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash, Long lastRefreshTime) {
TransactionRunners.run(transactionRunner, context -> {
RepositorySourceControlMetadataStore repoMetadataStore = getRepoSourceControlMetadataStore(
context);
Expand All @@ -1005,6 +1006,12 @@ public void updateSourceControlMeta(ApplicationReference appRef, String repoFile
repoMetadataStore.write(appRef, false, 0L);
return;
}
if (namespaceSourceControlMeta.getLastSyncedAt() != null && namespaceSourceControlMeta.getLastSyncedAt().isAfter(
Instant.ofEpochMilli(lastRefreshTime))) {
repoMetadataStore.write(appRef,
namespaceSourceControlMeta.getSyncStatus(), namespaceSourceControlMeta.getLastSyncedAt().toEpochMilli());
return;
}
Boolean isSynced = namespaceSourceControlMeta.getFileHash().equals(repoFileHash);
if (isSynced) {
namespaceMetadataStore.write(appRef,
Expand Down
Loading

0 comments on commit c6c06ec

Please sign in to comment.