diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java index 944d3c2d..3562b04d 100755 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java @@ -41,7 +41,7 @@ public class ObjectPoolPartition private final int socketTimeout; private final int connectTimeout; private final Semaphore takeSemaphore; - private final AtomicInteger totalCount; + private final AtomicInteger aliveObjectCount; public ObjectPoolPartition(ObjectPool pool, PoolConfig config, ObjectFactory objectFactory, BlockingQueue> queue, String host, String name) @@ -53,14 +53,14 @@ public ObjectPoolPartition(ObjectPool pool, PoolConfig config, this.host = host; this.socketTimeout = config.getSocketTimeoutMilliseconds(); this.connectTimeout = config.getConnectTimeoutMilliseconds(); - this.totalCount = new AtomicInteger(); + this.aliveObjectCount = new AtomicInteger(); this.log = new CustomLogger(name, host); this.takeSemaphore = new Semaphore(config.getMaxSize(), true); try { for (int i = 0; i < config.getMinSize(); i++) { T object = objectFactory.create(host, socketTimeout, connectTimeout); objectQueue.add(new Poolable<>(object, pool, host)); - totalCount.incrementAndGet(); + aliveObjectCount.incrementAndGet(); } } catch (Exception e) { @@ -79,8 +79,10 @@ public void returnObject(Poolable object) log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size())); if (!objectQueue.offer(object)) { - log.warn("Created more objects than configured. Created=" + totalCount + " QueueSize=" + objectQueue.size()); + String errorLog = "Created more objects than configured. Created=" + aliveObjectCount + " QueueSize=" + objectQueue.size(); + log.warn(errorLog); decreaseObject(object); + throw new RuntimeException(errorLog); } } finally { @@ -129,11 +131,13 @@ private Poolable tryGetObject() throws Exception try { T object = objectFactory.create(host, socketTimeout, connectTimeout); poolable = new Poolable<>(object, pool, host); - totalCount.incrementAndGet(); - log.debug(String.format("Added a connection, Pool state: totalCount: %s, queueSize: %d", totalCount, objectQueue.size())); + aliveObjectCount.incrementAndGet(); + log.debug(String.format("Added a connection, Pool state: totalCount: %s, queueSize: %d", aliveObjectCount, + objectQueue.size())); } catch (Exception e) { - log.warn(String.format("Unable create a connection. Pool state: totalCount=%s queueSize=%d", totalCount, objectQueue.size()), e); + log.warn(String.format("Unable create a connection. Pool state: totalCount=%s queueSize=%d", aliveObjectCount, + objectQueue.size()), e); if (poolable != null) { objectFactory.destroy(poolable.getObject()); poolable.destroy(); @@ -146,13 +150,13 @@ private Poolable tryGetObject() throws Exception public int getAliveObjectCount() { - return this.config.getMaxSize() - takeSemaphore.availablePermits(); + return aliveObjectCount.get(); } // set the scavenge interval carefully public void scavenge() throws InterruptedException { - int delta = this.totalCount.get() - config.getMinSize(); + int delta = this.aliveObjectCount.get() - config.getMinSize(); if (delta <= 0) { log.debug("Scavenge for delta <= 0, Skipping !!!"); return; @@ -190,7 +194,7 @@ public void scavenge() throws InterruptedException public synchronized int shutdown() { int removed = 0; - while (this.totalCount.get() > 0) { + while (this.aliveObjectCount.get() > 0) { Poolable obj = objectQueue.poll(); if (obj != null) { decreaseObject(obj);