Skip to content

Commit b74276c

Browse files
author
Harmandeep Singh
committed
addressing the review comments
1 parent d56b982 commit b74276c

File tree

1 file changed

+37
-40
lines changed

1 file changed

+37
-40
lines changed

rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -55,80 +55,77 @@ public ObjectPoolPartition(ObjectPool<T> pool, PoolConfig config,
5555
this.totalCount = 0;
5656
this.log = new CustomLogger(name, host);
5757
this.takeSemaphore = new Semaphore(config.getMaxSize(), true);
58-
for (int i = 0; i < config.getMinSize(); i++) {
59-
try {
60-
T object = objectFactory.create(host, socketTimeout, connectTimeout);
61-
objectQueue.add(new Poolable<>(object, pool, host));
62-
totalCount++;
63-
}
64-
catch (Exception e) {
65-
log.warn("Error in initializing pool, error: " + e);
58+
try {
59+
for (int i = 0; i < config.getMinSize(); i++) {
60+
T object = objectFactory.create(host, socketTimeout, connectTimeout);
61+
objectQueue.add(new Poolable<>(object, pool, host));
62+
totalCount++;
6663
}
6764
}
65+
catch (Exception e) {
66+
// skipping logging the exception as factories are already logging.
67+
}
6868
}
6969

7070
public void returnObject(Poolable<T> object)
7171
{
72-
takeSemaphore.release();
73-
if (!objectFactory.validate(object.getObject())) {
74-
log.debug(String.format("Invalid object...removing: %s ", object));
75-
decreaseObject(object);
76-
return;
77-
}
72+
try {
73+
if (!objectFactory.validate(object.getObject())) {
74+
log.debug(String.format("Invalid object...removing: %s ", object));
75+
decreaseObject(object);
76+
return;
77+
}
7878

79-
log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size()));
80-
if (!objectQueue.offer(object)) {
81-
log.warn("Created more objects than configured. Created=" + totalCount + " QueueSize=" + objectQueue.size());
82-
decreaseObject(object);
79+
log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size()));
80+
if (!objectQueue.offer(object)) {
81+
log.warn("Created more objects than configured. Created=" + totalCount + " QueueSize=" + objectQueue.size());
82+
decreaseObject(object);
83+
}
84+
}
85+
finally {
86+
takeSemaphore.release();
8387
}
8488
}
8589

8690
public Poolable<T> getObject(boolean blocking)
8791
{
88-
try {
89-
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
90-
}
91-
catch (InterruptedException e) {
92-
e.printStackTrace();
93-
throw new RuntimeException("Cannot get a free object from the pool");
94-
}
95-
9692
Poolable<T> object;
97-
if (blocking) {
98-
try {
93+
try {
94+
if (blocking) {
95+
takeSemaphore.acquire();
9996
object = objectQueue.take();
10097
}
101-
catch (InterruptedException e) {
102-
throw new RuntimeException(e); // will never happen
98+
else {
99+
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
100+
object = tryGetObject();
103101
}
102+
object.setLastAccessTs(System.currentTimeMillis());
104103
}
105-
else
106-
{
107-
object = tryGetObject(objectQueue.poll());
104+
catch (Exception e) {
105+
takeSemaphore.release();
106+
throw new RuntimeException("Cannot get a free object from the pool", e);
108107
}
109-
object.setLastAccessTs(System.currentTimeMillis());
110108
return object;
111109
}
112110

113-
private Poolable<T> tryGetObject(Poolable<T> poolable)
111+
private Poolable<T> tryGetObject() throws Exception
114112
{
113+
Poolable<T> poolable = objectQueue.poll();
115114
if (poolable == null)
116115
{
117116
try {
118117
T object = objectFactory.create(host, socketTimeout, connectTimeout);
119118
poolable = new Poolable<>(object, pool, host);
120119
totalCount++;
121-
log.debug(String.format("Increased pool size by %d, to new size: %d, current queue size: %d, delta: %d",
122-
1, totalCount, objectQueue.size(), 1));
120+
log.debug(String.format("Added a connection, new totalCount: %d, queueSize: %d", totalCount, objectQueue.size()));
123121
}
124122
catch (Exception e) {
125-
log.warn(String.format("Unable to increase pool size. Pool state: totalCount=%d queueSize=%d delta=%d", totalCount, objectQueue.size(), 1), e);
126-
// objectToReturn is not on the queue hence untracked, clean it up before forwarding exception
123+
log.warn(String.format("Unable create a connection. Pool state: totalCount=%d queueSize=%d", totalCount, objectQueue.size()), e);
127124
if (poolable != null) {
128125
objectFactory.destroy(poolable.getObject());
129126
poolable.destroy();
130127
}
131-
throw new RuntimeException(e);
128+
throw e;
132129
}
133130
}
134131
return poolable;

0 commit comments

Comments
 (0)