Skip to content

Commit

Permalink
RANGER-4302: RangerCache updated to support value loader to use calle…
Browse files Browse the repository at this point in the history
…rs context in Ranger admin - #2
  • Loading branch information
mneethiraj committed Jan 3, 2024
1 parent b009908 commit e4da912
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,28 +93,7 @@ protected RangerCache(String name, ValueLoader<K, V> loader, int loaderThreadsCo
public long getValueRefreshLoadTimeoutMs() { return valueRefreshLoadTimeoutMs; }

public V get(K key) {
final long startTime = System.currentTimeMillis();
final CachedValue value = cache.computeIfAbsent(key, f -> new CachedValue(key));
final long timeoutMs = value.isInitialized() ? valueRefreshLoadTimeoutMs : valueInitLoadTimeoutMs;
final V ret;

if (timeoutMs >= 0) {
final long timeTaken = System.currentTimeMillis() - startTime;

if (timeoutMs <= timeTaken) {
ret = value.getCurrentValue();

if (LOG.isDebugEnabled()) {
LOG.debug("key={}: cache-lookup={}ms took longer than timeout={}ms. Using current value {}", key, timeTaken, timeoutMs, ret);
}
} else {
ret = value.getValue(timeoutMs - timeTaken);
}
} else {
ret = value.getValue();
}

return ret;
return get(key, null);
}

public Set<K> getKeys() {
Expand Down Expand Up @@ -147,6 +126,31 @@ public boolean isLoaded(K key) {
return value != null;
}

protected V get(K key, Object context) {
final long startTime = System.currentTimeMillis();
final CachedValue value = cache.computeIfAbsent(key, f -> new CachedValue(key));
final long timeoutMs = value.isInitialized() ? valueRefreshLoadTimeoutMs : valueInitLoadTimeoutMs;
final V ret;

if (timeoutMs >= 0) {
final long timeTaken = System.currentTimeMillis() - startTime;

if (timeoutMs <= timeTaken) {
ret = value.getCurrentValue();

if (LOG.isDebugEnabled()) {
LOG.debug("key={}: cache-lookup={}ms took longer than timeout={}ms. Using current value {}", key, timeTaken, timeoutMs, ret);
}
} else {
ret = value.getValue(timeoutMs - timeTaken);
}
} else {
ret = value.getValue(context);
}

return ret;
}

public static class RefreshableValue<V> {
private final V value;
private long nextRefreshTimeMs = -1;
Expand All @@ -165,7 +169,7 @@ public boolean needsRefresh() {
}

public static abstract class ValueLoader<K, V> {
public abstract RefreshableValue<V> load(K key, RefreshableValue<V> currentValue) throws Exception;
public abstract RefreshableValue<V> load(K key, RefreshableValue<V> currentValue, Object context) throws Exception;
}

private class CachedValue {
Expand All @@ -185,17 +189,17 @@ private CachedValue(K key) {

public K getKey() { return key; }

public V getValue() {
refreshIfNeeded();
public V getValue(Object context) {
refreshIfNeeded(context);

return getCurrentValue();
}

public V getValue(long timeoutMs) {
public V getValue(long timeoutMs, Object context) {
if (timeoutMs < 0) {
refreshIfNeeded();
refreshIfNeeded(context);
} else {
refreshIfNeeded(timeoutMs);
refreshIfNeeded(timeoutMs, context);
}

return getCurrentValue();
Expand All @@ -217,7 +221,7 @@ public boolean isInitialized() {
return value != null;
}

private void refreshIfNeeded() {
private void refreshIfNeeded(Object context) {
if (needsRefresh()) {
try (AutoClosableLock ignored = new AutoClosableLock(lock)) {
if (needsRefresh()) {
Expand All @@ -228,7 +232,7 @@ private void refreshIfNeeded() {
LOG.debug("refreshIfNeeded(key={}): using caller thread", key);
}

refreshValue();
refreshValue(context);
} else { // wait for the refresher to complete
try {
future.get();
Expand All @@ -243,7 +247,7 @@ private void refreshIfNeeded() {
}
}

private void refreshIfNeeded(long timeoutMs) {
private void refreshIfNeeded(long timeoutMs, Object context) {
if (needsRefresh()) {
long startTime = System.currentTimeMillis();

Expand All @@ -253,7 +257,7 @@ private void refreshIfNeeded(long timeoutMs) {
Future<?> future = this.refresher;

if (future == null) {
future = this.refresher = loaderThreadPool.submit(this::refreshValue);
future = this.refresher = loaderThreadPool.submit(new RefreshWithContext(context));

if (LOG.isDebugEnabled()) {
LOG.debug("refresher scheduled for key {}", key);
Expand Down Expand Up @@ -287,7 +291,7 @@ private void refreshIfNeeded(long timeoutMs) {
}
}

private Boolean refreshValue() {
private Boolean refreshValue(Object context) {
long startTime = System.currentTimeMillis();
boolean isSuccess = false;
RefreshableValue<V> newValue = null;
Expand All @@ -296,7 +300,7 @@ private Boolean refreshValue() {
ValueLoader<K, V> loader = RangerCache.this.loader;

if (loader != null) {
newValue = loader.load(key, value);
newValue = loader.load(key, value, context);
isSuccess = true;
}
} catch (KeyNotFoundException excp) {
Expand All @@ -319,7 +323,7 @@ private Boolean refreshValue() {
if (!isRemoved) {
ScheduledExecutorService scheduledExecutor = ((ScheduledExecutorService) loaderThreadPool);

scheduledExecutor.schedule(this::refreshValue, valueValidityPeriodMs, TimeUnit.MILLISECONDS);
scheduledExecutor.schedule(new RefreshWithContext(context), valueValidityPeriodMs, TimeUnit.MILLISECONDS);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("key {} was removed. Not scheduling next refresh ", key);
Expand All @@ -338,6 +342,19 @@ private void setValue(RefreshableValue<V> value) {
this.value.setNextRefreshTimeMs(System.currentTimeMillis() + valueValidityPeriodMs);
}
}

private class RefreshWithContext implements Callable<Boolean> {
private final Object context;

public RefreshWithContext(Object context) {
this.context = context;
}

@Override
public Boolean call() {
return refreshValue(context);
}
}
}

private ThreadFactory createThreadFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ public UserGroupLoader() {
}

@Override
public RefreshableValue<List<String>> load(String userName, RefreshableValue<List<String>> currVal) throws Exception {
public RefreshableValue<List<String>> load(String userName, RefreshableValue<List<String>> currVal, Object context) throws Exception {
long startTimeMs = System.currentTimeMillis();

UserStats userStats = stats.get(userName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.ranger.common.db.RangerTransactionSynchronizationAdapter;
import org.apache.ranger.db.*;
import org.apache.ranger.entity.*;
import org.apache.ranger.plugin.model.RangerGds;
import org.apache.ranger.plugin.model.RangerGds.*;
import org.apache.ranger.plugin.model.RangerPolicy;
import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyItem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.ranger.authorization.hadoop.config.RangerAdminConfig;
import org.apache.ranger.plugin.util.RangerCache;
import org.apache.ranger.security.context.RangerContextHolder;
import org.apache.ranger.security.context.RangerSecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;
Expand Down Expand Up @@ -48,6 +50,11 @@ protected RangerAdminCache(String name, RangerDBValueLoader<K, V> loader, int lo
super(name, loader, loaderThreadsCount, refreshMode, valueValidityPeriodMs, valueInitLoadTimeoutMs, valueRefreshLoadTimeoutMs);
}

@Override
public V get(K key) {
return super.get(key, RangerContextHolder.getSecurityContext());
}

private static int getLoaderThreadPoolSize(String cacheName) {
return RangerAdminConfig.getInstance().getInt(PROP_PREFIX + cacheName + PROP_LOADER_THREAD_POOL_SIZE, DEFAULT_ADMIN_CACHE_LOADER_THREADS_COUNT);
}
Expand All @@ -70,16 +77,28 @@ public RangerDBValueLoader(PlatformTransactionManager txManager) {
}

@Override
final public RefreshableValue<V> load(K key, RefreshableValue<V> currentValue) throws Exception {
final public RefreshableValue<V> load(K key, RefreshableValue<V> currentValue, Object context) throws Exception {
Exception[] ex = new Exception[1];

RefreshableValue<V> ret = txTemplate.execute(status -> {
RangerSecurityContext currentContext = null;

try {
if (context instanceof RangerSecurityContext) {
currentContext = RangerContextHolder.getSecurityContext();

RangerContextHolder.setSecurityContext((RangerSecurityContext) context);
}

return dbLoad(key, currentValue);
} catch (Exception excp) {
LOG.error("RangerDBLoaderCache.load(): failed to load for key={}", key, excp);

ex[0] = excp;
} finally {
if (context instanceof RangerSecurityContext) {
RangerContextHolder.setSecurityContext(currentContext);
}
}

return null;
Expand Down

0 comments on commit e4da912

Please sign in to comment.