Skip to content

Commit bf78376

Browse files
author
Harmandeep Singh
committed
changing totalCount to AtomicInteger data type
1 parent b74276c commit bf78376

File tree

1 file changed

+13
-12
lines changed

1 file changed

+13
-12
lines changed

rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.Semaphore;
2424
import java.util.concurrent.ThreadLocalRandom;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicInteger;
2627

2728
import static com.google.common.base.Preconditions.checkState;
2829

@@ -36,11 +37,11 @@ public class ObjectPoolPartition<T>
3637
private final PoolConfig config;
3738
private final BlockingQueue<Poolable<T>> objectQueue;
3839
private final ObjectFactory<T> objectFactory;
39-
private int totalCount;
4040
private final String host;
4141
private final int socketTimeout;
4242
private final int connectTimeout;
4343
private final Semaphore takeSemaphore;
44+
private final AtomicInteger totalCount;
4445

4546
public ObjectPoolPartition(ObjectPool<T> pool, PoolConfig config,
4647
ObjectFactory<T> objectFactory, BlockingQueue<Poolable<T>> queue, String host, String name)
@@ -52,14 +53,14 @@ public ObjectPoolPartition(ObjectPool<T> pool, PoolConfig config,
5253
this.host = host;
5354
this.socketTimeout = config.getSocketTimeoutMilliseconds();
5455
this.connectTimeout = config.getConnectTimeoutMilliseconds();
55-
this.totalCount = 0;
56+
this.totalCount = new AtomicInteger();
5657
this.log = new CustomLogger(name, host);
5758
this.takeSemaphore = new Semaphore(config.getMaxSize(), true);
5859
try {
5960
for (int i = 0; i < config.getMinSize(); i++) {
6061
T object = objectFactory.create(host, socketTimeout, connectTimeout);
6162
objectQueue.add(new Poolable<>(object, pool, host));
62-
totalCount++;
63+
totalCount.incrementAndGet();
6364
}
6465
}
6566
catch (Exception e) {
@@ -116,11 +117,11 @@ private Poolable<T> tryGetObject() throws Exception
116117
try {
117118
T object = objectFactory.create(host, socketTimeout, connectTimeout);
118119
poolable = new Poolable<>(object, pool, host);
119-
totalCount++;
120-
log.debug(String.format("Added a connection, new totalCount: %d, queueSize: %d", totalCount, objectQueue.size()));
120+
totalCount.incrementAndGet();
121+
log.debug(String.format("Added a connection, Pool state: totalCount: %s, queueSize: %d", totalCount, objectQueue.size()));
121122
}
122123
catch (Exception e) {
123-
log.warn(String.format("Unable create a connection. Pool state: totalCount=%d queueSize=%d", totalCount, objectQueue.size()), e);
124+
log.warn(String.format("Unable create a connection. Pool state: totalCount=%s queueSize=%d", totalCount, objectQueue.size()), e);
124125
if (poolable != null) {
125126
objectFactory.destroy(poolable.getObject());
126127
poolable.destroy();
@@ -144,20 +145,20 @@ public boolean decreaseObject(Poolable<T> obj)
144145
return true;
145146
}
146147

147-
private synchronized void objectRemoved()
148+
private void objectRemoved()
148149
{
149-
totalCount--;
150+
totalCount.decrementAndGet();
150151
}
151152

152-
public synchronized int getTotalCount()
153+
public int getTotalCount()
153154
{
154-
return totalCount;
155+
return totalCount.get();
155156
}
156157

157158
// set the scavenge interval carefully
158159
public void scavenge() throws InterruptedException
159160
{
160-
int delta = this.totalCount - config.getMinSize();
161+
int delta = this.totalCount.get() - config.getMinSize();
161162
if (delta <= 0) {
162163
log.debug("Scavenge for delta <= 0, Skipping !!!");
163164
return;
@@ -195,7 +196,7 @@ public void scavenge() throws InterruptedException
195196
public synchronized int shutdown()
196197
{
197198
int removed = 0;
198-
while (this.totalCount > 0) {
199+
while (this.totalCount.get() > 0) {
199200
Poolable<T> obj = objectQueue.poll();
200201
if (obj != null) {
201202
decreaseObject(obj);

0 commit comments

Comments
 (0)