Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Harmandeep Singh committed Jul 28, 2020
1 parent bf78376 commit 48388c8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 35 deletions.
10 changes: 4 additions & 6 deletions rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import com.google.common.util.concurrent.AbstractScheduledService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static java.lang.Thread.currentThread;
Expand Down Expand Up @@ -78,7 +76,7 @@ public Poolable<T> borrowObject(String host)
}
log.debug(this.name + " : Borrowing object for partition: " + host);
for (int i = 0; i < 3; i++) { // try at most three times
Poolable<T> result = getObject(false, host);
Poolable<T> result = getObject(host);
if (factory.validate(result.getObject())) {
return result;
}
Expand All @@ -89,10 +87,10 @@ public Poolable<T> borrowObject(String host)
throw new RuntimeException("Cannot find a valid object");
}

private Poolable<T> getObject(boolean blocking, String host)
private Poolable<T> getObject(String host)
{
ObjectPoolPartition<T> subPool = this.hostToPoolMap.get(host);
return subPool.getObject(blocking);
return subPool.getObject();
}

public void returnObject(Poolable<T> obj)
Expand All @@ -105,7 +103,7 @@ public int getSize()
{
int size = 0;
for (ObjectPoolPartition<T> subPool : hostToPoolMap.values()) {
size += subPool.getTotalCount();
size += subPool.getAliveObjectCount();
}
return size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,31 @@ public void returnObject(Poolable<T> object)
}
}

public Poolable<T> getObject(boolean blocking)
public boolean decreaseObject(Poolable<T> obj)
{
checkState(obj.getHost() != null, "Invalid object");
checkState(obj.getHost().equals(this.host),
"Call to free object of wrong partition, current partition=%s requested partition = %s",
this.host, obj.getHost());
log.debug("Decreasing pool size object: " + obj);
objectFactory.destroy(obj.getObject());
obj.destroy();
return true;
}

public Poolable<T> getObject()
{
Poolable<T> object;
try {
if (blocking) {
takeSemaphore.acquire();
object = objectQueue.take();
}
else {
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}

try {
object = tryGetObject();
}
object.setLastAccessTs(System.currentTimeMillis());
object.setLastAccessTs(System.currentTimeMillis());
}
catch (Exception e) {
takeSemaphore.release();
Expand Down Expand Up @@ -132,27 +144,9 @@ private Poolable<T> tryGetObject() throws Exception
return poolable;
}

public boolean decreaseObject(Poolable<T> obj)
{
checkState(obj.getHost() != null, "Invalid object");
checkState(obj.getHost().equals(this.host),
"Call to free object of wrong partition, current partition=%s requested partition = %s",
this.host, obj.getHost());
objectRemoved();
log.debug("Decreasing pool size object: " + obj);
objectFactory.destroy(obj.getObject());
obj.destroy();
return true;
}

private void objectRemoved()
{
totalCount.decrementAndGet();
}

public int getTotalCount()
public int getAliveObjectCount()
{
return totalCount.get();
return this.config.getMaxSize() - takeSemaphore.availablePermits();
}

// set the scavenge interval carefully
Expand Down

0 comments on commit 48388c8

Please sign in to comment.