Skip to content

Commit

Permalink
added implementation of peridic refresh for source control metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
adrikagupta committed May 5, 2024
1 parent b3b2cc1 commit 507b5f8
Show file tree
Hide file tree
Showing 22 changed files with 1,216 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
import io.cdap.cdap.internal.capability.CapabilityModule;
Expand Down Expand Up @@ -214,6 +215,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -258,6 +260,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -315,6 +318,7 @@ protected void configure() {
bind(StorageProviderNamespaceAdmin.class)
.to(DistributedStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

bind(ProgramRunDispatcher.class).to(RemoteProgramRunDispatcher.class)
.in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import io.cdap.cdap.security.impersonation.UGIProvider;
import io.cdap.cdap.security.spi.authorization.AccessEnforcer;
import io.cdap.cdap.security.spi.authorization.ContextAccessEnforcer;
import io.cdap.cdap.sourcecontrol.guice.SourceControlModule;
import io.cdap.cdap.store.DefaultOwnerStore;
import org.apache.twill.filesystem.LocationFactory;

Expand Down Expand Up @@ -185,6 +186,10 @@ protected void configure() {
bind(MetadataAdmin.class).to(DefaultMetadataAdmin.class);
expose(MetadataAdmin.class);

//TODO(adrika): cleanup this dependency later
// This is needed because there is a transitive dependency on source control operation
// runner which is not actually being used here
install(new SourceControlModule());
bindPreviewRunner(binder());
expose(PreviewRunner.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;
import java.util.List;

public class ListSourceControlMetadataResponse {
private final List<SourceControlMetadataRecord> apps;
private final String nextPageToken;
private final Long lastRefreshTime;

public ListSourceControlMetadataResponse(List<SourceControlMetadataRecord> apps,
String nextPageToken, Long lastRefreshTime) {
this.apps = apps;
this.nextPageToken = nextPageToken;
this.lastRefreshTime = lastRefreshTime;
}

public List<SourceControlMetadataRecord> getApps() {
return apps;
}

public String getNextPageToken() {
return nextPageToken;
}

public Long getLastRefreshTime() {
return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;

public class SingleSourceControlMetadataResponse {
private final SourceControlMetadataRecord record;
private final Long lastRefreshTime;

public SingleSourceControlMetadataResponse(SourceControlMetadataRecord record,
Long lastRefreshTime) {
this.record = record;
this.lastRefreshTime = lastRefreshTime;
}
public SourceControlMetadataRecord getRecord() {
return record;
}
public Long getLastRefreshTime() {
return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,6 @@ int scanAppSourceControlMetadata(ScanSourceControlMetadataRequest request,
int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request,
Consumer<SourceControlMetadataRecord> consumer);

void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash);

/**
* Returns a Map of {@link ApplicationMeta} for the given set of {@link ApplicationId}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import io.cdap.cdap.api.security.AccessException;
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.app.store.ApplicationFilter;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanApplicationsRequest;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.app.store.SingleSourceControlMetadataResponse;
import io.cdap.cdap.common.ApplicationNotFoundException;
import io.cdap.cdap.common.ArtifactAlreadyExistsException;
import io.cdap.cdap.common.BadRequestException;
Expand Down Expand Up @@ -107,6 +109,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javafx.util.Pair;
import javax.annotation.Nullable;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
Expand Down Expand Up @@ -319,27 +322,29 @@ public void getAllNamespaceSourceControlMetadata(FullHttpRequest request, HttpRe
@QueryParam("filter") String filter
) throws Exception {
validateNamespace(namespaceId);

JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
jsonListResponder.send(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null : record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
apps.add(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand All @@ -357,10 +362,11 @@ public void getNamespaceSourceControlMetadata(HttpRequest request, HttpResponder
@PathParam("namespace-id") final String namespaceId,
@PathParam("app-id") final String appName) throws Exception {
validateApplicationId(namespaceId, appName);

responder.sendJson(HttpResponseStatus.OK,
GSON.toJson(applicationLifecycleService.getSourceControlMetadataRecord(
new ApplicationReference(namespaceId, appName))));
SourceControlMetadataRecord record = applicationLifecycleService.getSourceControlMetadataRecord(
new ApplicationReference(namespaceId, appName));
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
SingleSourceControlMetadataResponse response = new SingleSourceControlMetadataResponse(record, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

private ScanApplicationsRequest getScanRequest(String namespaceId, String artifactVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ForbiddenException;
Expand Down Expand Up @@ -59,7 +60,11 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import javafx.util.Pair;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -156,29 +161,29 @@ public void listAllApplications(FullHttpRequest request, HttpResponder responder
@QueryParam("filter") String filter) throws Exception {
checkSourceControlFeatureFlag();
validateNamespaceId(namespaceId);
JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
jsonListResponder.send(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (NotFoundException e) {
responder.sendString(HttpResponseStatus.NOT_FOUND, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
apps.add(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = sourceControlService.getLastRefreshTime(namespaceId);
ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataMigrationService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.internal.bootstrap.BootstrapService;
import io.cdap.cdap.internal.credential.CredentialProviderService;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class AppFabricServer extends AbstractIdleService {
private final RepositoryCleanupService repositoryCleanupService;
private final OperationNotificationSubscriberService operationNotificationSubscriberService;
private final SourceControlMetadataMigrationService sourceControlMetadataMigrationService;
private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService;
private final CConfiguration cConf;
private final SConfiguration sConf;
private final boolean sslEnabled;
Expand Down Expand Up @@ -137,7 +139,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService,
SourceControlMetadataMigrationService sourceControlMetadataMigrationService) {
SourceControlMetadataMigrationService sourceControlMetadataMigrationService,
SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
Expand Down Expand Up @@ -167,6 +170,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.repositoryCleanupService = repositoryCleanupService;
this.operationNotificationSubscriberService = operationNotificationSubscriberService;
this.sourceControlMetadataMigrationService = sourceControlMetadataMigrationService;
this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService;
}

/**
Expand Down Expand Up @@ -199,7 +203,8 @@ protected void startUp() throws Exception {
sourceControlOperationRunner.start(),
repositoryCleanupService.start(),
operationNotificationSubscriberService.start(),
sourceControlMetadataMigrationService.start()
sourceControlMetadataMigrationService.start(),
sourceControlMetadataRefreshService.start()
));
Futures.allAsList(futuresList).get();

Expand Down Expand Up @@ -262,6 +267,7 @@ protected void shutDown() throws Exception {
namespaceCredentialProviderService.stopAndWait();
operationNotificationSubscriberService.stopAndWait();
sourceControlMetadataMigrationService.stopAndWait();
sourceControlMetadataRefreshService.stopAndWait();
}

private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
Expand Down
Loading

0 comments on commit 507b5f8

Please sign in to comment.