-
Notifications
You must be signed in to change notification settings - Fork 345
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added implementation for periodic refresh of soure control metadata
- Loading branch information
1 parent
964d7a4
commit c742f77
Showing
10 changed files
with
424 additions
and
37 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
26 changes: 26 additions & 0 deletions
26
cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/SyncStatus.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright © 2024 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.cdap.app.store; | ||
|
||
/** | ||
* Represents the sync status of source control metadata. | ||
* It is used in filtering. | ||
*/ | ||
public enum SyncStatus { | ||
SYNCED, | ||
UNSYNCED | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
197 changes: 197 additions & 0 deletions
197
...src/main/java/io/cdap/cdap/internal/app/services/SourceControlMetadataRefreshService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,197 @@ | ||
/* | ||
* Copyright © 2024 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.cdap.internal.app.services; | ||
|
||
import com.google.common.util.concurrent.AbstractScheduledService; | ||
import io.cdap.cdap.app.store.Store; | ||
import io.cdap.cdap.common.NotFoundException; | ||
import io.cdap.cdap.common.RepositoryNotFoundException; | ||
import io.cdap.cdap.common.namespace.NamespaceAdmin; | ||
import io.cdap.cdap.internal.app.store.RepositorySourceControlMetadataStore; | ||
import io.cdap.cdap.proto.id.ApplicationReference; | ||
import io.cdap.cdap.proto.id.NamespaceId; | ||
import io.cdap.cdap.proto.security.NamespacePermission; | ||
import io.cdap.cdap.proto.security.StandardPermission; | ||
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig; | ||
import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta; | ||
import io.cdap.cdap.security.spi.authentication.AuthenticationContext; | ||
import io.cdap.cdap.security.spi.authorization.AccessEnforcer; | ||
import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository; | ||
import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryApp; | ||
import io.cdap.cdap.sourcecontrol.operationrunner.RepositoryAppsResponse; | ||
import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; | ||
import io.cdap.cdap.spi.data.StructuredTableContext; | ||
import io.cdap.cdap.spi.data.TableNotFoundException; | ||
import io.cdap.cdap.spi.data.transaction.TransactionRunner; | ||
import io.cdap.cdap.spi.data.transaction.TransactionRunners; | ||
import io.cdap.cdap.store.RepositoryTable; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
import org.apache.twill.common.Threads; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class SourceControlMetadataRefreshService extends AbstractScheduledService { | ||
|
||
private List<NamespaceId> namespaces; | ||
private HashMap<NamespaceId, Long> lastRefreshTimes; | ||
|
||
private final AccessEnforcer accessEnforcer; | ||
private final AuthenticationContext authenticationContext; | ||
|
||
private final NamespaceAdmin namespaceAdmin; | ||
private ScheduledExecutorService executor; | ||
private final TransactionRunner transactionRunner; | ||
private final SourceControlOperationRunner sourceControlOperationRunner; | ||
private final Store store; | ||
private static final Logger LOG = LoggerFactory.getLogger( | ||
SourceControlMetadataRefreshService.class); | ||
|
||
public SourceControlMetadataRefreshService(AccessEnforcer accessEnforcer, | ||
AuthenticationContext authenticationContext, NamespaceAdmin namespaceAdmin, | ||
TransactionRunner transactionRunner, | ||
SourceControlOperationRunner sourceControlOperationRunner, Store store) { | ||
this.accessEnforcer = accessEnforcer; | ||
this.authenticationContext = authenticationContext; | ||
this.namespaceAdmin = namespaceAdmin; | ||
this.transactionRunner = transactionRunner; | ||
this.sourceControlOperationRunner = sourceControlOperationRunner; | ||
this.store = store; | ||
} | ||
|
||
private void setNamespaces(List<NamespaceId> namespaceIds) { | ||
this.namespaces = namespaceIds; | ||
} | ||
|
||
private RepositorySourceControlMetadataStore getRepoSourceControlMetadataStore( | ||
StructuredTableContext context) { | ||
return RepositorySourceControlMetadataStore.create(context); | ||
} | ||
|
||
@Override | ||
protected final ScheduledExecutorService executor() { | ||
executor = Executors.newSingleThreadScheduledExecutor( | ||
Threads.createDaemonThreadFactory("source-control-metadata-refresh-service")); | ||
return executor; | ||
} | ||
|
||
private RepositoryTable getRepositoryTable(StructuredTableContext context) | ||
throws TableNotFoundException { | ||
return new RepositoryTable(context); | ||
} | ||
|
||
|
||
public RepositoryMeta getRepositoryMeta(NamespaceId namespace) | ||
throws RepositoryNotFoundException { | ||
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), StandardPermission.GET); | ||
|
||
return TransactionRunners.run(transactionRunner, context -> { | ||
RepositoryTable table = getRepositoryTable(context); | ||
RepositoryMeta repoMeta = table.get(namespace); | ||
if (repoMeta == null) { | ||
throw new RepositoryNotFoundException(namespace); | ||
} | ||
|
||
return repoMeta; | ||
}, RepositoryNotFoundException.class); | ||
} | ||
|
||
@Override | ||
protected void runOneIteration() { | ||
try { | ||
if (namespaces == null) { | ||
setNamespaces(namespaceAdmin.list().stream() | ||
.map(meta -> meta.getNamespaceId()).collect(Collectors.toList())); | ||
|
||
} | ||
|
||
for (NamespaceId namespaceId : namespaces) { | ||
RepositoryConfig repoConfig = getRepositoryMeta(namespaceId).getConfig(); | ||
|
||
accessEnforcer.enforce(namespaceId, authenticationContext.getPrincipal(), | ||
NamespacePermission.READ_REPOSITORY); | ||
RepositoryAppsResponse repositoryAppsResponse = sourceControlOperationRunner.list( | ||
new NamespaceRepository(namespaceId, repoConfig)); | ||
|
||
// Cleaning up the repo source control metadata table | ||
HashSet<String> repoFileNames = new HashSet<>(); | ||
for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { | ||
repoFileNames.add(repoApp.getName()); | ||
} | ||
TransactionRunners.run(transactionRunner, context -> { | ||
getRepoSourceControlMetadataStore(context).cleanupRepoSourceControlMeta( | ||
namespaceId.getNamespace(), repoFileNames); | ||
}); | ||
// Updating the namespace and repo source control metadata table | ||
for (RepositoryApp repoApp : repositoryAppsResponse.getApps()) { | ||
store.updateSourceControlMeta( | ||
new ApplicationReference(namespaceId.getNamespace(), repoApp.getName()), | ||
repoApp.getFileHash(), | ||
lastRefreshTimes.get(namespaceId)); | ||
} | ||
lastRefreshTimes.put(namespaceId, System.currentTimeMillis()); | ||
} | ||
|
||
setNamespaces(null); | ||
} catch (RepositoryNotFoundException e) { | ||
// LOG | ||
} catch (NotFoundException e) { | ||
// LOG | ||
} catch (Exception e) { | ||
// LOG | ||
} | ||
|
||
} | ||
|
||
public void runRefreshService(boolean forced, NamespaceId namespace) throws Exception { | ||
if (!forced && (System.currentTimeMillis() - lastRefreshTimes.get(namespace) | ||
< 10 * 60 * 1000)) { | ||
return; | ||
} | ||
List<NamespaceId> namespaceList = new ArrayList<>(); | ||
namespaceList.add(namespace); | ||
setNamespaces(namespaceList); | ||
runOneIteration(); | ||
} | ||
|
||
@Override | ||
protected void startUp() throws Exception { | ||
this.namespaces = namespaceAdmin.list().stream() | ||
.map(meta -> meta.getNamespaceId()).collect(Collectors.toList()); | ||
namespaces.forEach(n -> lastRefreshTimes.put(n, 0L)); | ||
} | ||
|
||
@Override | ||
protected Scheduler scheduler() { | ||
return Scheduler.newFixedRateSchedule(1, 60 * 60, TimeUnit.SECONDS); | ||
} | ||
|
||
@Override | ||
protected void shutDown() throws Exception { | ||
if (executor != null) { | ||
executor.shutdownNow(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.