Skip to content

Commit a02de8f

Browse files
committed
Fix backpressure to use new instance per call
1 parent 70cd011 commit a02de8f

File tree

2 files changed

+11
-6
lines changed

2 files changed

+11
-6
lines changed

presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimProducer.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ public class FlightShimProducer
5454
{
5555
private static final Logger log = Logger.get(FlightShimProducer.class);
5656
private static final JsonCodec<FlightShimRequest> REQUEST_JSON_CODEC = jsonCodec(FlightShimRequest.class);
57+
private static final int CLIENT_POLL_TIME = 5000; // Backpressure poll time ms
5758
private final BufferAllocator allocator;
5859
private final FlightShimPluginManager pluginManager;
5960
private final FlightShimConfig config;
6061
private final ExecutorService shimExecutor;
61-
private final BackpressureStrategy backpressureStrategy;
6262

6363
@Inject
6464
public FlightShimProducer(BufferAllocator allocator, FlightShimPluginManager pluginManager, FlightShimConfig config, @ForFlightShimServer ExecutorService shimExecutor)
@@ -67,19 +67,19 @@ public FlightShimProducer(BufferAllocator allocator, FlightShimPluginManager plu
6767
this.pluginManager = requireNonNull(pluginManager, "pluginManager is null");
6868
this.config = requireNonNull(config, "config is null");
6969
this.shimExecutor = requireNonNull(shimExecutor, "shimExecutor is null");
70-
this.backpressureStrategy = new BackpressureStrategy.CallbackBackpressureStrategy();
7170
}
7271

7372
@Override
7473
public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener)
7574
{
7675
log.debug("Handling GetStream request");
77-
backpressureStrategy.register(listener);
7876
shimExecutor.submit(() -> runGetStreamAsync(context, ticket, listener));
7977
}
8078

8179
private void runGetStreamAsync(CallContext context, Ticket ticket, ServerStreamListener listener)
8280
{
81+
final BackpressureStrategy backpressureStrategy = new BackpressureStrategy.CallbackBackpressureStrategy();
82+
backpressureStrategy.register(listener);
8383
try {
8484
log.debug("Starting GetStream processing");
8585
FlightShimRequest request = REQUEST_JSON_CODEC.fromJson(ticket.getBytes());
@@ -125,8 +125,13 @@ private void runGetStreamAsync(CallContext context, Ticket ticket, ServerStreamL
125125
listener.setUseZeroCopy(true);
126126
listener.start(batchSource.getVectorSchemaRoot());
127127
while (batchSource.nextBatch()) {
128-
if (backpressureStrategy.waitForListener(0) != BackpressureStrategy.WaitResult.READY || listener.isCancelled()) {
129-
return;
128+
BackpressureStrategy.WaitResult waitResult;
129+
while ((waitResult = backpressureStrategy.waitForListener(CLIENT_POLL_TIME)) == BackpressureStrategy.WaitResult.TIMEOUT) {
130+
log.info(format("Waiting for client to read from connector %s", request.getConnectorId()));
131+
}
132+
if (waitResult != BackpressureStrategy.WaitResult.READY) {
133+
log.info(format("Read stopped from connector %s due to client wait result: %s", request.getConnectorId(), waitResult));
134+
break;
130135
}
131136
listener.putNext();
132137
}

0 commit comments

Comments
 (0)