Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Harmandeep Singh committed Aug 4, 2020
1 parent 48388c8 commit 6f3d482
Showing 1 changed file with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ObjectPoolPartition<T>
private final int socketTimeout;
private final int connectTimeout;
private final Semaphore takeSemaphore;
private final AtomicInteger totalCount;
private final AtomicInteger aliveObjectCount;

public ObjectPoolPartition(ObjectPool<T> pool, PoolConfig config,
ObjectFactory<T> objectFactory, BlockingQueue<Poolable<T>> queue, String host, String name)
Expand All @@ -53,14 +53,14 @@ public ObjectPoolPartition(ObjectPool<T> pool, PoolConfig config,
this.host = host;
this.socketTimeout = config.getSocketTimeoutMilliseconds();
this.connectTimeout = config.getConnectTimeoutMilliseconds();
this.totalCount = new AtomicInteger();
this.aliveObjectCount = new AtomicInteger();
this.log = new CustomLogger(name, host);
this.takeSemaphore = new Semaphore(config.getMaxSize(), true);
try {
for (int i = 0; i < config.getMinSize(); i++) {
T object = objectFactory.create(host, socketTimeout, connectTimeout);
objectQueue.add(new Poolable<>(object, pool, host));
totalCount.incrementAndGet();
aliveObjectCount.incrementAndGet();
}
}
catch (Exception e) {
Expand All @@ -79,8 +79,10 @@ public void returnObject(Poolable<T> object)

log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size()));
if (!objectQueue.offer(object)) {
log.warn("Created more objects than configured. Created=" + totalCount + " QueueSize=" + objectQueue.size());
String errorLog = "Created more objects than configured. Created=" + aliveObjectCount + " QueueSize=" + objectQueue.size();
log.warn(errorLog);
decreaseObject(object);
throw new RuntimeException(errorLog);
}
}
finally {
Expand Down Expand Up @@ -129,11 +131,13 @@ private Poolable<T> tryGetObject() throws Exception
try {
T object = objectFactory.create(host, socketTimeout, connectTimeout);
poolable = new Poolable<>(object, pool, host);
totalCount.incrementAndGet();
log.debug(String.format("Added a connection, Pool state: totalCount: %s, queueSize: %d", totalCount, objectQueue.size()));
aliveObjectCount.incrementAndGet();
log.debug(String.format("Added a connection, Pool state: totalCount: %s, queueSize: %d", aliveObjectCount,
objectQueue.size()));
}
catch (Exception e) {
log.warn(String.format("Unable create a connection. Pool state: totalCount=%s queueSize=%d", totalCount, objectQueue.size()), e);
log.warn(String.format("Unable create a connection. Pool state: totalCount=%s queueSize=%d", aliveObjectCount,
objectQueue.size()), e);
if (poolable != null) {
objectFactory.destroy(poolable.getObject());
poolable.destroy();
Expand All @@ -146,13 +150,13 @@ private Poolable<T> tryGetObject() throws Exception

public int getAliveObjectCount()
{
return this.config.getMaxSize() - takeSemaphore.availablePermits();
return aliveObjectCount.get();
}

// set the scavenge interval carefully
public void scavenge() throws InterruptedException
{
int delta = this.totalCount.get() - config.getMinSize();
int delta = this.aliveObjectCount.get() - config.getMinSize();
if (delta <= 0) {
log.debug("Scavenge for delta <= 0, Skipping !!!");
return;
Expand Down Expand Up @@ -190,7 +194,7 @@ public void scavenge() throws InterruptedException
public synchronized int shutdown()
{
int removed = 0;
while (this.totalCount.get() > 0) {
while (this.aliveObjectCount.get() > 0) {
Poolable<T> obj = objectQueue.poll();
if (obj != null) {
decreaseObject(obj);
Expand Down

0 comments on commit 6f3d482

Please sign in to comment.