Skip to content

Commit

Permalink
added complete impl of refresh service
Browse files Browse the repository at this point in the history
  • Loading branch information
adrikagupta committed May 2, 2024
1 parent 55247a2 commit d8b506f
Show file tree
Hide file tree
Showing 24 changed files with 989 additions and 565 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
import io.cdap.cdap.internal.capability.CapabilityModule;
Expand Down Expand Up @@ -214,6 +215,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -258,6 +260,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -315,6 +318,7 @@ protected void configure() {
bind(StorageProviderNamespaceAdmin.class)
.to(DistributedStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

bind(ProgramRunDispatcher.class).to(RemoteProgramRunDispatcher.class)
.in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import io.cdap.cdap.security.impersonation.UGIProvider;
import io.cdap.cdap.security.spi.authorization.AccessEnforcer;
import io.cdap.cdap.security.spi.authorization.ContextAccessEnforcer;
import io.cdap.cdap.sourcecontrol.guice.SourceControlModule;
import io.cdap.cdap.store.DefaultOwnerStore;
import org.apache.twill.filesystem.LocationFactory;

Expand Down Expand Up @@ -185,6 +186,10 @@ protected void configure() {
bind(MetadataAdmin.class).to(DefaultMetadataAdmin.class);
expose(MetadataAdmin.class);

//TODO(adrika): cleanup this dependency later
// This is needed because there is a transitive dependency on source control operation
// runner which is not actually being used here
install(new SourceControlModule());
bindPreviewRunner(binder());
expose(PreviewRunner.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;
import java.util.List;

public class ListSourceControlMetadataResponse {
private final List<SourceControlMetadataRecord> apps;
private final String nextPageToken;
private final Long lastRefreshTime;

public ListSourceControlMetadataResponse(List<SourceControlMetadataRecord> apps,
String nextPageToken, Long lastRefreshTime) {
this.apps = apps;
this.nextPageToken = nextPageToken;
this.lastRefreshTime = lastRefreshTime;
}

public List<SourceControlMetadataRecord> getApps() {
return apps;
}

public String getNextPageToken() {
return nextPageToken;
}

public Long getLastRefreshTime() {
return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,6 @@ int scanAppSourceControlMetadata(ScanSourceControlMetadataRequest request,
int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request,
Consumer<SourceControlMetadataRecord> consumer);

void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash, Long lastRefreshTime);

/**
* Returns a Map of {@link ApplicationMeta} for the given set of {@link ApplicationId}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.cdap.cdap.api.security.AccessException;
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.app.store.ApplicationFilter;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanApplicationsRequest;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.common.ApplicationNotFoundException;
Expand Down Expand Up @@ -107,6 +108,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javafx.util.Pair;
import javax.annotation.Nullable;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
Expand Down Expand Up @@ -319,27 +321,29 @@ public void getAllNamespaceSourceControlMetadata(FullHttpRequest request, HttpRe
@QueryParam("filter") String filter
) throws Exception {
validateNamespace(namespaceId);

JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
jsonListResponder.send(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null : record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
Pair<Boolean, Long> limitReachedAndLastRefreshPair = new Pair<>(false, null);
try {
limitReachedAndLastRefreshPair = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
apps.add(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !limitReachedAndLastRefreshPair.getKey() || record == null ? null :
record.getName();
Long lastRefreshTime = limitReachedAndLastRefreshPair.getValue();
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ForbiddenException;
Expand Down Expand Up @@ -59,7 +60,11 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import javafx.util.Pair;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -156,27 +161,29 @@ public void listAllApplications(FullHttpRequest request, HttpResponder responder
@QueryParam("filter") String filter) throws Exception {
checkSourceControlFeatureFlag();
validateNamespaceId(namespaceId);
JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
//jsonListResponder.send(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
Pair<Boolean, Long> limitReachedAndLastRefreshPair = new Pair<>(false, null);
try {
limitReachedAndLastRefreshPair = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
apps.add(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !limitReachedAndLastRefreshPair.getKey() || record == null ? null :
record.getName();
Long lastRefreshTime = limitReachedAndLastRefreshPair.getValue();
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataMigrationService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.internal.bootstrap.BootstrapService;
import io.cdap.cdap.internal.credential.CredentialProviderService;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class AppFabricServer extends AbstractIdleService {
private final RepositoryCleanupService repositoryCleanupService;
private final OperationNotificationSubscriberService operationNotificationSubscriberService;
private final SourceControlMetadataMigrationService sourceControlMetadataMigrationService;
private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService;
private final CConfiguration cConf;
private final SConfiguration sConf;
private final boolean sslEnabled;
Expand Down Expand Up @@ -137,7 +139,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService,
SourceControlMetadataMigrationService sourceControlMetadataMigrationService) {
SourceControlMetadataMigrationService sourceControlMetadataMigrationService,
SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
Expand Down Expand Up @@ -167,6 +170,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.repositoryCleanupService = repositoryCleanupService;
this.operationNotificationSubscriberService = operationNotificationSubscriberService;
this.sourceControlMetadataMigrationService = sourceControlMetadataMigrationService;
this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService;
}

/**
Expand Down Expand Up @@ -199,7 +203,8 @@ protected void startUp() throws Exception {
sourceControlOperationRunner.start(),
repositoryCleanupService.start(),
operationNotificationSubscriberService.start(),
sourceControlMetadataMigrationService.start()
sourceControlMetadataMigrationService.start(),
sourceControlMetadataRefreshService.start()
));
Futures.allAsList(futuresList).get();

Expand Down Expand Up @@ -262,6 +267,7 @@ protected void shutDown() throws Exception {
namespaceCredentialProviderService.stopAndWait();
operationNotificationSubscriberService.stopAndWait();
sourceControlMetadataMigrationService.stopAndWait();
sourceControlMetadataRefreshService.stopAndWait();
}

private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.cdap.cdap.common.ArtifactNotFoundException;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.CannotBeDeletedException;
import io.cdap.cdap.common.ForbiddenException;
import io.cdap.cdap.common.InvalidArtifactException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.SourceControlMetadataNotFoundException;
Expand All @@ -81,6 +82,7 @@
import io.cdap.cdap.internal.app.runtime.artifact.ArtifactDetail;
import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepository;
import io.cdap.cdap.internal.app.runtime.artifact.Artifacts;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.store.ApplicationMeta;
import io.cdap.cdap.internal.app.store.RunRecordDetail;
import io.cdap.cdap.internal.app.store.state.AppStateKey;
Expand Down Expand Up @@ -151,6 +153,7 @@
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import javafx.util.Pair;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -188,6 +191,7 @@ public class ApplicationLifecycleService extends AbstractIdleService {
private final MetricsCollectionService metricsCollectionService;
private final FeatureFlagsProvider featureFlagsProvider;
private final TransactionRunner transactionRunner;
private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService;

/**
* Construct the ApplicationLifeCycleService with service factory and cConf coming from guice
Expand All @@ -203,7 +207,8 @@ public ApplicationLifecycleService(CConfiguration cConf,
AccessEnforcer accessEnforcer, AuthenticationContext authenticationContext,
MessagingService messagingService, Impersonator impersonator,
CapabilityReader capabilityReader,
MetricsCollectionService metricsCollectionService, TransactionRunner transactionRunner) {
MetricsCollectionService metricsCollectionService, TransactionRunner transactionRunner,
SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
this.cConf = cConf;
this.appUpdateSchedules = cConf.getBoolean(Constants.AppFabric.APP_UPDATE_SCHEDULES,
Constants.AppFabric.DEFAULT_APP_UPDATE_SCHEDULES);
Expand All @@ -221,6 +226,7 @@ public ApplicationLifecycleService(CConfiguration cConf,
this.authenticationContext = authenticationContext;
this.impersonator = impersonator;
this.capabilityReader = capabilityReader;
this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService;
this.adminEventPublisher = new AdminEventPublisher(cConf,
new MultiThreadMessagingContext(messagingService));
this.metricsCollectionService = metricsCollectionService;
Expand Down Expand Up @@ -316,9 +322,13 @@ public boolean scanApplications(ScanApplicationsRequest request,
* @return True if the page limit has reached, false otherwise.
* @throws IOException If an I/O error occurs during the scanning process.
*/
public boolean scanSourceControlMetadata(ScanSourceControlMetadataRequest request,
public Pair<Boolean,Long> scanSourceControlMetadata(ScanSourceControlMetadataRequest request,
int txBatchSize,
Consumer<SourceControlMetadataRecord> consumer) throws IOException {
NamespaceId namespaceId = new NamespaceId(request.getNamespace());
// Get last refresh time of metadata refresh service of given namespace
Long lastRefreshTime = sourceControlMetadataRefreshService.getLastRefreshTime(namespaceId);

String lastKey = request.getScanAfter();
int currentLimit = request.getLimit();

Expand All @@ -337,12 +347,28 @@ public boolean scanSourceControlMetadata(ScanSourceControlMetadataRequest reques
}, IOException.class);

if (count < maxLimit) {
return false;
return new Pair<Boolean, Long>(false, lastRefreshTime);
}

currentLimit -= txBatchSize;
}
return true;

// Triggering source control metadata refresh service
try {
checkSourceControlMetadataAutoRefreshFlag();
sourceControlMetadataRefreshService.runRefreshService(namespaceId);
} catch (Exception e){
LOG.error("Failed to refresh source control metadata for namespace"
+ namespaceId.getNamespace(), e);
}

return new Pair<Boolean, Long>(true, lastRefreshTime);
}

private void checkSourceControlMetadataAutoRefreshFlag() throws ForbiddenException {
if (!Feature.SOURCE_CONTROL_METADATA_AUTO_REFRESH.isEnabled(featureFlagsProvider)) {
throw new ForbiddenException("Source Control Metadata Auto Refresh feature is not enabled.");
}
}

private void processApplications(List<Map.Entry<ApplicationId, ApplicationMeta>> list,
Expand Down
Loading

0 comments on commit d8b506f

Please sign in to comment.