@@ -54,11 +54,11 @@ public class FlightShimProducer
5454{
5555 private static final Logger log = Logger .get (FlightShimProducer .class );
5656 private static final JsonCodec <FlightShimRequest > REQUEST_JSON_CODEC = jsonCodec (FlightShimRequest .class );
57+ private static final int CLIENT_POLL_TIME = 5000 ; // Backpressure poll time ms
5758 private final BufferAllocator allocator ;
5859 private final FlightShimPluginManager pluginManager ;
5960 private final FlightShimConfig config ;
6061 private final ExecutorService shimExecutor ;
61- private final BackpressureStrategy backpressureStrategy ;
6262
6363 @ Inject
6464 public FlightShimProducer (BufferAllocator allocator , FlightShimPluginManager pluginManager , FlightShimConfig config , @ ForFlightShimServer ExecutorService shimExecutor )
@@ -67,19 +67,19 @@ public FlightShimProducer(BufferAllocator allocator, FlightShimPluginManager plu
6767 this .pluginManager = requireNonNull (pluginManager , "pluginManager is null" );
6868 this .config = requireNonNull (config , "config is null" );
6969 this .shimExecutor = requireNonNull (shimExecutor , "shimExecutor is null" );
70- this .backpressureStrategy = new BackpressureStrategy .CallbackBackpressureStrategy ();
7170 }
7271
7372 @ Override
7473 public void getStream (CallContext context , Ticket ticket , ServerStreamListener listener )
7574 {
7675 log .debug ("Handling GetStream request" );
77- backpressureStrategy .register (listener );
7876 shimExecutor .submit (() -> runGetStreamAsync (context , ticket , listener ));
7977 }
8078
8179 private void runGetStreamAsync (CallContext context , Ticket ticket , ServerStreamListener listener )
8280 {
81+ final BackpressureStrategy backpressureStrategy = new BackpressureStrategy .CallbackBackpressureStrategy ();
82+ backpressureStrategy .register (listener );
8383 try {
8484 log .debug ("Starting GetStream processing" );
8585 FlightShimRequest request = REQUEST_JSON_CODEC .fromJson (ticket .getBytes ());
@@ -125,8 +125,13 @@ private void runGetStreamAsync(CallContext context, Ticket ticket, ServerStreamL
125125 listener .setUseZeroCopy (true );
126126 listener .start (batchSource .getVectorSchemaRoot ());
127127 while (batchSource .nextBatch ()) {
128- if (backpressureStrategy .waitForListener (0 ) != BackpressureStrategy .WaitResult .READY || listener .isCancelled ()) {
129- return ;
128+ BackpressureStrategy .WaitResult waitResult ;
129+ while ((waitResult = backpressureStrategy .waitForListener (CLIENT_POLL_TIME )) == BackpressureStrategy .WaitResult .TIMEOUT ) {
130+ log .info (format ("Waiting for client to read from connector %s" , request .getConnectorId ()));
131+ }
132+ if (waitResult != BackpressureStrategy .WaitResult .READY ) {
133+ log .info (format ("Read stopped from connector %s due to client wait result: %s" , request .getConnectorId (), waitResult ));
134+ break ;
130135 }
131136 listener .putNext ();
132137 }
0 commit comments