Skip to content

seeing an invalid api request while connecting to kafka server with kerberos and tls #1220

@Naveenrajp26

Description

@Naveenrajp26

Hi,

i am seeing an error in the kafka server logs when i am connecting to kafka using kerberos with tls authentication.

But when i am using the confluent go package i am not seeing these errors ("https://github.com/confluentinc/confluent-kafka-go")

[2026-01-27 06:05:15,482] DEBUG Set SASL server state to HANDSHAKE_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2026-01-27 06:05:16,378] DEBUG Failed during authentication: Error parsing request header. Our best guess of the apiKey is: 24706 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2026-01-27 06:05:16,378] WARN [SocketServer brokerId=0] Unexpected error from inblr01lvpzpa03.apac.nsn-net.net/10.143.211.247; closing connection (org.apache.kafka.common.network.Selector) org.apache.kafka.common.errors.InvalidRequestException: Error parsing request header. Our best guess of the apiKey is: 24706 Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id 24706, it should be between 0and49(inclusive) at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:295) at org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:91) at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleKafkaRequest(SaslServerAuthenticator.java:478) at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:259) at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:176) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547) at org.apache.kafka.common.network.Selector.poll(Selector.java:485) at kafka.network.Processor.poll(SocketServer.scala:913) at kafka.network.Processor.run(SocketServer.scala:816) at java.base/java.lang.Thread.run(Thread.java:1570) [2026-01-27 06:05:16,862] DEBUG connections.max.reauth.ms for mechanism=GSSAPI: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)

Code:
`package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"os"
"time"

"github.com/jcmturner/gokrb5/v8/client"
"github.com/jcmturner/gokrb5/v8/config"
"github.com/jcmturner/gokrb5/v8/keytab"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/sasl/kerberos"

)

func main() {
if len(os.Args) != 2 {
log.Fatal("Usage: go run main.go ")
}
topic := os.Args[1]

// --- Kerberos Configuration ---
krb5Conf, err := config.Load("/etc/krb5.conf")
if err != nil {
	log.Fatalf("Failed to load krb5.conf: %v", err)
}

// Adjust keytab path and principal to match your setup
kt, err := keytab.Load("/home/pavithra/downloads/kafka_client.keytab") // e.g., /etc/security/keytabs/kafka.keytab
if err != nil {
	log.Fatalf("Failed to load keytab: %v", err)
}

principal := "kafka_client/sim0142node02.tre.nsn-rdnet.net"
realm := "TRE.NSN-RDNET.NET"

krbClient := client.NewWithKeytab(principal, realm, kt, krb5Conf, client.DisablePAFXFAST(true))

// --- Franz-go Kerberos Mechanism ---
auth := kerberos.Auth{
	Client:           krbClient,
	Service:          "kafka", // matches sasl.kerberos.service.name=kafka in broker config
	PersistAfterAuth: true,
}
mech := auth.AsMechanismWithClose()

cert, err := tls.LoadX509KeyPair("/home/pavithra/downloads/kafka_2.12-2.6.0/certs/client_cert.pem", "/home/pavithra/downloads/kafka_2.12-2.6.0/certs/client_cert_key.pem")
if err != nil {
	panic(err)
}

caCert, err := os.ReadFile("/home/pavithra/downloads/kafka_2.12-2.6.0/certs/ca_cert.pem")
if err != nil {
	fmt.Errorf("failed to reaf the cacert file", err)
}
pool := x509.NewCertPool()
ok := pool.AppendCertsFromPEM(caCert)
if !ok {
	fmt.Errorf("failed to parse root certificate")
}

// --- Client Options ---
opts := []kgo.Opt{
	kgo.SeedBrokers(
		"sim0142node02.tre.nsn-rdnet.net:9093",
	),
	kgo.SASL(mech),
	kgo.DialTLSConfig(&tls.Config{
		//InsecureSkipVerify: true,
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
		// InsecureSkipVerify: true, // Uncomment only for testing with self-signed certs
	}),
	kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)),
}

cl, err := kgo.NewClient(opts...)
if err != nil {
	log.Fatalf("Failed to create client: %v", err)
}
defer cl.Close()
fmt.Printf("Kafka client created successfully\n")
fmt.Println("topic:" + topic)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

req := kmsg.NewPtrMetadataRequest()
topic1 := kmsg.NewMetadataRequestTopic()
topic1.Topic = kmsg.StringPtr(topic)
req.Topics = append(req.Topics, topic1)

res, err := req.RequestWith(ctx, cl)
if err != nil {
	panic(err)
}

fmt.Printf("Metadata response: %+v\n", res)

// Check response for Kafka error codes and print them.
// Other requests might have top level error codes, which indicate completed but failed requests.
for _, topic := range res.Topics {
	err := kerr.ErrorForCode(topic.ErrorCode)
	if err != nil {
		fmt.Printf("topic %v response has errored: %v\n", topic.Topic, err.Error())
	}

	fmt.Printf("topics:%v", topic)

	for _, p := range topic.Partitions {
		fmt.Printf("parition details:%v\n", p)
		fmt.Printf("Leader of partition:%v\n", p.Leader)
		fmt.Printf("ISR:%v\n", p.ISR)
		fmt.Printf("replicas:%v\n", p.Replicas)
	}
}

fmt.Printf("received '%v' topics and '%v' brokers\n", len(res.Topics), len(res.Brokers))

}
output:bash-5.1# ./kafka-frantz test123
Kafka client created successfully
topic:test123
[DEBUG] opening connection to broker; addr: sim0142node02.tre.nsn-rdnet.net:9093, broker: seed_0
[DEBUG] connection opened to broker; addr: sim0142node02.tre.nsn-rdnet.net:9093, broker: seed_0
[DEBUG] issuing api versions request; broker: seed_0, version: 4
[DEBUG] wrote ApiVersions v4; broker: seed_0, bytes_written: 31, write_wait: 2.689752ms, time_to_write: 492.506µs, err:
[DEBUG] read ApiVersions v4; broker: seed_0, bytes_read: 20, read_wait: 183.345µs, time_to_read: 172.497419ms, err:
[DEBUG] broker does not know our ApiVersions version but replied version 3, downgrading to version 3 and retrying; broker: seed_0
[DEBUG] issuing api versions request; broker: seed_0, version: 3
[DEBUG] wrote ApiVersions v3; broker: seed_0, bytes_written: 31, write_wait: 178.95µs, time_to_write: 138.998µs, err:
[DEBUG] read ApiVersions v3; broker: seed_0, bytes_read: 366, read_wait: 123.6µs, time_to_read: 249.627057ms, err:
[DEBUG] beginning sasl authentication; broker: seed_0, addr: sim0142node02.tre.nsn-rdnet.net:9093, mechanism: GSSAPI, authenticate: false
[DEBUG] issuing raw sasl authenticate; broker: seed_0, addr: sim0142node02.tre.nsn-rdnet.net:9093, step: 0
[DEBUG] connection initialization failed; addr: sim0142node02.tre.nsn-rdnet.net:9093, broker: seed_0, err: EOF
[DEBUG] opening connection to broker; addr: sim0142node02.tre.nsn-rdnet.net:9093, broker: seed_0
[DEBUG] connection opened to broker; addr: sim0142node02.tre.nsn-rdnet.net:9093, broker: seed_0
[DEBUG] beginning sasl authentication; broker: seed_0, addr: sim0142node02.tre.nsn-rdnet.net:9093, mechanism: GSSAPI, authenticate: false
[DEBUG] issuing raw sasl authenticate; broker: seed_0, addr: sim0142node02.tre.nsn-rdnet.net:9093, step: 0
[DEBUG] issuing raw sasl authenticate; broker: seed_0, addr: sim0142node02.tre.nsn-rdnet.net:9093, step: 1
[DEBUG] connection initialized successfully; addr: sim0142node02.tre.nsn-rdnet.net:9093, broker: seed_0
[DEBUG] wrote Metadata v9; broker: seed_0, bytes_written: 32, write_wait: 2.899156161s, time_to_write: 2.466342ms, err:
[DEBUG] read Metadata v9; broker: seed_0, bytes_read: 132, read_wait: 1.739665ms, time_to_read: 203.34131ms, err:
Metadata response: &{Version:9 ThrottleMillis:0 Brokers:[{NodeID:0 Host:sim0142node02.tre.nsn-rdnet.net Port:9093 Rack: UnknownTags:{keyvals:map[]}}] ClusterID:0xc0000222a0 ControllerID:0 Topics:[{ErrorCode:0 Topic:0xc0000222b0 TopicID:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] IsInternal:false Partitions:[{ErrorCode:0 Partition:0 Leader:0 LeaderEpoch:0 Replicas:[0] ISR:[0] OfflineReplicas:[] UnknownTags:{keyvals:map[]}}] AuthorizedOperations:-2147483648 UnknownTags:{keyvals:map[]}}] AuthorizedOperations:-2147483648 ErrorCode:0 UnknownTags:{keyvals:map[]}}
topics:{0 0xc0000222b0 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] false [{0 0 0 0 [0] [0] [] {map[]}}] -2147483648 {map[]}}parition details:{0 0 0 0 [0] [0] [] {map[]}}
Leader of partition:0
ISR:[0]
replicas:[0]
received '1' topics and '1' brokers`

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions