@@ -112,6 +112,10 @@ <T> CompletionStage<T> to(org.apache.kafka.common.KafkaFuture<T> kafkaFuture) {
112112 return f ;
113113 }
114114
115+ private org .apache .kafka .clients .admin .Admin admin (boolean fromController ) {
116+ return fromController ? controllerAdmin : kafkaAdmin ;
117+ }
118+
115119 @ Override
116120 public String clientId () {
117121 return clientId ;
@@ -140,7 +144,7 @@ public CompletionStage<Void> feature(Map<String, Short> maxVersions) {
140144
141145 @ Override
142146 public CompletionStage <FeatureInfo > feature () {
143- return to (kafkaAdmin .describeFeatures ().featureMetadata ())
147+ return to (admin ( false ) .describeFeatures ().featureMetadata ())
144148 .thenApply (
145149 f ->
146150 new FeatureInfo (
@@ -172,7 +176,7 @@ public CompletionStage<Set<String>> topicNames(boolean listInternal) {
172176
173177 @ Override
174178 public CompletionStage <Set <String >> internalTopicNames () {
175- return to (kafkaAdmin .listTopics (new ListTopicsOptions ().listInternal (true )).namesToListings ())
179+ return to (admin ( false ) .listTopics (new ListTopicsOptions ().listInternal (true )).namesToListings ())
176180 .thenApply (
177181 ts ->
178182 ts .entrySet ().stream ()
@@ -182,15 +186,16 @@ public CompletionStage<Set<String>> internalTopicNames() {
182186 }
183187
184188 @ Override
185- public CompletionStage <List <Topic >> topics (Set <String > topics ) {
189+ public CompletionStage <List <Topic >> topics (Set <String > topics , boolean fromController ) {
186190 if (topics .isEmpty ()) return CompletableFuture .completedFuture (List .of ());
187-
188191 return FutureUtils .combine (
189192 doGetConfigs (
190193 topics .stream ()
191194 .map (topic -> new ConfigResource (ConfigResource .Type .TOPIC , topic ))
192- .collect (Collectors .toList ())),
193- to (kafkaAdmin .describeTopics (topics ).allTopicNames ()),
195+ .collect (Collectors .toList ()),
196+ fromController ),
197+ // Quorum controller does not support DescribeTopicPartitionsRequest
198+ to (admin (false ).describeTopics (topics ).allTopicNames ()),
194199 (configs , desc ) ->
195200 configs .entrySet ().stream ()
196201 .map (entry -> Topic .of (entry .getKey (), desc .get (entry .getKey ()), entry .getValue ()))
@@ -201,7 +206,7 @@ public CompletionStage<List<Topic>> topics(Set<String> topics) {
201206 @ Override
202207 public CompletionStage <Void > deleteTopics (Set <String > topics ) {
203208 if (topics .isEmpty ()) return CompletableFuture .completedFuture (null );
204- return to (kafkaAdmin .deleteTopics (topics ).all ());
209+ return to (admin ( false ) .deleteTopics (topics ).all ());
205210 }
206211
207212 @ Override
@@ -255,7 +260,7 @@ public CompletionStage<Void> deleteInstanceMembers(Map<String, Set<String>> grou
255260 public CompletionStage <Void > deleteMembers (Set <String > consumerGroups ) {
256261 // kafka APIs disallow to remove all members when there are no members ...
257262 // Hence, we have to filter the non-empty groups first.
258- return to (kafkaAdmin .describeConsumerGroups (consumerGroups ).all ())
263+ return to (admin ( false ) .describeConsumerGroups (consumerGroups ).all ())
259264 .thenApply (
260265 groups ->
261266 groups .entrySet ().stream ()
@@ -280,13 +285,13 @@ group, new RemoveMembersFromConsumerGroupOptions())
280285 @ Override
281286 public CompletionStage <Void > deleteGroups (Set <String > consumerGroups ) {
282287 return deleteMembers (consumerGroups )
283- .thenCompose (ignored -> to (kafkaAdmin .deleteConsumerGroups (consumerGroups ).all ()));
288+ .thenCompose (ignored -> to (admin ( false ) .deleteConsumerGroups (consumerGroups ).all ()));
284289 }
285290
286291 @ Override
287292 public CompletionStage <Set <TopicPartition >> topicPartitions (Set <String > topics ) {
288293 if (topics .isEmpty ()) return CompletableFuture .completedFuture (Set .of ());
289- return to (kafkaAdmin .describeTopics (topics ).allTopicNames ())
294+ return to (admin ( false ) .describeTopics (topics ).allTopicNames ())
290295 .thenApply (
291296 r ->
292297 r .entrySet ().stream ()
@@ -301,7 +306,7 @@ public CompletionStage<Set<TopicPartition>> topicPartitions(Set<String> topics)
301306 public CompletionStage <Set <TopicPartitionReplica >> topicPartitionReplicas (Set <Integer > brokers ) {
302307 if (brokers .isEmpty ()) return CompletableFuture .completedFuture (Set .of ());
303308 return topicNames (true )
304- .thenCompose (topics -> to (kafkaAdmin .describeTopics (topics ).allTopicNames ()))
309+ .thenCompose (topics -> to (admin ( false ) .describeTopics (topics ).allTopicNames ()))
305310 .thenApply (
306311 r ->
307312 r .entrySet ().stream ()
@@ -327,7 +332,7 @@ public CompletionStage<Set<TopicPartitionReplica>> topicPartitionReplicas(Set<In
327332 */
328333 private CompletionStage <Set <TopicPartition >> updatableTopicPartitions (Set <String > topics ) {
329334 if (topics .isEmpty ()) return CompletableFuture .completedFuture (Set .of ());
330- return to (kafkaAdmin .describeTopics (topics ).allTopicNames ())
335+ return to (admin ( false ) .describeTopics (topics ).allTopicNames ())
331336 .thenApply (
332337 ts ->
333338 ts .entrySet ().stream ()
@@ -441,7 +446,7 @@ public CompletionStage<Map<TopicPartition, Long>> maxTimestamps(
441446 public CompletionStage <List <Partition >> partitions (Set <String > topics ) {
442447 if (topics .isEmpty ()) return CompletableFuture .completedFuture (List .of ());
443448 var updatableTopicPartitions = updatableTopicPartitions (topics );
444- var topicDesc = to (kafkaAdmin .describeTopics (topics ).allTopicNames ());
449+ var topicDesc = to (admin ( false ) .describeTopics (topics ).allTopicNames ());
445450 return FutureUtils .combine (
446451 updatableTopicPartitions .thenCompose (this ::earliestOffsets ),
447452 updatableTopicPartitions .thenCompose (this ::latestOffsets ),
@@ -512,7 +517,7 @@ public CompletionStage<List<Broker>> brokers() {
512517
513518 @ Override
514519 public CompletionStage <List <Controller >> controllers () {
515- return to (controllerAdmin .describeCluster ().nodes ())
520+ return to (admin ( true ) .describeCluster ().nodes ())
516521 .thenCompose (
517522 nodes ->
518523 to (controllerAdmin
@@ -554,7 +559,7 @@ public CompletionStage<List<Controller>> controllers() {
554559 }
555560
556561 private CompletionStage <Map .Entry <String , List <Broker >>> clusterIdAndBrokers () {
557- var cluster = kafkaAdmin .describeCluster ();
562+ var cluster = admin ( false ) .describeCluster ();
558563 var nodeFuture = to (cluster .nodes ());
559564 return FutureUtils .combine (
560565 to (cluster .clusterId ()),
@@ -573,7 +578,8 @@ private CompletionStage<Map.Entry<String, List<Broker>>> clusterIdAndBrokers() {
573578 n ->
574579 new ConfigResource (
575580 ConfigResource .Type .BROKER , String .valueOf (n .id ())))
576- .collect (Collectors .toList ()))),
581+ .collect (Collectors .toList ()),
582+ false )),
577583 nodeFuture ,
578584 (id , controller , logDirs , configs , nodes ) ->
579585 Map .entry (
@@ -592,7 +598,7 @@ private CompletionStage<Map.Entry<String, List<Broker>>> clusterIdAndBrokers() {
592598
593599 @ Override
594600 public CompletionStage <Set <String >> consumerGroupIds () {
595- return to (kafkaAdmin .listGroups ().all ())
601+ return to (admin ( false ) .listGroups ().all ())
596602 .thenApply (
597603 gs ->
598604 gs .stream ()
@@ -604,7 +610,7 @@ public CompletionStage<Set<String>> consumerGroupIds() {
604610 public CompletionStage <List <ConsumerGroup >> consumerGroups (Set <String > consumerGroupIds ) {
605611 if (consumerGroupIds .isEmpty ()) return CompletableFuture .completedFuture (List .of ());
606612 return FutureUtils .combine (
607- to (kafkaAdmin .describeConsumerGroups (consumerGroupIds ).all ()),
613+ to (admin ( false ) .describeConsumerGroups (consumerGroupIds ).all ()),
608614 FutureUtils .sequence (
609615 consumerGroupIds .stream ()
610616 .map (
@@ -694,7 +700,7 @@ public CompletionStage<List<ProducerState>> producerStates(Set<TopicPartition> p
694700
695701 @ Override
696702 public CompletionStage <Set <String >> transactionIds () {
697- return to (kafkaAdmin .listTransactions ().all ())
703+ return to (admin ( false ) .listTransactions ().all ())
698704 .thenApply (
699705 t ->
700706 t .stream ()
@@ -705,7 +711,7 @@ public CompletionStage<Set<String>> transactionIds() {
705711 @ Override
706712 public CompletionStage <List <Transaction >> transactions (Set <String > transactionIds ) {
707713 if (transactionIds .isEmpty ()) return CompletableFuture .completedFuture (List .of ());
708- return to (kafkaAdmin .describeTransactions (transactionIds ).all ())
714+ return to (admin ( false ) .describeTransactions (transactionIds ).all ())
709715 .thenApply (
710716 ts ->
711717 ts .entrySet ().stream ()
@@ -734,8 +740,8 @@ private CompletionStage<List<Replica>> replicas(Set<String> topics) {
734740 // pre-group folders by (broker -> topic partition) to speedup seek
735741 return FutureUtils .combine (
736742 logDirs (),
737- to (kafkaAdmin .describeTopics (topics ).allTopicNames ()),
738- to (kafkaAdmin .listPartitionReassignments ().reassignments ())
743+ to (admin ( false ) .describeTopics (topics ).allTopicNames ()),
744+ to (admin ( false ) .listPartitionReassignments ().reassignments ())
739745 // supported version: 2.4.0
740746 // https://issues.apache.org/jira/browse/KAFKA-8345
741747 .exceptionally (exceptionHandler (UnsupportedVersionException .class , Map .of ())),
@@ -854,7 +860,7 @@ public CompletionStage<List<Quota>> quotas() {
854860
855861 private CompletionStage <Map <ClientQuotaEntity , Map <String , Double >>> quotas (
856862 ClientQuotaFilter filter ) {
857- return to (kafkaAdmin .describeClientQuotas (filter ).entities ());
863+ return to (admin ( false ) .describeClientQuotas (filter ).entities ());
858864 }
859865
860866 @ Override
@@ -897,7 +903,7 @@ public CompletionStage<Void> setConnectionQuota(int rate) {
897903
898904 @ Override
899905 public CompletionStage <QuorumInfo > quorumInfo () {
900- return to (kafkaAdmin .describeMetadataQuorum ().quorumInfo ())
906+ return to (admin ( false ) .describeMetadataQuorum ().quorumInfo ())
901907 .thenApply (
902908 quorumInfo ->
903909 new QuorumInfo (
@@ -951,7 +957,7 @@ public CompletionStage<Void> addVoter(
951957
952958 @ Override
953959 public CompletionStage <Void > removeVoter (int nodeId , String directoryId ) {
954- return to (kafkaAdmin .removeRaftVoter (nodeId , Uuid .fromString (directoryId )).all ());
960+ return to (admin ( false ) .removeRaftVoter (nodeId , Uuid .fromString (directoryId )).all ());
955961 }
956962
957963 @ Override
@@ -1295,7 +1301,7 @@ public CompletionStage<Void> preferredLeaderElection(Set<TopicPartition> partiti
12951301
12961302 @ Override
12971303 public CompletionStage <Void > addPartitions (String topic , int total ) {
1298- return to (kafkaAdmin .createPartitions (Map .of (topic , NewPartitions .increaseTo (total ))).all ());
1304+ return to (admin ( false ) .createPartitions (Map .of (topic , NewPartitions .increaseTo (total ))).all ());
12991305 }
13001306
13011307 @ Override
@@ -1381,8 +1387,8 @@ public CompletionStage<org.astraea.common.admin.Config> clusterConfigs() {
13811387 }
13821388
13831389 private CompletionStage <Map <String , Map <String , String >>> doGetConfigs (
1384- Collection <ConfigResource > resources ) {
1385- return to (kafkaAdmin .describeConfigs (resources ).all ())
1390+ Collection <ConfigResource > resources , boolean fromController ) {
1391+ return to (admin ( fromController ) .describeConfigs (resources ).all ())
13861392 .thenApply (
13871393 allConfigs ->
13881394 allConfigs .entrySet ().stream ()
@@ -1405,7 +1411,7 @@ private CompletionStage<Void> doAppendConfigs(Map<ConfigResource, Map<String, St
14051411 .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
14061412 if (nonEmptyAppend .isEmpty ()) return CompletableFuture .completedFuture (null );
14071413
1408- return doGetConfigs (nonEmptyAppend .keySet ())
1414+ return doGetConfigs (nonEmptyAppend .keySet (), false )
14091415 .thenCompose (
14101416 allConfigs -> {
14111417 // append to empty will cause bug (see https://github.com/apache/kafka/pull/12503)
@@ -1467,7 +1473,7 @@ private CompletionStage<Void> doAppendConfigs(Map<ConfigResource, Map<String, St
14671473
14681474 return doSetConfigs (requestToSet )
14691475 .thenCompose (
1470- ignored -> to (kafkaAdmin .incrementalAlterConfigs (requestToAppend ).all ()));
1476+ ignored -> to (admin ( false ) .incrementalAlterConfigs (requestToAppend ).all ()));
14711477 });
14721478 }
14731479
@@ -1481,7 +1487,7 @@ private CompletionStage<Void> doSubtractConfigs(
14811487
14821488 if (nonEmptySubtract .isEmpty ()) return CompletableFuture .completedFuture (null );
14831489
1484- return doGetConfigs (nonEmptySubtract .keySet ())
1490+ return doGetConfigs (nonEmptySubtract .keySet (), false )
14851491 .thenCompose (
14861492 configs -> {
14871493 Map <ConfigResource , Collection <AlterConfigOp >> requestToSubtract =
@@ -1528,7 +1534,7 @@ private CompletionStage<Void> doSubtractConfigs(
15281534 new ConfigEntry (e .getKey (), e .getValue ()),
15291535 AlterConfigOp .OpType .SUBTRACT ))
15301536 .collect (Collectors .toList ())));
1531- return to (kafkaAdmin .incrementalAlterConfigs (requestToSubtract ).all ());
1537+ return to (admin ( false ) .incrementalAlterConfigs (requestToSubtract ).all ());
15321538 });
15331539 }
15341540
@@ -1557,7 +1563,7 @@ private CompletionStage<Void> doSetConfigs(Map<ConfigResource, Map<String, Strin
15571563 .collect (Collectors .toList ())))
15581564 .filter (e -> !e .getValue ().isEmpty ())
15591565 .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
1560- return to (kafkaAdmin .incrementalAlterConfigs (newVersion .get ()).all ());
1566+ return to (admin ( false ) .incrementalAlterConfigs (newVersion .get ()).all ());
15611567 }
15621568
15631569 private CompletionStage <Void > doUnsetConfigs (Map <ConfigResource , Set <String >> unset ) {
@@ -1582,12 +1588,12 @@ private CompletionStage<Void> doUnsetConfigs(Map<ConfigResource, Set<String>> un
15821588 new ConfigEntry (key , "" ), AlterConfigOp .OpType .DELETE ))
15831589 .collect (Collectors .toList ())));
15841590
1585- return to (kafkaAdmin .incrementalAlterConfigs (newVersion .get ()).all ());
1591+ return to (admin ( false ) .incrementalAlterConfigs (newVersion .get ()).all ());
15861592 }
15871593
15881594 @ Override
15891595 public void close () {
1590- kafkaAdmin .close ();
1596+ admin ( false ) .close ();
15911597 controllerAdmin .close ();
15921598 }
15931599
@@ -1599,7 +1605,7 @@ public void close() {
15991605 return brokers ()
16001606 .thenApply (
16011607 brokers -> brokers .stream ().map (Broker ::id ).collect (Collectors .toUnmodifiableSet ()))
1602- .thenCompose (ids -> to (kafkaAdmin .describeLogDirs (ids ).allDescriptions ()))
1608+ .thenCompose (ids -> to (admin ( false ) .describeLogDirs (ids ).allDescriptions ()))
16031609 .thenApply (
16041610 ds ->
16051611 ds .entrySet ().stream ()
0 commit comments