Skip to content

Commit 8d976db

Browse files
author
Harmandeep Singh
committed
Use semaphore to allow parallel creation of pool connections.
1 parent bf60c93 commit 8d976db

File tree

5 files changed

+90
-110
lines changed

5 files changed

+90
-110
lines changed

rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
*/
2222
public interface ObjectFactory<T>
2323
{
24-
T create(String host, int socketTimeout, int connectTimeout);
24+
T create(String host, int socketTimeout, int connectTimeout)
25+
throws Exception;
2526

2627
void destroy(T t);
2728

rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPool.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,11 @@ public Poolable<T> borrowObject(String host)
7676
}
7777
log.debug(this.name + " : Borrowing object for partition: " + host);
7878
for (int i = 0; i < 3; i++) { // try at most three times
79-
Poolable<T> result = getObject(false, host);
80-
if (factory.validate(result.getObject())) {
79+
Poolable<T> result = getObject(host);
80+
if (result == null) {
81+
continue;
82+
}
83+
else if (factory.validate(result.getObject())) {
8184
return result;
8285
}
8386
else {
@@ -87,10 +90,10 @@ public Poolable<T> borrowObject(String host)
8790
throw new RuntimeException("Cannot find a valid object");
8891
}
8992

90-
private Poolable<T> getObject(boolean blocking, String host)
93+
private Poolable<T> getObject(String host)
9194
{
9295
ObjectPoolPartition<T> subPool = this.hostToPoolMap.get(host);
93-
return subPool.getObject(blocking);
96+
return subPool.getObject();
9497
}
9598

9699
public void returnObject(Poolable<T> obj)
@@ -103,7 +106,7 @@ public int getSize()
103106
{
104107
int size = 0;
105108
for (ObjectPoolPartition<T> subPool : hostToPoolMap.values()) {
106-
size += subPool.getTotalCount();
109+
size += subPool.getAliveObjectCount();
107110
}
108111
return size;
109112
}

rubix-spi/src/main/java/com/qubole/rubix/spi/fop/ObjectPoolPartition.java

Lines changed: 74 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import org.apache.commons.logging.LogFactory;
2121

2222
import java.util.concurrent.BlockingQueue;
23+
import java.util.concurrent.Semaphore;
2324
import java.util.concurrent.ThreadLocalRandom;
2425
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicInteger;
2527

2628
import static com.google.common.base.Preconditions.checkState;
2729

@@ -35,10 +37,11 @@ public class ObjectPoolPartition<T>
3537
private final PoolConfig config;
3638
private final BlockingQueue<Poolable<T>> objectQueue;
3739
private final ObjectFactory<T> objectFactory;
38-
private int totalCount;
3940
private final String host;
4041
private final int socketTimeout;
4142
private final int connectTimeout;
43+
private final Semaphore takeSemaphore;
44+
private final AtomicInteger aliveObjectCount;
4245

4346
public ObjectPoolPartition(ObjectPool<T> pool, PoolConfig config,
4447
ObjectFactory<T> objectFactory, BlockingQueue<Poolable<T>> queue, String host, String name)
@@ -50,113 +53,41 @@ public ObjectPoolPartition(ObjectPool<T> pool, PoolConfig config,
5053
this.host = host;
5154
this.socketTimeout = config.getSocketTimeoutMilliseconds();
5255
this.connectTimeout = config.getConnectTimeoutMilliseconds();
53-
this.totalCount = 0;
56+
this.aliveObjectCount = new AtomicInteger();
5457
this.log = new CustomLogger(name, host);
55-
for (int i = 0; i < config.getMinSize(); i++) {
56-
T object = objectFactory.create(host, socketTimeout, connectTimeout);
57-
if (object != null) {
58-
objectQueue.add(new Poolable<>(object, pool, host));
59-
totalCount++;
60-
}
61-
}
62-
}
63-
64-
public void returnObject(Poolable<T> object)
65-
{
66-
if (!objectFactory.validate(object.getObject())) {
67-
log.debug(String.format("Invalid object...removing: %s ", object));
68-
decreaseObject(object);
69-
return;
70-
}
71-
72-
log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size()));
73-
if (!objectQueue.offer(object)) {
74-
log.warn("Created more objects than configured. Created=" + totalCount + " QueueSize=" + objectQueue.size());
75-
decreaseObject(object);
76-
}
77-
}
78-
79-
public Poolable<T> getObject(boolean blocking)
80-
{
81-
if (objectQueue.size() == 0) {
82-
// increase objects and return one, it will return null if pool reaches max size or if object creation fails
83-
Poolable<T> object = increaseObjects(this.config.getDelta(), true);
84-
85-
if (object != null) {
86-
return object;
87-
}
88-
89-
if (totalCount == 0) {
90-
// Could not create objects, this is mostly due to connection timeouts hence no point blocking as there is not other producer of sockets
91-
throw new RuntimeException("Could not add connections to pool");
92-
}
93-
// else wait for a connection to get free
94-
}
95-
96-
Poolable<T> freeObject;
58+
this.takeSemaphore = new Semaphore(config.getMaxSize(), true);
9759
try {
98-
if (blocking) {
99-
freeObject = objectQueue.take();
100-
}
101-
else {
102-
freeObject = objectQueue.poll(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
103-
if (freeObject == null) {
104-
throw new RuntimeException("Cannot get a free object from the pool");
105-
}
60+
for (int i = 0; i < config.getMinSize(); i++) {
61+
T object = objectFactory.create(host, socketTimeout, connectTimeout);
62+
objectQueue.add(new Poolable<>(object, pool, host));
63+
aliveObjectCount.incrementAndGet();
10664
}
10765
}
108-
catch (InterruptedException e) {
109-
throw new RuntimeException(e); // will never happen
66+
catch (Exception e) {
67+
// skipping logging the exception as factories are already logging.
11068
}
111-
112-
freeObject.setLastAccessTs(System.currentTimeMillis());
113-
return freeObject;
11469
}
11570

116-
private Poolable<T> increaseObjects(int delta, boolean returnObject)
71+
public void returnObject(Poolable<T> object)
11772
{
118-
int oldCount = totalCount;
119-
if (delta + totalCount > config.getMaxSize()) {
120-
delta = config.getMaxSize() - totalCount;
121-
}
122-
123-
Poolable<T> objectToReturn = null;
12473
try {
125-
for (int i = 0; i < delta; i++) {
126-
T object = objectFactory.create(host, socketTimeout, connectTimeout);
127-
if (object != null) {
128-
// Do not put the first object on queue
129-
// it will be returned to the caller to ensure it's request is satisfied first if object is requested
130-
Poolable<T> poolable = new Poolable<>(object, pool, host);
131-
if (objectToReturn == null && returnObject) {
132-
objectToReturn = poolable;
133-
}
134-
else {
135-
objectQueue.put(poolable);
136-
}
137-
totalCount++;
138-
}
74+
if (!objectFactory.validate(object.getObject())) {
75+
log.debug(String.format("Invalid object...removing: %s ", object));
76+
decreaseObject(object);
77+
return;
13978
}
14079

141-
if (delta > 0 && (totalCount - oldCount) == 0) {
142-
log.warn(String.format("Could not increase pool size. Pool state: totalCount=%d queueSize=%d delta=%d", totalCount, objectQueue.size(), delta));
143-
}
144-
else {
145-
log.debug(String.format("Increased pool size by %d, to new size: %d, current queue size: %d, delta: %d",
146-
totalCount - oldCount, totalCount, objectQueue.size(), delta));
80+
log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size()));
81+
if (!objectQueue.offer(object)) {
82+
String errorLog = "Created more objects than configured. Created=" + aliveObjectCount + " QueueSize=" + objectQueue.size();
83+
log.warn(errorLog);
84+
decreaseObject(object);
85+
throw new RuntimeException(errorLog);
14786
}
14887
}
149-
catch (Exception e) {
150-
log.warn(String.format("Unable to increase pool size. Pool state: totalCount=%d queueSize=%d delta=%d", totalCount, objectQueue.size(), delta), e);
151-
// objectToReturn is not on the queue hence untracked, clean it up before forwarding exception
152-
if (objectToReturn != null) {
153-
objectFactory.destroy(objectToReturn.getObject());
154-
objectToReturn.destroy();
155-
}
156-
throw new RuntimeException(e);
88+
finally {
89+
takeSemaphore.release();
15790
}
158-
159-
return objectToReturn;
16091
}
16192

16293
public boolean decreaseObject(Poolable<T> obj)
@@ -165,27 +96,69 @@ public boolean decreaseObject(Poolable<T> obj)
16596
checkState(obj.getHost().equals(this.host),
16697
"Call to free object of wrong partition, current partition=%s requested partition = %s",
16798
this.host, obj.getHost());
168-
objectRemoved();
16999
log.debug("Decreasing pool size object: " + obj);
170100
objectFactory.destroy(obj.getObject());
101+
aliveObjectCount.decrementAndGet();
171102
obj.destroy();
172103
return true;
173104
}
174105

175-
private synchronized void objectRemoved()
106+
public Poolable<T> getObject()
107+
{
108+
Poolable<T> object;
109+
try {
110+
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
111+
}
112+
catch (InterruptedException e) {
113+
Thread.currentThread().interrupt();
114+
return null;
115+
}
116+
117+
try {
118+
object = tryGetObject();
119+
object.setLastAccessTs(System.currentTimeMillis());
120+
}
121+
catch (Exception e) {
122+
takeSemaphore.release();
123+
throw new RuntimeException("Cannot get a free object from the pool", e);
124+
}
125+
return object;
126+
}
127+
128+
private Poolable<T> tryGetObject() throws Exception
176129
{
177-
totalCount--;
130+
Poolable<T> poolable = objectQueue.poll();
131+
if (poolable == null)
132+
{
133+
try {
134+
T object = objectFactory.create(host, socketTimeout, connectTimeout);
135+
poolable = new Poolable<>(object, pool, host);
136+
aliveObjectCount.incrementAndGet();
137+
log.debug(String.format("Added a connection, Pool state: totalCount: %s, queueSize: %d", aliveObjectCount,
138+
objectQueue.size()));
139+
}
140+
catch (Exception e) {
141+
log.warn(String.format("Unable create a connection. Pool state: totalCount=%s queueSize=%d", aliveObjectCount,
142+
objectQueue.size()), e);
143+
if (poolable != null) {
144+
objectFactory.destroy(poolable.getObject());
145+
poolable.destroy();
146+
}
147+
throw e;
148+
}
149+
}
150+
return poolable;
178151
}
179152

180-
public synchronized int getTotalCount()
153+
public int getAliveObjectCount()
181154
{
182-
return totalCount;
155+
return aliveObjectCount.get();
183156
}
184157

185158
// set the scavenge interval carefully
186159
public void scavenge() throws InterruptedException
187160
{
188-
int delta = this.totalCount - config.getMinSize();
161+
int delta = this.aliveObjectCount.get() - config.getMinSize();
189162
if (delta <= 0) {
190163
log.debug("Scavenge for delta <= 0, Skipping !!!");
191164
return;
@@ -223,7 +196,7 @@ public void scavenge() throws InterruptedException
223196
public synchronized int shutdown()
224197
{
225198
int removed = 0;
226-
while (this.totalCount > 0) {
199+
while (this.aliveObjectCount.get() > 0) {
227200
Poolable<T> obj = objectQueue.poll();
228201
if (obj != null) {
229202
decreaseObject(obj);

rubix-spi/src/main/java/com/qubole/rubix/spi/fop/SocketChannelObjectFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ public SocketChannelObjectFactory(int port)
3737

3838
@Override
3939
public SocketChannel create(String host, int socketTimeout, int connectTimeout)
40+
throws IOException
4041
{
4142
SocketAddress sad = new InetSocketAddress(host, this.port);
42-
SocketChannel socket = null;
43+
SocketChannel socket;
4344
try {
4445
socket = SocketChannel.open();
4546
socket.socket().setSoTimeout(socketTimeout);
@@ -49,6 +50,7 @@ public SocketChannel create(String host, int socketTimeout, int connectTimeout)
4950
}
5051
catch (IOException e) {
5152
log.warn(LDS_POOL + " : Unable to open connection to host " + host, e);
53+
throw e;
5254
}
5355
return socket;
5456
}

rubix-spi/src/main/java/com/qubole/rubix/spi/fop/SocketObjectFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,17 @@ public SocketObjectFactory(int port)
3636

3737
@Override
3838
public TSocket create(String host, int socketTimeout, int connectTimeout)
39+
throws TTransportException
3940
{
4041
log.debug(BKS_POOL + " : Opening connection to host: " + host);
41-
TSocket socket = null;
42+
TSocket socket;
4243
try {
4344
socket = new TSocket(host, port, socketTimeout, connectTimeout);
4445
socket.open();
4546
}
4647
catch (TTransportException e) {
47-
socket = null;
4848
log.warn("Unable to open connection to host " + host, e);
49+
throw e;
4950
}
5051
return socket;
5152
}

0 commit comments

Comments
 (0)