From d8b506f5cc9faf0bf785f0c5f713e8d4390fc0f4 Mon Sep 17 00:00:00 2001 From: Adrika Gupta Date: Thu, 2 May 2024 09:54:28 +0530 Subject: [PATCH] added complete impl of refresh service --- .../guice/AppFabricServiceRuntimeModule.java | 4 + .../cdap/app/preview/PreviewRunnerModule.java | 5 + .../ListSourceControlMetadataResponse.java | 45 +++ .../java/io/cdap/cdap/app/store/Store.java | 2 - .../handlers/AppLifecycleHttpHandler.java | 46 +-- .../SourceControlManagementHttpHandler.java | 49 +-- .../app/services/AppFabricServer.java | 10 +- .../services/ApplicationLifecycleService.java | 34 +- ...ceSourceControlMetadataRefreshService.java | 254 ++++++++++++++ .../SourceControlManagementService.java | 69 ++-- .../SourceControlMetadataRefreshService.java | 197 ----------- .../SourceControlMetadataRefreshService.java | 110 ++++++ .../cdap/internal/app/store/DefaultStore.java | 66 ++-- .../NamespaceSourceControlMetadataStore.java | 11 + .../RepositorySourceControlMetadataStore.java | 41 +-- ...urceControlMetadataRefreshServiceTest.java | 331 ++++++++++++++++++ .../SourceControlManagementServiceTest.java | 80 ++--- ...urceControlMetadataRefreshServiceTest.java | 85 ----- .../handlers/AppLifecycleHttpHandlerTest.java | 8 +- ...urceControlManagementHttpHandlerTests.java | 22 +- .../io/cdap/cdap/common/conf/Constants.java | 4 + .../src/main/resources/cdap-default.xml | 16 + .../java/io/cdap/cdap/features/Feature.java | 3 +- .../proto/SourceControlMetadataDetail.java | 62 ---- 24 files changed, 989 insertions(+), 565 deletions(-) create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/ListSourceControlMetadataResponse.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshService.java delete mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlMetadataRefreshService.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java create mode 100644 cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshServiceTest.java delete mode 100644 cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlMetadataRefreshServiceTest.java delete mode 100644 cdap-proto/src/main/java/io/cdap/cdap/proto/SourceControlMetadataDetail.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index 24c028a0471d..374151c36c4b 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -117,6 +117,7 @@ import io.cdap.cdap.internal.app.services.RunRecordCorrectorService; import io.cdap.cdap.internal.app.services.RunRecordMonitorService; import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.DefaultStore; import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules; import io.cdap.cdap.internal.capability.CapabilityModule; @@ -214,6 +215,7 @@ protected void configure() { bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class); bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class); bind(UGIProvider.class).toProvider(UgiProviderProvider.class); + bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON); Multibinder servicesNamesBinder = Multibinder.newSetBinder(binder(), String.class, @@ -258,6 +260,7 @@ protected void configure() { bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class); bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class); bind(UGIProvider.class).toProvider(UgiProviderProvider.class); + bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON); Multibinder servicesNamesBinder = Multibinder.newSetBinder(binder(), String.class, @@ -315,6 +318,7 @@ protected void configure() { bind(StorageProviderNamespaceAdmin.class) .to(DistributedStorageProviderNamespaceAdmin.class); bind(UGIProvider.class).toProvider(UgiProviderProvider.class); + bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON); bind(ProgramRunDispatcher.class).to(RemoteProgramRunDispatcher.class) .in(Scopes.SINGLETON); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java index f3d8e85c0524..6bd2ee384ca8 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java @@ -75,6 +75,7 @@ import io.cdap.cdap.security.impersonation.UGIProvider; import io.cdap.cdap.security.spi.authorization.AccessEnforcer; import io.cdap.cdap.security.spi.authorization.ContextAccessEnforcer; +import io.cdap.cdap.sourcecontrol.guice.SourceControlModule; import io.cdap.cdap.store.DefaultOwnerStore; import org.apache.twill.filesystem.LocationFactory; @@ -185,6 +186,10 @@ protected void configure() { bind(MetadataAdmin.class).to(DefaultMetadataAdmin.class); expose(MetadataAdmin.class); + //TODO(adrika): cleanup this dependency later + // This is needed because there is a transitive dependency on source control operation + // runner which is not actually being used here + install(new SourceControlModule()); bindPreviewRunner(binder()); expose(PreviewRunner.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/ListSourceControlMetadataResponse.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/ListSourceControlMetadataResponse.java new file mode 100644 index 000000000000..29491cd145c9 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/ListSourceControlMetadataResponse.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.app.store; + +import io.cdap.cdap.proto.SourceControlMetadataRecord; +import java.util.List; + +public class ListSourceControlMetadataResponse { + private final List apps; + private final String nextPageToken; + private final Long lastRefreshTime; + + public ListSourceControlMetadataResponse(List apps, + String nextPageToken, Long lastRefreshTime) { + this.apps = apps; + this.nextPageToken = nextPageToken; + this.lastRefreshTime = lastRefreshTime; + } + + public List getApps() { + return apps; + } + + public String getNextPageToken() { + return nextPageToken; + } + + public Long getLastRefreshTime() { + return lastRefreshTime; + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java index 21828fa838ed..9d4903f35b59 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java @@ -495,8 +495,6 @@ int scanAppSourceControlMetadata(ScanSourceControlMetadataRequest request, int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request, Consumer consumer); - void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash, Long lastRefreshTime); - /** * Returns a Map of {@link ApplicationMeta} for the given set of {@link ApplicationId}. * diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java index 38c968899b61..69b68869faae 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java @@ -33,6 +33,7 @@ import io.cdap.cdap.api.security.AccessException; import io.cdap.cdap.app.runtime.ProgramRuntimeService; import io.cdap.cdap.app.store.ApplicationFilter; +import io.cdap.cdap.app.store.ListSourceControlMetadataResponse; import io.cdap.cdap.app.store.ScanApplicationsRequest; import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest; import io.cdap.cdap.common.ApplicationNotFoundException; @@ -107,6 +108,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import javafx.util.Pair; import javax.annotation.Nullable; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -319,27 +321,29 @@ public void getAllNamespaceSourceControlMetadata(FullHttpRequest request, HttpRe @QueryParam("filter") String filter ) throws Exception { validateNamespace(namespaceId); - - JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT, - jsonListResponder -> { - AtomicReference lastRecord = new AtomicReference<>(null); - ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest( - namespaceId, - pageToken, pageSize, sortOrder, sortOn, filter); - boolean pageLimitReached = false; - try { - pageLimitReached = applicationLifecycleService.scanSourceControlMetadata( - scanRequest, batchSize, - scmMetaRecord -> { - jsonListResponder.send(scmMetaRecord); - lastRecord.set(scmMetaRecord); - }); - } catch (IOException e) { - responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); - } - SourceControlMetadataRecord record = lastRecord.get(); - return !pageLimitReached || record == null ? null : record.getName(); - }); + List apps = new ArrayList<>(); + AtomicReference lastRecord = new AtomicReference<>(null); + ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest( + namespaceId, + pageToken, pageSize, sortOrder, sortOn, filter); + Pair limitReachedAndLastRefreshPair = new Pair<>(false, null); + try { + limitReachedAndLastRefreshPair = applicationLifecycleService.scanSourceControlMetadata( + scanRequest, batchSize, + scmMetaRecord -> { + apps.add(scmMetaRecord); + lastRecord.set(scmMetaRecord); + }); + } catch (IOException e) { + responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + SourceControlMetadataRecord record = lastRecord.get(); + String nextPageToken = !limitReachedAndLastRefreshPair.getKey() || record == null ? null : + record.getName(); + Long lastRefreshTime = limitReachedAndLastRefreshPair.getValue(); + ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps, + nextPageToken, lastRefreshTime); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response)); } /** diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java index e0408fc07a7e..19812af49e18 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java @@ -21,6 +21,7 @@ import com.google.gson.JsonSyntaxException; import com.google.inject.Inject; import io.cdap.cdap.api.feature.FeatureFlagsProvider; +import io.cdap.cdap.app.store.ListSourceControlMetadataResponse; import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest; import io.cdap.cdap.common.BadRequestException; import io.cdap.cdap.common.ForbiddenException; @@ -59,7 +60,11 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; +import javafx.util.Pair; import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -156,27 +161,29 @@ public void listAllApplications(FullHttpRequest request, HttpResponder responder @QueryParam("filter") String filter) throws Exception { checkSourceControlFeatureFlag(); validateNamespaceId(namespaceId); - JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT, - jsonListResponder -> { - AtomicReference lastRecord = new AtomicReference<>(null); - ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest( - namespaceId, - pageToken, pageSize, sortOrder, sortOn, filter); - boolean pageLimitReached = false; - try { - pageLimitReached = sourceControlService.scanRepoMetadata( - scanRequest, batchSize, - record -> { - //jsonListResponder.send(record); - lastRecord.set(record); - }); - } catch (IOException e) { - responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); - } - SourceControlMetadataRecord record = lastRecord.get(); - return !pageLimitReached || record == null ? null : - record.getName(); - }); + List apps = new ArrayList<>(); + AtomicReference lastRecord = new AtomicReference<>(null); + ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest( + namespaceId, + pageToken, pageSize, sortOrder, sortOn, filter); + Pair limitReachedAndLastRefreshPair = new Pair<>(false, null); + try { + limitReachedAndLastRefreshPair = sourceControlService.scanRepoMetadata( + scanRequest, batchSize, + record -> { + apps.add(record); + lastRecord.set(record); + }); + } catch (IOException e) { + responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); + } + SourceControlMetadataRecord record = lastRecord.get(); + String nextPageToken = !limitReachedAndLastRefreshPair.getKey() || record == null ? null : + record.getName(); + Long lastRefreshTime = limitReachedAndLastRefreshPair.getValue(); + ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps, + nextPageToken, lastRefreshTime); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response)); } /** diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java index 7e42da293add..9c5391cea6df 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java @@ -38,6 +38,7 @@ import io.cdap.cdap.common.security.HttpsEnabler; import io.cdap.cdap.features.Feature; import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataMigrationService; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.AppMetadataStore; import io.cdap.cdap.internal.bootstrap.BootstrapService; import io.cdap.cdap.internal.credential.CredentialProviderService; @@ -97,6 +98,7 @@ public class AppFabricServer extends AbstractIdleService { private final RepositoryCleanupService repositoryCleanupService; private final OperationNotificationSubscriberService operationNotificationSubscriberService; private final SourceControlMetadataMigrationService sourceControlMetadataMigrationService; + private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService; private final CConfiguration cConf; private final SConfiguration sConf; private final boolean sslEnabled; @@ -137,7 +139,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, SourceControlOperationRunner sourceControlOperationRunner, RepositoryCleanupService repositoryCleanupService, OperationNotificationSubscriberService operationNotificationSubscriberService, - SourceControlMetadataMigrationService sourceControlMetadataMigrationService) { + SourceControlMetadataMigrationService sourceControlMetadataMigrationService, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { this.hostname = hostname; this.discoveryService = discoveryService; this.handlers = handlers; @@ -167,6 +170,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, this.repositoryCleanupService = repositoryCleanupService; this.operationNotificationSubscriberService = operationNotificationSubscriberService; this.sourceControlMetadataMigrationService = sourceControlMetadataMigrationService; + this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService; } /** @@ -199,7 +203,8 @@ protected void startUp() throws Exception { sourceControlOperationRunner.start(), repositoryCleanupService.start(), operationNotificationSubscriberService.start(), - sourceControlMetadataMigrationService.start() + sourceControlMetadataMigrationService.start(), + sourceControlMetadataRefreshService.start() )); Futures.allAsList(futuresList).get(); @@ -262,6 +267,7 @@ protected void shutDown() throws Exception { namespaceCredentialProviderService.stopAndWait(); operationNotificationSubscriberService.stopAndWait(); sourceControlMetadataMigrationService.stopAndWait(); + sourceControlMetadataRefreshService.stopAndWait(); } private Cancellable startHttpService(NettyHttpService httpService) throws Exception { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java index a56e32f69c60..485a3e33efb0 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java @@ -58,6 +58,7 @@ import io.cdap.cdap.common.ArtifactNotFoundException; import io.cdap.cdap.common.BadRequestException; import io.cdap.cdap.common.CannotBeDeletedException; +import io.cdap.cdap.common.ForbiddenException; import io.cdap.cdap.common.InvalidArtifactException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.SourceControlMetadataNotFoundException; @@ -81,6 +82,7 @@ import io.cdap.cdap.internal.app.runtime.artifact.ArtifactDetail; import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepository; import io.cdap.cdap.internal.app.runtime.artifact.Artifacts; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.ApplicationMeta; import io.cdap.cdap.internal.app.store.RunRecordDetail; import io.cdap.cdap.internal.app.store.state.AppStateKey; @@ -151,6 +153,7 @@ import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import javafx.util.Pair; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,6 +191,7 @@ public class ApplicationLifecycleService extends AbstractIdleService { private final MetricsCollectionService metricsCollectionService; private final FeatureFlagsProvider featureFlagsProvider; private final TransactionRunner transactionRunner; + private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService; /** * Construct the ApplicationLifeCycleService with service factory and cConf coming from guice @@ -203,7 +207,8 @@ public ApplicationLifecycleService(CConfiguration cConf, AccessEnforcer accessEnforcer, AuthenticationContext authenticationContext, MessagingService messagingService, Impersonator impersonator, CapabilityReader capabilityReader, - MetricsCollectionService metricsCollectionService, TransactionRunner transactionRunner) { + MetricsCollectionService metricsCollectionService, TransactionRunner transactionRunner, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { this.cConf = cConf; this.appUpdateSchedules = cConf.getBoolean(Constants.AppFabric.APP_UPDATE_SCHEDULES, Constants.AppFabric.DEFAULT_APP_UPDATE_SCHEDULES); @@ -221,6 +226,7 @@ public ApplicationLifecycleService(CConfiguration cConf, this.authenticationContext = authenticationContext; this.impersonator = impersonator; this.capabilityReader = capabilityReader; + this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService; this.adminEventPublisher = new AdminEventPublisher(cConf, new MultiThreadMessagingContext(messagingService)); this.metricsCollectionService = metricsCollectionService; @@ -316,9 +322,13 @@ public boolean scanApplications(ScanApplicationsRequest request, * @return True if the page limit has reached, false otherwise. * @throws IOException If an I/O error occurs during the scanning process. */ - public boolean scanSourceControlMetadata(ScanSourceControlMetadataRequest request, + public Pair scanSourceControlMetadata(ScanSourceControlMetadataRequest request, int txBatchSize, Consumer consumer) throws IOException { + NamespaceId namespaceId = new NamespaceId(request.getNamespace()); + // Get last refresh time of metadata refresh service of given namespace + Long lastRefreshTime = sourceControlMetadataRefreshService.getLastRefreshTime(namespaceId); + String lastKey = request.getScanAfter(); int currentLimit = request.getLimit(); @@ -337,12 +347,28 @@ public boolean scanSourceControlMetadata(ScanSourceControlMetadataRequest reques }, IOException.class); if (count < maxLimit) { - return false; + return new Pair(false, lastRefreshTime); } currentLimit -= txBatchSize; } - return true; + + // Triggering source control metadata refresh service + try { + checkSourceControlMetadataAutoRefreshFlag(); + sourceControlMetadataRefreshService.runRefreshService(namespaceId); + } catch (Exception e){ + LOG.error("Failed to refresh source control metadata for namespace" + + namespaceId.getNamespace(), e); + } + + return new Pair(true, lastRefreshTime); + } + + private void checkSourceControlMetadataAutoRefreshFlag() throws ForbiddenException { + if (!Feature.SOURCE_CONTROL_METADATA_AUTO_REFRESH.isEnabled(featureFlagsProvider)) { + throw new ForbiddenException("Source Control Metadata Auto Refresh feature is not enabled."); + } } private void processApplications(List> list, diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshService.java new file mode 100644 index 000000000000..f060b39884d1 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshService.java @@ -0,0 +1,254 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.services; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.AbstractScheduledService; +import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest; +import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.common.RepositoryNotFoundException; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.SourceControlManagement; +import io.cdap.cdap.common.namespace.NamespaceAdmin; +import io.cdap.cdap.internal.app.store.NamespaceSourceControlMetadataStore; +import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; +import io.cdap.cdap.proto.SourceControlMetadataRecord; +import io.cdap.cdap.proto.element.EntityType; +import io.cdap.cdap.proto.id.ApplicationReference; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig; +import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta; +import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta; +import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository; +import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp; +import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse; +import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.TableNotFoundException; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import io.cdap.cdap.store.RepositoryTable; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.inject.Inject; +import org.apache.twill.common.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NamespaceSourceControlMetadataRefreshService extends AbstractScheduledService { + + private static final Logger LOG = LoggerFactory.getLogger( + NamespaceSourceControlMetadataRefreshService.class); + private final long runInterval; + private final long bufferTime; + private final TransactionRunner transactionRunner; + private final SourceControlOperationRunner sourceControlOperationRunner; + private final Store store; + private final NamespaceId namespaceId; + private final NamespaceAdmin namespaceAdmin; + private AtomicLong lastRefreshTime = new AtomicLong(0); + private AtomicLong refreshStartTime = new AtomicLong(0); + private ScheduledExecutorService executor; + + @Inject + public NamespaceSourceControlMetadataRefreshService(CConfiguration cConf, + TransactionRunner transactionRunner, + SourceControlOperationRunner sourceControlOperationRunner, Store store, + NamespaceId namespaceId, NamespaceAdmin namespaceAdmin) { + this.transactionRunner = transactionRunner; + this.sourceControlOperationRunner = sourceControlOperationRunner; + this.store = store; + this.runInterval = cConf.getLong(SourceControlManagement.METADATA_REFRESH_INTERVAL_SECONDS); + this.bufferTime = cConf.getLong(SourceControlManagement.METADATA_BUFFER_SECONDS); + this.namespaceId = namespaceId; + this.namespaceAdmin = namespaceAdmin; + } + + private RepositorySourceControlMetadataStore getRepoSourceControlMetadataStore( + StructuredTableContext context) { + return RepositorySourceControlMetadataStore.create(context); + } + + private NamespaceSourceControlMetadataStore getNamespaceSourceControlMetadataStore( + StructuredTableContext context) { + return NamespaceSourceControlMetadataStore.create(context); + } + + @Override + protected final ScheduledExecutorService executor() { + executor = Executors.newSingleThreadScheduledExecutor( + Threads.createDaemonThreadFactory("source-control-metadata-refresh-service")); + return executor; + } + + private RepositoryTable getRepositoryTable(StructuredTableContext context) + throws TableNotFoundException { + return new RepositoryTable(context); + } + + + public RepositoryMeta getRepositoryMeta(NamespaceId namespace) + throws RepositoryNotFoundException { + return TransactionRunners.run(transactionRunner, context -> { + RepositoryTable table = getRepositoryTable(context); + RepositoryMeta repoMeta = table.get(namespace); + if (repoMeta == null) { + throw new RepositoryNotFoundException(namespace); + } + + return repoMeta; + }, RepositoryNotFoundException.class); + } + + // TODO(CDAP-21017): Optimize periodic refresh of source control metadata + @Override + public void runOneIteration() { + try { + if (!namespaceAdmin.exists(namespaceId)) { + LOG.info("Stopping SourceControlMetadataRefreshService for " + namespaceId.getNamespace() + + " since it does not exist"); + this.stop(); + } + if (System.currentTimeMillis() - refreshStartTime.get() < bufferTime) { + return; + } + + refreshStartTime.set(System.currentTimeMillis()); + LOG.info("Running SourceControlMetadataRefreshService for namespace " + + namespaceId.getNamespace()); + // Getting repository config for a specific namespace + RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig(); + + // Listing the apps from remote repo + RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list( + new NamespaceRepository(namespaceId, repoConfig)); + + // Cleaning up the repo source control metadata table + HashSet repoFileNames = new HashSet<>(); + for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { + repoFileNames.add(repoApp.getName()); + } + cleanupRepoSourceControlMeta(namespaceId.getNamespace(), repoFileNames); + + // Updating the namespace and repo source control metadata table + for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { + updateSourceControlMeta( + new ApplicationReference(namespaceId.getNamespace(), repoApp.getName()), + repoApp.getFileHash(), + refreshStartTime.get()); + } + lastRefreshTime.set(System.currentTimeMillis()); + + } catch (RepositoryNotFoundException e) { + LOG.info("Stopping SourceControlMetadataRefreshService because repository does not exist " + + "for namespace " + namespaceId.getNamespace(), e); + this.stop(); + } catch (Exception e) { + LOG.error("Failed to refresh source control metadata for namespace " + + namespaceId.getNamespace(), e); + } + + } + + public Long getLastRefreshTime() { + return this.lastRefreshTime.get(); + } + + @Override + protected void startUp() throws Exception { + if (this.runInterval <= 0) { + this.stop(); + } + LOG.info("Starting SourceControlMetadataRefreshService for namespace " + + namespaceId.getNamespace()); + } + + @Override + protected Scheduler scheduler() { + return Scheduler.newFixedRateSchedule(1, runInterval, TimeUnit.SECONDS); + } + + @Override + protected void shutDown() throws Exception { + if (executor != null) { + executor.shutdownNow(); + } + } + + public void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash, + Long refreshStartTime) { + TransactionRunners.run(transactionRunner, context -> { + RepositorySourceControlMetadataStore repoMetadataStore = getRepoSourceControlMetadataStore( + context); + NamespaceSourceControlMetadataStore namespaceMetadataStore = getNamespaceSourceControlMetadataStore( + context); + SourceControlMeta namespaceSourceControlMeta = namespaceMetadataStore.get(appRef); + + if (namespaceSourceControlMeta == null || namespaceSourceControlMeta.getFileHash() == null) { + repoMetadataStore.write(appRef, false, 0L); + return; + } + Instant lastSyncedAt = namespaceSourceControlMeta.getLastSyncedAt(); + Instant refreshTime = Instant.ofEpochMilli(refreshStartTime); + if (namespaceSourceControlMeta.getLastSyncedAt() != null + && namespaceSourceControlMeta.getLastSyncedAt().isAfter( + Instant.ofEpochMilli(refreshStartTime))) { + repoMetadataStore.write(appRef, + Boolean.TRUE.equals(namespaceSourceControlMeta.getSyncStatus()), + namespaceSourceControlMeta.getLastSyncedAt().toEpochMilli()); + return; + } + boolean isSynced = namespaceSourceControlMeta.getFileHash().equals(repoFileHash); + namespaceMetadataStore.write(appRef, + SourceControlMeta.builder(namespaceSourceControlMeta).setSyncStatus(isSynced).build()); + repoMetadataStore.write(appRef, + isSynced, namespaceSourceControlMeta.getLastSyncedAt() == null ? 0L : + namespaceSourceControlMeta.getLastSyncedAt().toEpochMilli()); + }); + } + + private void cleanupRepoSourceControlMeta(String namespace, HashSet repoFiles) { + TransactionRunners.run(transactionRunner, context -> { + RepositorySourceControlMetadataStore store = getRepoSourceControlMetadataStore(context); + ScanSourceControlMetadataRequest request = ScanSourceControlMetadataRequest + .builder() + .setNamespace(namespace) + .setLimit(Integer.MAX_VALUE) + .build(); + ArrayList records = new ArrayList<>(); + String type = EntityType.APPLICATION.toString(); + store.scan(request, type, records::add); + for (SourceControlMetadataRecord record : records) { + if (!repoFiles.contains(record.getName())) { + store.delete( + new ApplicationReference(record.getNamespace(), record.getName())); + } + } + }); + } + + @VisibleForTesting + void setStartRefreshTime(Instant refreshStartTime) { + this.refreshStartTime.set(refreshStartTime.toEpochMilli()); + } + +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java index 779295cd8ac7..c9595e3ef3ec 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java @@ -20,11 +20,13 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.api.feature.FeatureFlagsProvider; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.api.metrics.MetricsContext; import io.cdap.cdap.api.security.store.SecureStore; import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest; import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.common.ForbiddenException; import io.cdap.cdap.common.NamespaceNotFoundException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.RepositoryNotFoundException; @@ -33,9 +35,12 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants.Metrics.SourceControlManagement; import io.cdap.cdap.common.conf.Constants.Metrics.Tag; +import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider; +import io.cdap.cdap.features.Feature; import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; import io.cdap.cdap.internal.operation.OperationLifecycleManager; import io.cdap.cdap.proto.ApplicationDetail; @@ -64,14 +69,11 @@ import io.cdap.cdap.sourcecontrol.RepositoryManager; import io.cdap.cdap.sourcecontrol.SourceControlConfig; import io.cdap.cdap.sourcecontrol.SourceControlException; -import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository; import io.cdap.cdap.sourcecontrol.operationrunner.PullAppOperationRequest; import io.cdap.cdap.sourcecontrol.operationrunner.PullAppResponse; import io.cdap.cdap.sourcecontrol.operationrunner.PushAppMeta; import io.cdap.cdap.sourcecontrol.operationrunner.PushAppOperationRequest; import io.cdap.cdap.sourcecontrol.operationrunner.PushAppsResponse; -import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp; -import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse; import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; import io.cdap.cdap.spi.data.StructuredTableContext; import io.cdap.cdap.spi.data.TableNotFoundException; @@ -84,6 +86,7 @@ import java.util.HashSet; import java.util.Optional; import java.util.function.Consumer; +import javafx.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,17 +101,19 @@ public class SourceControlManagementService { private final TransactionRunner transactionRunner; private final CConfiguration cConf; private final SecureStore secureStore; - private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService; private final SourceControlOperationRunner sourceControlOperationRunner; private final ApplicationLifecycleService appLifecycleService; private final Store store; private final OperationLifecycleManager operationLifecycleManager; private final MetricsCollectionService metricsCollectionService; private final Clock clock; + private final FeatureFlagsProvider featureFlagsProvider; + private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService; private static final Logger LOG = LoggerFactory.getLogger(SourceControlManagementService.class); /** + /** * Constructor for SourceControlManagementService with all params injected via guice. */ @Inject @@ -141,7 +146,8 @@ public SourceControlManagementService(CConfiguration cConf, Store store, OperationLifecycleManager operationLifecycleManager, MetricsCollectionService metricsCollectionService, - Clock clock, SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { + Clock clock, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { this.cConf = cConf; this.secureStore = secureStore; this.transactionRunner = transactionRunner; @@ -153,6 +159,7 @@ public SourceControlManagementService(CConfiguration cConf, this.operationLifecycleManager = operationLifecycleManager; this.metricsCollectionService = metricsCollectionService; this.clock = clock; + this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf); this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService; } @@ -192,7 +199,7 @@ public RepositoryMeta setRepository(NamespaceId namespace, RepositoryConfig repo RepositoryTable repoTable = getRepositoryTable(context); repoTable.create(namespace, repository); - sourceControlMetadataRefreshService.runRefreshService(true, namespace); + sourceControlMetadataRefreshService.addRefreshService(namespace); return repoTable.get(namespace); }, NamespaceNotFoundException.class); } @@ -209,7 +216,9 @@ public void deleteRepository(NamespaceId namespace) { TransactionRunners.run(transactionRunner, context -> { RepositoryTable repoTable = getRepositoryTable(context); repoTable.delete(namespace); + sourceControlMetadataRefreshService.removeRefreshService(namespace); }); + } /** @@ -397,37 +406,14 @@ private PullAppResponse pullAndValidateApplication(ApplicationReference appRe * @param txBatchSize The transaction batch size for processing metadata in each iteration. * @param consumer The consumer to process the scanned repository metadata records. * @return True if the page limit has reached, false otherwise. - * @throws NotFoundException If the requested repository metadata is not found. - * @throws AuthenticationConfigException If an authentication configuration error occurs. * @throws IOException If an I/O error occurs during the scanning process. */ - public boolean scanRepoMetadata(ScanSourceControlMetadataRequest request, int txBatchSize, + public Pair scanRepoMetadata(ScanSourceControlMetadataRequest request, int txBatchSize, Consumer consumer) throws IOException { NamespaceId namespaceId = new NamespaceId(request.getNamespace()); -// accessEnforcer.enforce(namespaceId, authenticationContext.getPrincipal(), -// NamespacePermission.READ_REPOSITORY); -// RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig(); -// // TODO(CDAP-20993): List API is used here for testing. It will be moved to a separate background job in the next PR -// RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list( -// new NamespaceRepository(namespaceId, repoConfig)); -// LOG.debug("Successfully received apps in namespace {} from repository : response: {}", -// namespaceId, -// repositoryAppsResponse); -// // Cleaning up the repo source control metadata table -// HashSet repoFileNames = new HashSet<>(); -// for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { -// repoFileNames.add(repoApp.getName()); -// } -// TransactionRunners.run(transactionRunner, context -> { -// getRepoSourceControlMetadataStore(context).cleanupRepoSourceControlMeta( -// namespaceId.getNamespace(), repoFileNames); -// }); -// // Updating the namespace and repo source control metadata table -// for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { -// store.updateSourceControlMeta( -// new ApplicationReference(request.getNamespace(), -// repoApp.getName()), repoApp.getFileHash()); -// } + // Get last refresh time of metadata refresh service of given namespace + Long lastRefreshTime = sourceControlMetadataRefreshService.getLastRefreshTime(namespaceId); + // Getting repo files String lastKey = request.getScanAfter(); int currentLimit = request.getLimit(); @@ -447,16 +433,19 @@ public boolean scanRepoMetadata(ScanSourceControlMetadataRequest request, int tx }, IOException.class); if (count < maxLimit) { - return false; + return new Pair(false, lastRefreshTime); } currentLimit -= txBatchSize; } + // Triggering source control metadata refresh service try { - sourceControlMetadataRefreshService.runRefreshService(false, namespaceId); + checkSourceControlMetadataAutoRefreshFlag(); + sourceControlMetadataRefreshService.runRefreshService(namespaceId); } catch (Exception e){ - // log it + LOG.error("Failed to refresh source control metadata for namespace" + + namespaceId.getNamespace(), e); } - return true; + return new Pair(true, lastRefreshTime); } /** @@ -513,4 +502,10 @@ private MetricsContext getMetricContext(NamespaceId namespace) { return metricsCollectionService.getContext( ImmutableMap.of(Tag.NAMESPACE, namespace.getNamespace())); } + + private void checkSourceControlMetadataAutoRefreshFlag() throws ForbiddenException { + if (!Feature.SOURCE_CONTROL_METADATA_AUTO_REFRESH.isEnabled(featureFlagsProvider)) { + throw new ForbiddenException("Source Control Metadata Auto Refresh feature is not enabled."); + } + } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlMetadataRefreshService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlMetadataRefreshService.java deleted file mode 100644 index a1fb8b3e67bd..000000000000 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlMetadataRefreshService.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Copyright © 2024 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.internal.app.services; - -import com.google.common.util.concurrent.AbstractScheduledService; -import io.cdap.cdap.app.store.Store; -import io.cdap.cdap.common.NotFoundException; -import io.cdap.cdap.common.RepositoryNotFoundException; -import io.cdap.cdap.common.namespace.NamespaceAdmin; -import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; -import io.cdap.cdap.proto.id.ApplicationReference; -import io.cdap.cdap.proto.id.NamespaceId; -import io.cdap.cdap.proto.security.NamespacePermission; -import io.cdap.cdap.proto.security.StandardPermission; -import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig; -import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta; -import io.cdap.cdap.security.spi.authentication.AuthenticationContext; -import io.cdap.cdap.security.spi.authorization.AccessEnforcer; -import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository; -import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp; -import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse; -import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; -import io.cdap.cdap.spi.data.StructuredTableContext; -import io.cdap.cdap.spi.data.TableNotFoundException; -import io.cdap.cdap.spi.data.transaction.TransactionRunner; -import io.cdap.cdap.spi.data.transaction.TransactionRunners; -import io.cdap.cdap.store.RepositoryTable; -import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.twill.common.Threads; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SourceControlMetadataRefreshService extends AbstractScheduledService { - - private List namespaces; - private HashMap lastRefreshTimes; - - private final AccessEnforcer accessEnforcer; - private final AuthenticationContext authenticationContext; - - private final NamespaceAdmin namespaceAdmin; - private ScheduledExecutorService executor; - private final TransactionRunner transactionRunner; - private final SourceControlOperationRunner sourceControlOperationRunner; - private final Store store; - private static final Logger LOG = LoggerFactory.getLogger( - SourceControlMetadataRefreshService.class); - - public SourceControlMetadataRefreshService(AccessEnforcer accessEnforcer, - AuthenticationContext authenticationContext, NamespaceAdmin namespaceAdmin, - TransactionRunner transactionRunner, - SourceControlOperationRunner sourceControlOperationRunner, Store store) { - this.accessEnforcer = accessEnforcer; - this.authenticationContext = authenticationContext; - this.namespaceAdmin = namespaceAdmin; - this.transactionRunner = transactionRunner; - this.sourceControlOperationRunner = sourceControlOperationRunner; - this.store = store; - } - - private void setNamespaces(List namespaceIds) { - this.namespaces = namespaceIds; - } - - private RepositorySourceControlMetadataStore getRepoSourceControlMetadataStore( - StructuredTableContext context) { - return RepositorySourceControlMetadataStore.create(context); - } - - @Override - protected final ScheduledExecutorService executor() { - executor = Executors.newSingleThreadScheduledExecutor( - Threads.createDaemonThreadFactory("source-control-metadata-refresh-service")); - return executor; - } - - private RepositoryTable getRepositoryTable(StructuredTableContext context) - throws TableNotFoundException { - return new RepositoryTable(context); - } - - - public RepositoryMeta getRepositoryMeta(NamespaceId namespace) - throws RepositoryNotFoundException { - accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), StandardPermission.GET); - - return TransactionRunners.run(transactionRunner, context -> { - RepositoryTable table = getRepositoryTable(context); - RepositoryMeta repoMeta = table.get(namespace); - if (repoMeta == null) { - throw new RepositoryNotFoundException(namespace); - } - - return repoMeta; - }, RepositoryNotFoundException.class); - } - - @Override - protected void runOneIteration() { - try { - if (namespaces == null) { - setNamespaces(namespaceAdmin.list().stream() - .map(meta -> meta.getNamespaceId()).collect(Collectors.toList())); - - } - - for (NamespaceId namespaceId : namespaces) { - RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig(); - - accessEnforcer.enforce(namespaceId, authenticationContext.getPrincipal(), - NamespacePermission.READ_REPOSITORY); - RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list( - new NamespaceRepository(namespaceId, repoConfig)); - - // Cleaning up the repo source control metadata table - HashSet repoFileNames = new HashSet<>(); - for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { - repoFileNames.add(repoApp.getName()); - } - TransactionRunners.run(transactionRunner, context -> { - getRepoSourceControlMetadataStore(context).cleanupRepoSourceControlMeta( - namespaceId.getNamespace(), repoFileNames); - }); - // Updating the namespace and repo source control metadata table - for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { - store.updateSourceControlMeta( - new ApplicationReference(namespaceId.getNamespace(), repoApp.getName()), - repoApp.getFileHash(), - lastRefreshTimes.get(namespaceId)); - } - lastRefreshTimes.put(namespaceId, System.currentTimeMillis()); - } - - setNamespaces(null); - } catch (RepositoryNotFoundException e) { - // LOG - } catch (NotFoundException e) { - // LOG - } catch (Exception e) { - // LOG - } - - } - - public void runRefreshService(boolean forced, NamespaceId namespace) throws Exception { - if (!forced && (System.currentTimeMillis() - lastRefreshTimes.get(namespace) - < 10 * 60 * 1000)) { - return; - } - List namespaceList = new ArrayList<>(); - namespaceList.add(namespace); - setNamespaces(namespaceList); - runOneIteration(); - } - - @Override - protected void startUp() throws Exception { - this.namespaces = namespaceAdmin.list().stream() - .map(meta -> meta.getNamespaceId()).collect(Collectors.toList()); - namespaces.forEach(n -> lastRefreshTimes.put(n, 0L)); - } - - @Override - protected Scheduler scheduler() { - return Scheduler.newFixedRateSchedule(1, 60 * 60, TimeUnit.SECONDS); - } - - @Override - protected void shutDown() throws Exception { - if (executor != null) { - executor.shutdownNow(); - } - } -} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java new file mode 100644 index 000000000000..84e18e02a1ac --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java @@ -0,0 +1,110 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.sourcecontrol; + +import com.google.common.util.concurrent.AbstractIdleService; +import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.namespace.NamespaceAdmin; +import io.cdap.cdap.internal.app.services.NamespaceSourceControlMetadataRefreshService; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import javax.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SourceControlMetadataRefreshService extends AbstractIdleService { + private ConcurrentMap refreshSchedulers = new ConcurrentHashMap<>(); + private final TransactionRunner transactionRunner; + private final NamespaceAdmin namespaceAdmin; + private final CConfiguration cConf; + private final SourceControlOperationRunner sourceControlOperationRunner; + private final Store store; + private static final Logger LOG = LoggerFactory.getLogger(SourceControlMetadataRefreshService.class); + + @Inject + public SourceControlMetadataRefreshService(TransactionRunner transactionRunner, + NamespaceAdmin namespaceAdmin, CConfiguration cConf, SourceControlOperationRunner sourceControlOperationRunner, + Store store) { + this.transactionRunner = transactionRunner; + this.namespaceAdmin = namespaceAdmin; + this.cConf = cConf; + this.sourceControlOperationRunner = sourceControlOperationRunner; + this.store = store; + } + + @Override + protected void startUp() throws Exception { + LOG.info("Starting SourceControlMetadataRefreshManager"); + + List namespaceIds = namespaceAdmin.list().stream() + .map(meta -> meta.getNamespaceId()).collect(Collectors.toList()); + for (NamespaceId namespace : namespaceIds) { + addRefreshService(namespace); + } + } + + @Override + protected void shutDown() throws Exception { + LOG.info("Shutting down SourceControlManagementService"); + + for (NamespaceSourceControlMetadataRefreshService service : refreshSchedulers.values()) { + service.stop(); + } + } + private NamespaceSourceControlMetadataRefreshService createRefreshService(NamespaceId namespace) { + return new NamespaceSourceControlMetadataRefreshService(cConf, + transactionRunner, + sourceControlOperationRunner, store, + namespace, namespaceAdmin); + } + + public void addRefreshService(NamespaceId namespaceId) { + LOG.info("Adding refresh service for " + namespaceId.getNamespace()); + refreshSchedulers.putIfAbsent(namespaceId, createRefreshService(namespaceId)); + refreshSchedulers.get(namespaceId).start(); + } + + public void removeRefreshService(NamespaceId namespaceId) { + if(refreshSchedulers.containsKey(namespaceId)) { + refreshSchedulers.get(namespaceId).stop(); + refreshSchedulers.remove(namespaceId); + } + } + + public Long getLastRefreshTime(NamespaceId namespaceId) { + if (refreshSchedulers.containsKey(namespaceId)) { + return refreshSchedulers.get(namespaceId).getLastRefreshTime(); + } + return null; + } + + public void runRefreshService(NamespaceId namespaceId) { + if (refreshSchedulers.containsKey(namespaceId)) { + refreshSchedulers.get(namespaceId).runOneIteration(); + } + else{ + addRefreshService(namespaceId); + } + } + +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java index c0430669bf90..3fb1bb431d58 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java @@ -618,9 +618,17 @@ public void markApplicationsLatest(Collection appIds) @Override public int addLatestApplication(ApplicationId id, ApplicationMeta meta) throws ConflictException { return TransactionRunners.run(transactionRunner, context -> { + ApplicationReference appRef = id.getAppReference(); + RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context); getNamespaceSourceControlMetadataStore(context).write(id.getAppReference(), meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta() : SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true).build()); + if (repoStore.get(appRef) != null) { + repoStore.write(appRef, meta.getSourceControlMeta() != null , + meta.getSourceControlMeta() == null || + meta.getSourceControlMeta().getLastSyncedAt() == null ? null : + meta.getSourceControlMeta().getLastSyncedAt().toEpochMilli()); + } return getAppMetadataStore(context).createLatestApplicationVersion(id, meta); }, ConflictException.class); } @@ -629,9 +637,18 @@ public int addLatestApplication(ApplicationId id, ApplicationMeta meta) throws C public int addApplication(ApplicationId id, ApplicationMeta meta, boolean isLatest) throws ConflictException { return TransactionRunners.run(transactionRunner, context -> { - getNamespaceSourceControlMetadataStore(context).write(id.getAppReference(), + ApplicationReference appRef = id.getAppReference(); + RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context); + // TODO - REFACTOR THIS + getNamespaceSourceControlMetadataStore(context).write(appRef, meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta() : SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true).build()); + if (repoStore.get(appRef) != null) { + repoStore.write(appRef, meta.getSourceControlMeta() != null , + meta.getSourceControlMeta() == null || + meta.getSourceControlMeta().getLastSyncedAt() == null ? null : + meta.getSourceControlMeta().getLastSyncedAt().toEpochMilli()); + } return getAppMetadataStore(context).createApplicationVersion(id, meta, isLatest); }, ConflictException.class); } @@ -643,12 +660,18 @@ public void updateApplicationSourceControlMeta( TransactionRunners.run(transactionRunner, context -> { NamespaceSourceControlMetadataStore sourceControlMetadataStore = getNamespaceSourceControlMetadataStore( context); + RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context); AppMetadataStore appMetadataStore = getAppMetadataStore(context); for (Map.Entry updateRequest : updateRequests.entrySet()) { ApplicationId appId = updateRequest.getKey(); if (appMetadataStore.getApplication(appId) != null) { sourceControlMetadataStore.write(appId.getAppReference(), SourceControlMeta.builder(updateRequest.getValue()).setSyncStatus(true).build()); + if (repoStore.get(appId.getAppReference()) != null) { + repoStore.write(appId.getAppReference(), true, + updateRequest.getValue().getLastSyncedAt() == null ? 0L : + updateRequest.getValue().getLastSyncedAt().toEpochMilli()); + } } } }, IOException.class); @@ -955,10 +978,16 @@ public Map getApplications(Collection { getNamespaceSourceControlMetadataStore(context).write(appRef, SourceControlMeta.builder(sourceControlMeta).setSyncStatus(true).build()); + RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context); + if (repoStore.get(appRef) != null) { + repoStore.write(appRef, true, sourceControlMeta.getLastSyncedAt() == null ? null : + sourceControlMeta.getLastSyncedAt().toEpochMilli()); + } }); } @@ -993,39 +1022,6 @@ public ApplicationMeta getLatest(ApplicationReference appRef) { }); } - @Override - public void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash, Long lastRefreshTime) { - TransactionRunners.run(transactionRunner, context -> { - RepositorySourceControlMetadataStore repoMetadataStore = getRepoSourceControlMetadataStore( - context); - NamespaceSourceControlMetadataStore namespaceMetadataStore = getNamespaceSourceControlMetadataStore( - context); - SourceControlMeta namespaceSourceControlMeta = namespaceMetadataStore.get(appRef); - - if (namespaceSourceControlMeta == null || namespaceSourceControlMeta.getFileHash() == null) { - repoMetadataStore.write(appRef, false, 0L); - return; - } - if (namespaceSourceControlMeta.getLastSyncedAt() != null && namespaceSourceControlMeta.getLastSyncedAt().isAfter( - Instant.ofEpochMilli(lastRefreshTime))) { - repoMetadataStore.write(appRef, - namespaceSourceControlMeta.getSyncStatus(), namespaceSourceControlMeta.getLastSyncedAt().toEpochMilli()); - return; - } - Boolean isSynced = namespaceSourceControlMeta.getFileHash().equals(repoFileHash); - if (isSynced) { - namespaceMetadataStore.write(appRef, - new SourceControlMeta(namespaceSourceControlMeta.getFileHash(), - namespaceSourceControlMeta.getCommitId(), - namespaceSourceControlMeta.getLastSyncedAt(), isSynced)); - repoMetadataStore.write(appRef, - isSynced, namespaceSourceControlMeta.getLastSyncedAt().toEpochMilli()); - } else { - repoMetadataStore.write(appRef, isSynced, 0L); - } - }); - } - @Override public int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request, Consumer consumer) { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/NamespaceSourceControlMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/NamespaceSourceControlMetadataStore.java index 285f8e6239a2..204ae925aec6 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/NamespaceSourceControlMetadataStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/NamespaceSourceControlMetadataStore.java @@ -371,4 +371,15 @@ void deleteNamespaceSourceControlMetadataTable() throws IOException { Range.Bound.INCLUSIVE)); } + @VisibleForTesting + public List getAll(String namespace, String type) + throws IOException { + ScanSourceControlMetadataRequest request = ScanSourceControlMetadataRequest.builder() + .setNamespace(namespace).build(); + List records = new ArrayList<>(); + scan(request, type, records::add); + return records; + } + + } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStore.java index e140de59e2b5..87f9d0a453d9 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/RepositorySourceControlMetadataStore.java @@ -151,7 +151,7 @@ public int scan(ScanSourceControlMetadataRequest request, * @param lastSynced The timestamp of the last git pull/push of the entity * @throws IOException If an I/O error occurs while writing the source control metadata. */ - public void write(ApplicationReference appRef, Boolean isSynced, + public void write(ApplicationReference appRef, boolean isSynced, Long lastSynced) throws IOException { StructuredTable repoTable = getRepositorySourceControlMetadataTable(); repoTable.upsert(getAllFields(appRef, isSynced, lastSynced)); @@ -169,32 +169,6 @@ public void delete(ApplicationReference appRef) throws IOException { getPrimaryKey(appRef)); } - /** - * Cleans up repository source control metadata. This method removes metadata records for - * repository files that are no longer present in the repository. - * - * @param namespace The namespace for which to clean up the repository source control metadata. - * @param repoFiles The set of repository files to retain metadata records for. - * @throws IOException If an I/O error occurs during the cleanup process. - */ - public void cleanupRepoSourceControlMeta(String namespace, HashSet repoFiles) - throws IOException { - ScanSourceControlMetadataRequest request = ScanSourceControlMetadataRequest - .builder() - .setNamespace(namespace) - .setLimit(Integer.MAX_VALUE) - .build(); - ArrayList records = new ArrayList<>(); - String type = EntityType.APPLICATION.toString(); - scan(request, type, records::add); - for (SourceControlMetadataRecord record : records) { - if (!repoFiles.contains(record.getName())) { - delete( - new ApplicationReference(record.getNamespace(), record.getName())); - } - } - } - private CloseableIterator getScanIterator( StructuredTable table, Range range, @@ -229,7 +203,7 @@ private Collection> getAllFields(ApplicationReference appRef, List> fields = getPrimaryKey(appRef); fields.add( Fields.longField(StoreDefinition.RepositorySourceControlMetadataStore.LAST_MODIFIED_FIELD, - lastSynced)); + lastSynced == null ? 0L : lastSynced)); fields.add( Fields.booleanField(StoreDefinition.RepositorySourceControlMetadataStore.IS_SYNCED_FIELD, isSynced)); @@ -271,8 +245,7 @@ private List> getPrimaryKey(ApplicationReference appRef) { return primaryKey; } - @VisibleForTesting - ImmutablePair get(ApplicationReference appRef) throws IOException { + public ImmutablePair get(ApplicationReference appRef) throws IOException { List> primaryKey = getPrimaryKey(appRef); StructuredTable table = getRepositorySourceControlMetadataTable(); return table.read(primaryKey).map(row -> { @@ -297,4 +270,12 @@ public void deleteAll(String namespace) throws IOException { ImmutableList.of( Fields.stringField(StoreDefinition.AppMetadataStore.NAMESPACE_FIELD, namespace)))); } + + @VisibleForTesting + public List getAll(String namespace, String type) throws IOException { + ScanSourceControlMetadataRequest request = ScanSourceControlMetadataRequest.builder().setNamespace(namespace).build(); + List records = new ArrayList<>(); + scan(request, type, records::add); + return records; + } } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshServiceTest.java new file mode 100644 index 000000000000..ca9c0810d879 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/NamespaceSourceControlMetadataRefreshServiceTest.java @@ -0,0 +1,331 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.services; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.Injector; +import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.common.NotFoundException; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.SourceControlManagement; +import io.cdap.cdap.common.id.Id; +import io.cdap.cdap.common.id.Id.Namespace; +import io.cdap.cdap.common.namespace.NamespaceAdmin; +import io.cdap.cdap.internal.AppFabricTestHelper; +import io.cdap.cdap.internal.app.store.NamespaceSourceControlMetadataStore; +import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; +import io.cdap.cdap.proto.SourceControlMetadataRecord; +import io.cdap.cdap.proto.element.EntityType; +import io.cdap.cdap.proto.id.ApplicationReference; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.sourcecontrol.AuthConfig; +import io.cdap.cdap.proto.sourcecontrol.AuthType; +import io.cdap.cdap.proto.sourcecontrol.PatConfig; +import io.cdap.cdap.proto.sourcecontrol.Provider; +import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig; +import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta; +import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta; +import io.cdap.cdap.sourcecontrol.ApplicationManager; +import io.cdap.cdap.sourcecontrol.AuthenticationConfigException; +import io.cdap.cdap.sourcecontrol.NoChangesToPushException; +import io.cdap.cdap.sourcecontrol.operationrunner.MultiPullAppOperationRequest; +import io.cdap.cdap.sourcecontrol.operationrunner.MultiPushAppOperationRequest; +import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository; +import io.cdap.cdap.sourcecontrol.operationrunner.PullAppOperationRequest; +import io.cdap.cdap.sourcecontrol.operationrunner.PullAppResponse; +import io.cdap.cdap.sourcecontrol.operationrunner.PushAppOperationRequest; +import io.cdap.cdap.sourcecontrol.operationrunner.PushAppsResponse; +import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp; +import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse; +import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import io.cdap.cdap.store.RepositoryTable; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; + +public class NamespaceSourceControlMetadataRefreshServiceTest { + + private static TransactionRunner transactionRunner; + private static CConfiguration cConf; + private static NamespaceAdmin namespaceAdmin; + private static NamespaceId NAMESPACE_ID = new NamespaceId(Id.Namespace.DEFAULT.getId()); + private static RepositoryConfig REPO_CONFIG = new RepositoryConfig.Builder().setProvider(Provider.GITHUB) + .setLink("testUrl") + .setAuth(new AuthConfig(AuthType.PAT, new PatConfig("password", "user"))) + .build(); + private static NamespaceSourceControlMetadataRefreshService refreshService; + private static final String TYPE = EntityType.APPLICATION.toString(); + private static Store store; + + @BeforeClass + public static void beforeClass() throws Exception { + Injector injector = AppFabricTestHelper.getInjector(); + AppFabricTestHelper.ensureNamespaceExists(NAMESPACE_ID.DEFAULT); + store = injector.getInstance(Store.class); + transactionRunner = injector.getInstance(TransactionRunner.class); + cConf = injector.getInstance(CConfiguration.class); + namespaceAdmin = injector.getInstance(NamespaceAdmin.class); + cConf.set(SourceControlManagement.METADATA_REFRESH_INTERVAL_SECONDS, "300"); + cConf.set(SourceControlManagement.METADATA_BUFFER_SECONDS, "120"); + refreshService = new NamespaceSourceControlMetadataRefreshService( + cConf, transactionRunner, sourceControlOperationRunnerSpy, store, NAMESPACE_ID, + namespaceAdmin); + } + + private static final Instant fixedInstant = Instant.ofEpochSecond(1646358109); + private static final Instant instantGreaterThanStartRefresh = Instant.ofEpochMilli(System.currentTimeMillis() + 100000000); + private static final SourceControlOperationRunner sourceControlOperationRunnerSpy = + spy(new MockSourceControlOperationRunner()); + + @Test + public void testRefresh() throws Exception { + RepositoryApp app1 = new RepositoryApp("app1", "hash1"); + RepositoryApp app3 = new RepositoryApp("app3", "hash3"); + RepositoryApp app4 = new RepositoryApp("app4", "hash4"); + RepositoryApp app5 = new RepositoryApp("app5", "hash5"); + RepositoryApp app6 = new RepositoryApp("app6", "hash6"); + RepositoryApp app7 = new RepositoryApp("app7", "hashDiff"); + RepositoryAppsResponse expectedListResult = new RepositoryAppsResponse( + Arrays.asList(app1, app3, app4, app5, app6, app7)); + + RepositoryConfig repoConfig = new RepositoryConfig.Builder().setProvider(Provider.GITHUB) + .setLink("testUrl") + .setAuth(new AuthConfig(AuthType.PAT, new PatConfig("password", "user"))) + .build(); + setRepository(); + + doReturn(expectedListResult) + .when(sourceControlOperationRunnerSpy).list(Mockito.any(NamespaceRepository.class)); + + insertRepoSourceControlMetadataTests(); + insertNamespaceSourceControlTests(); + + List updatedRepoRecords = getUpdatedRepoRecords(); + List updatedNamespaceRecords = getUpdatedNamespaceRecords(); + List expectedRecords = new ArrayList<>(); + List gotRecords = new ArrayList<>(); + + refreshService.runOneIteration(); + + // Checking the all apps stored in repo source control table + gotRecords = getRepoRecords(NAMESPACE_ID.getNamespace()); + expectedRecords = updatedRepoRecords; + + Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); + + // Checking the all apps stored in namespace source control table + gotRecords = getNamespaceRecords(NAMESPACE_ID.getNamespace()); + expectedRecords = updatedNamespaceRecords; + + Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); + + deleteAll(); + + } + + private void deleteAllRepoSourceControlRecords(String namespace) { + TransactionRunners.run(transactionRunner, context -> { + RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( + context); + store.deleteAll(namespace); + }); + } + + + + private List getUpdatedRepoRecords() { + SourceControlMetadataRecord record1 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app1", null, null, fixedInstant.toEpochMilli(), true); + SourceControlMetadataRecord record2 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app3", null, null, null, false); + SourceControlMetadataRecord record3 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app4", null, null, instantGreaterThanStartRefresh.toEpochMilli(), true); + SourceControlMetadataRecord record4 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app5", null, null, instantGreaterThanStartRefresh.toEpochMilli(), false); + SourceControlMetadataRecord record5 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app6", null, null, fixedInstant.toEpochMilli(), true); + SourceControlMetadataRecord record6 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app7", null, null, fixedInstant.toEpochMilli(), false); + List insertedRecords = Arrays.asList(record1, record2, record3, record4, + record5, record6); + return insertedRecords; + } + + private List getUpdatedNamespaceRecords() { + SourceControlMetadataRecord record1 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app1", "hash1", "commit1", + fixedInstant.toEpochMilli(), true); + SourceControlMetadataRecord record2 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app4", "hash4", "commit1", instantGreaterThanStartRefresh.toEpochMilli(), true); + SourceControlMetadataRecord record3 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app5", "hash5", "commit1", instantGreaterThanStartRefresh.toEpochMilli(), false); + SourceControlMetadataRecord record4 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app6", "hash6", "commit1", + fixedInstant.toEpochMilli(), true); + SourceControlMetadataRecord record5 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), + TYPE, "app7", "hash7", "commit1", + fixedInstant.toEpochMilli(), false); + List insertedRecords = Arrays.asList(record1, record2, record3, record4, + record5); + return insertedRecords; + } + + private void insertRepoSourceControlMetadataTests() { + insertRepoRecord(Namespace.DEFAULT.getId(), "app1", TYPE, null, null, 0L, false); + insertRepoRecord(Namespace.DEFAULT.getId(), "app2", TYPE, null, null, + Instant.now().toEpochMilli(), false); + } + + private void insertRepoRecord(String namespace, String name, String type, + String specHash, String commitId, Long lastModified, Boolean isSycned) { + TransactionRunners.run(transactionRunner, context -> { + RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( + context); + store.write(new ApplicationReference(namespace, name), isSycned, lastModified); + }); + } + + private void insertNamespaceSourceControlTests() { + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app1", TYPE, "hash1", "commit1", + fixedInstant.toEpochMilli(), false); + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app4", TYPE, "hash4", "commit1", + instantGreaterThanStartRefresh.toEpochMilli(), true); + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app5", TYPE, "hash5", "commit1", + instantGreaterThanStartRefresh.toEpochMilli(), false); + + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app6", TYPE, "hash6", "commit1", + fixedInstant.toEpochMilli(), false); + insertNamespaceRecord(Namespace.DEFAULT.getId(), "app7", TYPE, "hash7", "commit1", + fixedInstant.toEpochMilli(), false); + } + + private void insertNamespaceRecord(String namespace, String name, String type, + String specHash, String commitId, Long lastModified, Boolean isSycned) { + TransactionRunners.run(transactionRunner, context -> { + NamespaceSourceControlMetadataStore store = NamespaceSourceControlMetadataStore.create( + context); + store.write(new ApplicationReference(namespace, name), + new SourceControlMeta(specHash, commitId, Instant.ofEpochMilli(lastModified), isSycned)); + }); + } + + private List getRepoRecords(String namespace) { + AtomicReference> records = new AtomicReference<>( + new ArrayList<>()); + TransactionRunners.run(transactionRunner, context -> { + RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( + context); + records.set(store.getAll(namespace, TYPE)); + }); + return records.get(); + } + + private List getNamespaceRecords(String namespace) { + AtomicReference> records = new AtomicReference<>( + new ArrayList<>()); + TransactionRunners.run(transactionRunner, context -> { + NamespaceSourceControlMetadataStore store = NamespaceSourceControlMetadataStore.create( + context); + records.set(store.getAll(namespace, TYPE)); + }); + return records.get(); + } + + private void setRepository() { + TransactionRunners.run(transactionRunner, context -> { + RepositoryTable store = new RepositoryTable(context); + store.create(NAMESPACE_ID, REPO_CONFIG); + }); + } + + private void deleteAll() { + TransactionRunners.run(transactionRunner, context -> { + RepositoryTable repoTableStore = new RepositoryTable(context); + repoTableStore.delete(NAMESPACE_ID); + NamespaceSourceControlMetadataStore namespaceStore = NamespaceSourceControlMetadataStore.create(context); + namespaceStore.deleteAll(NAMESPACE_ID.getNamespace()); + RepositorySourceControlMetadataStore repoStore = RepositorySourceControlMetadataStore.create(context); + repoStore.deleteAll(NAMESPACE_ID.getNamespace()); + }); + } + + /** + * A Mock {@link SourceControlOperationRunner} that can be used for tests. + */ + private static class MockSourceControlOperationRunner extends + AbstractIdleService implements + SourceControlOperationRunner { + + @Override + public PushAppsResponse push(PushAppOperationRequest pushAppOperationRequest) + throws NoChangesToPushException, AuthenticationConfigException { + return null; + } + + @Override + public PushAppsResponse multiPush(MultiPushAppOperationRequest pushRequest, + ApplicationManager appManager) + throws NoChangesToPushException, AuthenticationConfigException { + return null; + } + + @Override + public PullAppResponse pull(PullAppOperationRequest pullRequest) + throws NotFoundException, AuthenticationConfigException { + return null; + } + + @Override + public void multiPull(MultiPullAppOperationRequest pullRequest, + Consumer> consumer) + throws NotFoundException, AuthenticationConfigException { + } + + @Override + public RepositoryAppsResponse list(NamespaceRepository nameSpaceRepository) + throws AuthenticationConfigException, NotFoundException { + return null; + } + + @Override + protected void startUp() throws Exception { + // no-op. + } + + @Override + protected void shutDown() throws Exception { + // no-op. + } + } +} \ No newline at end of file diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlManagementServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlManagementServiceTest.java index f43fefc96ca3..e785cf38131f 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlManagementServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlManagementServiceTest.java @@ -29,6 +29,7 @@ import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest; import io.cdap.cdap.app.store.SourceControlMetadataFilter; import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.app.store.SyncStatus; import io.cdap.cdap.common.ApplicationNotFoundException; import io.cdap.cdap.common.NamespaceNotFoundException; import io.cdap.cdap.common.NotFoundException; @@ -39,6 +40,7 @@ import io.cdap.cdap.common.id.Id.Namespace; import io.cdap.cdap.common.namespace.NamespaceAdmin; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.NamespaceSourceControlMetadataStore; import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; import io.cdap.cdap.internal.operation.OperationLifecycleManager; @@ -87,8 +89,10 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; import org.junit.Assert; @@ -104,7 +108,6 @@ public class SourceControlManagementServiceTest extends AppFabricTestBase { private static CConfiguration cConf; private static final String TYPE = EntityType.APPLICATION.toString(); private static NamespaceAdmin namespaceAdmin; - private static SourceControlMetadataRefreshService sourceControlMetadataRefreshService; private static SourceControlManagementService sourceControlService; private static final RepositoryConfig REPOSITORY_CONFIG = new RepositoryConfig.Builder() .setProvider(Provider.GITHUB) @@ -149,13 +152,15 @@ public SourceControlManagementService provideSourceControlManagementService( SourceControlOperationRunner sourceControlRunner, ApplicationLifecycleService applicationLifecycleService, Store store, MetricsCollectionService metricsCollectionService, - OperationLifecycleManager manager, SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { + OperationLifecycleManager manager, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { return new SourceControlManagementService(cConf, secureStore, transactionRunner, accessEnforcer, authenticationContext, sourceControlRunner, applicationLifecycleService, store, manager, metricsCollectionService, - Clock.fixed(fixedInstant, ZoneId.systemDefault()), sourceControlMetadataRefreshService); + Clock.fixed(fixedInstant, ZoneId.systemDefault()), + sourceControlMetadataRefreshService); } }); } @@ -547,20 +552,16 @@ public void testOperationRunnerStarted() { @Test public void testScanRepoMetadata() throws Exception { RepositoryApp app1 = new RepositoryApp("app1", "hash1"); - RepositoryApp app3 = new RepositoryApp("app3", "hash3"); - RepositoryApp app4 = new RepositoryApp("app4", "hash4"); RepositoryAppsResponse expectedListResult = new RepositoryAppsResponse( - Arrays.asList(app1, app3, app4)); + Collections.singletonList(app1)); NamespaceId namespaceId = new NamespaceId(Id.Namespace.DEFAULT.getId()); sourceControlService.setRepository(namespaceId, REPOSITORY_CONFIG); Mockito.doReturn(expectedListResult) .when(sourceControlOperationRunnerSpy).list(Mockito.any(NamespaceRepository.class)); - insertRepoSourceControlMetadataTests(); - insertNamespaceSourceControlTests(); List gotRecords = new ArrayList<>(); - List insertedRecords = getInsertedRecords(); + List insertedRecords = insertRepoSourceControlMetadataTests(); List expectedRecords = new ArrayList<>(); // verify the scan without filters picks all apps for default namespace @@ -578,16 +579,16 @@ public void testScanRepoMetadata() throws Exception { expectedRecords = insertedRecords.stream().limit(2).collect(Collectors.toList()); Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); - // verify pageToken with limit + // verify pageToken with sync status filter gotRecords.clear(); request = ScanSourceControlMetadataRequest.builder().setNamespace(Namespace.DEFAULT.getId()) - .setLimit(5).setScanAfter("app3").build(); + .setFilter(new SourceControlMetadataFilter(null, true)).build(); sourceControlService.scanRepoMetadata(request, batchSize, gotRecords::add); - expectedRecords = insertedRecords.stream().filter(rec -> rec.getName().equals("app4")).collect( + expectedRecords = insertedRecords.stream().filter(rec -> rec.getIsSynced()).collect( Collectors.toList()); Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); - // verify pageToken with limit + // verify pageToken with name filter gotRecords.clear(); request = ScanSourceControlMetadataRequest.builder().setNamespace(Namespace.DEFAULT.getId()) .setFilter(new SourceControlMetadataFilter("1", null)).build(); @@ -611,42 +612,31 @@ public void testScanRepoMetadata() throws Exception { .collect(Collectors.toList()); Assert.assertArrayEquals(expectedRecords.toArray(), gotRecords.toArray()); - deleteAllNamespaceSourceControlRecords(Namespace.DEFAULT.getId()); deleteAllRepoSourceControlRecords(Namespace.DEFAULT.getId()); sourceControlService.deleteRepository(new NamespaceId(Namespace.DEFAULT.getId())); } - private void deleteAllNamespaceSourceControlRecords(String namespace) { + private void deleteAllRepoSourceControlRecords(String namespace) { TransactionRunners.run(transactionRunner, context -> { - NamespaceSourceControlMetadataStore store = NamespaceSourceControlMetadataStore.create( + RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( context); store.deleteAll(namespace); }); } - private void deleteAllRepoSourceControlRecords(String namespace) { + private List insertRepoSourceControlMetadataTests() { + AtomicReference> records = new AtomicReference<>( + new ArrayList<>()); + for (int i =0; i < 10; i++) { + insertRepoRecord(Namespace.DEFAULT.getId(), "app" + i, TYPE, null, null, + Instant.now().toEpochMilli(), i % 3 == 0 ? true : false); + } TransactionRunners.run(transactionRunner, context -> { RepositorySourceControlMetadataStore store = RepositorySourceControlMetadataStore.create( context); - store.deleteAll(namespace); + records.set(store.getAll(Namespace.DEFAULT.getId(), TYPE)); }); - } - - private List getInsertedRecords() { - SourceControlMetadataRecord record1 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), - TYPE, "app1", null, null, fixedInstant.toEpochMilli(), true); - SourceControlMetadataRecord record2 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), - TYPE, "app3", null, null, null, false); - SourceControlMetadataRecord record3 = new SourceControlMetadataRecord(Namespace.DEFAULT.getId(), - TYPE, "app4", null, null, fixedInstant2.toEpochMilli(), true); - List insertedRecords = Arrays.asList(record1, record2, record3); - return insertedRecords; - } - - private void insertRepoSourceControlMetadataTests() { - insertRepoRecord(Namespace.DEFAULT.getId(), "app1", TYPE, null, null, 0L, false); - insertRepoRecord(Namespace.DEFAULT.getId(), "app2", TYPE, null, null, - Instant.now().toEpochMilli(), false); + return records.get(); } private void insertRepoRecord(String namespace, String name, String type, @@ -658,29 +648,11 @@ private void insertRepoRecord(String namespace, String name, String type, }); } - private void insertNamespaceSourceControlTests() { - insertNamespaceRecord(Namespace.DEFAULT.getId(), "app1", TYPE, "hash1", "commit1", - fixedInstant.toEpochMilli(), false); - insertNamespaceRecord(Namespace.DEFAULT.getId(), "app3", TYPE, "hash4", "commit1", - fixedInstant.toEpochMilli(), false); - insertNamespaceRecord(Namespace.DEFAULT.getId(), "app4", TYPE, "hash4", "commit1", - fixedInstant2.toEpochMilli(), true); - } - - private void insertNamespaceRecord(String namespace, String name, String type, - String specHash, String commitId, Long lastModified, Boolean isSycned) { - TransactionRunners.run(transactionRunner, context -> { - NamespaceSourceControlMetadataStore store = NamespaceSourceControlMetadataStore.create( - context); - store.write(new ApplicationReference(namespace, name), - new SourceControlMeta(specHash, commitId, Instant.ofEpochMilli(lastModified), isSycned)); - }); - } /** * A Mock {@link SourceControlOperationRunner} that can be used for tests. */ - private static class MockSourceControlOperationRunner extends + static class MockSourceControlOperationRunner extends AbstractIdleService implements SourceControlOperationRunner { diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlMetadataRefreshServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlMetadataRefreshServiceTest.java deleted file mode 100644 index 412fd7d8b744..000000000000 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/SourceControlMetadataRefreshServiceTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright © 2024 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.internal.app.services; - -import com.google.common.util.concurrent.AbstractScheduledService; -import io.cdap.cdap.common.NotFoundException; -import org.junit.Test; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.runners.MockitoJUnitRunner; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -@RunWith(MockitoJUnitRunner.class) -public class SourceControlMetadataRefreshServiceTest { - @Mock - private SourceControlMetadataRefreshService sourceControlMetadataRefreshService; - @Before - public void setUp() { - //sourceControlMetadataRefreshService = new SourceControlMetadataRefreshService(); - } - @After - public void tearDown() { - // sourceControlMetadataRefreshService.stop(); - } - @Test - public void testStartUp() throws Exception { - sourceControlMetadataRefreshService.startUp(); - verify(sourceControlMetadataRefreshService, times(1)).startUp(); - } - @Test - public void testRunOneIteration() throws NotFoundException { -// sourceControlMetadataRefreshService.startAsync().awaitRunning(); - sourceControlMetadataRefreshService.runOneIteration(); - verify(sourceControlMetadataRefreshService, times(1)).runOneIteration(); - } - - @Test - public void testDuetNegativePeriod() { - try { - SourceControlMetadataRefreshService.Scheduler.newFixedRateSchedule(1, -5, TimeUnit.SECONDS); - // If the above line does not throw an exception, fail the test - fail("Expected IllegalArgumentException was not thrown"); - } catch (IllegalArgumentException e) { - // The expected exception was thrown, so the test passes - } - } - - -// @Test(expected = IllegalArgumentException.class) -// public void testNegativePeriod() { -// SourceControlMetadataRefreshService service = new SourceControlMetadataRefreshService(); -// -// try { -// service.start(); // This should throw IllegalArgumentException -// } finally { -// // executor.shutdownNow(); -// service.stop(); -// } -// } - -} \ No newline at end of file diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java index 11fcae89246b..3559b8e6905f 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java @@ -55,6 +55,7 @@ import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepository; import io.cdap.cdap.internal.app.services.ApplicationLifecycleService; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.store.state.AppStateKey; import io.cdap.cdap.internal.app.store.state.AppStateKeyValue; import io.cdap.cdap.internal.capability.CapabilityReader; @@ -109,7 +110,6 @@ public class AppLifecycleHttpHandlerTest extends AppFabricTestBase { private static CConfiguration cConf; private static final String FEATURE_FLAG_PREFIX = "feature."; - @BeforeClass public static void beforeClass() throws Throwable { cConf = createBasicCconf(); @@ -143,12 +143,14 @@ public ApplicationLifecycleService createLifeCycleService(CConfiguration cConf, MetadataServiceClient metadataServiceClient, AccessEnforcer accessEnforcer, AuthenticationContext authenticationContext, MessagingService messagingService, Impersonator impersonator, - CapabilityReader capabilityReader, TransactionRunner transactionRunner) { + CapabilityReader capabilityReader, TransactionRunner transactionRunner, + SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { return Mockito.spy(new ApplicationLifecycleService(cConf, store, scheduler, usageRegistry, preferencesService, metricsSystemClient, ownerAdmin, artifactRepository, managerFactory, metadataServiceClient, accessEnforcer, authenticationContext, - messagingService, impersonator, capabilityReader, new NoOpMetricsCollectionService(), transactionRunner)); + messagingService, impersonator, capabilityReader, new NoOpMetricsCollectionService(), transactionRunner, + sourceControlMetadataRefreshService)); } }); } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java index cc0ab0f7dda1..ec2ec69ebf9c 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java @@ -36,8 +36,8 @@ import io.cdap.cdap.gateway.handlers.SourceControlManagementHttpHandler; import io.cdap.cdap.internal.app.services.ApplicationLifecycleService; import io.cdap.cdap.internal.app.services.SourceControlManagementService; -import io.cdap.cdap.internal.app.services.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; +import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService; import io.cdap.cdap.internal.operation.OperationLifecycleManager; import io.cdap.cdap.metadata.MetadataSubscriberService; import io.cdap.cdap.proto.ApplicationRecord; @@ -125,19 +125,19 @@ protected void configure() { @Singleton public SourceControlManagementService provideSourceControlManagementService( CConfiguration cConf, - SecureStore secureStore, - TransactionRunner transactionRunner, - AccessEnforcer accessEnforcer, - AuthenticationContext authenticationContext, - SourceControlOperationRunner sourceControlRunner, - ApplicationLifecycleService applicationLifecycleService, - Store store, OperationLifecycleManager manager, MetricsCollectionService metricsService, + SecureStore secureStore, + TransactionRunner transactionRunner, + AccessEnforcer accessEnforcer, + AuthenticationContext authenticationContext, + SourceControlOperationRunner sourceControlRunner, + ApplicationLifecycleService applicationLifecycleService, + Store store, OperationLifecycleManager manager, MetricsCollectionService metricsService, SourceControlMetadataRefreshService sourceControlMetadataRefreshService) { return Mockito.spy(new SourceControlManagementService(cConf, secureStore, transactionRunner, - accessEnforcer, authenticationContext, - sourceControlRunner, applicationLifecycleService, - store, manager, metricsService, sourceControlMetadataRefreshService)); + accessEnforcer, authenticationContext, + sourceControlRunner, applicationLifecycleService, + store, manager, metricsService, sourceControlMetadataRefreshService)); } }); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index c5b98e048b22..2f8aab8a05ed 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -2482,6 +2482,10 @@ public static final class SourceControlManagement { public static final String REPOSITORY_TTL_SECONDS = "source.control.repository.ttl.seconds"; public static final String METADATA_MIGRATION_INTERVAL_SECONDS = "source.control.metadata.migration.interval.seconds"; + public static final String METADATA_REFRESH_INTERVAL_SECONDS = + "source.control.metadata.refresh.interval.seconds"; + public static final String METADATA_BUFFER_SECONDS = + "source.control.metadata.buffer.seconds"; } /** diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 9194e492791b..4121023283c6 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -6132,6 +6132,22 @@ The service only retries when the migration job fails. + + source.control.metadata.refresh.interval.seconds + 7200 + + Scheduler interval for SCM metadata refresh service, by default 2 hrs + + + + source.control.metadata.buffer.seconds + 300 + + Buffer time interval for SCM metadata refresh service, by default, + is set to 5 minutes. If a new refresh request occurs within the + buffer time span since the last request, it will be skipped. + + diff --git a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java index a1b79703135f..44562203f772 100644 --- a/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java +++ b/cdap-features/src/main/java/io/cdap/cdap/features/Feature.java @@ -44,7 +44,8 @@ public enum Feature { WRANGLER_SCHEMA_MANAGEMENT("6.10.0"), NAMESPACED_SERVICE_ACCOUNTS("6.10.0"), WRANGLER_KRYO_SERIALIZATION("6.10.1"), - SOURCE_CONTROL_MANAGEMENT_GITLAB_BITBUCKET("6.10.1"); + SOURCE_CONTROL_MANAGEMENT_GITLAB_BITBUCKET("6.10.1"), + SOURCE_CONTROL_METADATA_AUTO_REFRESH("6.10.1"); private final PlatformInfo.Version versionIntroduced; private final boolean defaultAfterIntroduction; diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/SourceControlMetadataDetail.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/SourceControlMetadataDetail.java deleted file mode 100644 index 08d09c0ff45b..000000000000 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/SourceControlMetadataDetail.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright © 2024 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.proto; - -/** - * This class is used for responses of API calls that fetch source control metadata. - */ -public class SourceControlMetadataDetail { - - private final String name; - private final Long lastModified; - private final Boolean syncStatus; - - - /** - * Represents detailed information about source control metadata. - * - * @param name The name of the source control metadata. - * @param lastModified The timestamp when the metadata was last modified. - * @param syncStatus The synchronization status of the metadata with the source control system. - */ - public SourceControlMetadataDetail(String name, Long lastModified, Boolean syncStatus) { - this.name = name; - this.lastModified = lastModified; - this.syncStatus = syncStatus; - } - - public String getName() { - return name; - } - - public Boolean getSyncStatus() { - return syncStatus; - } - - public Long getLastModified() { - return lastModified; - } - - @Override - public String toString() { - return "SourceControlMetadataDetail{" - + "name='" + name + '\'' - + ", lastModified=" + lastModified - + ", syncStatus='" + syncStatus + '\'' - + '}'; - } -}