From 5f0a3faa0bc4be644301e2ec7645caf7b304df12 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 30 Dec 2023 23:39:07 -0700 Subject: [PATCH] kgo tests: support TLS via KGO_TEST_TLS small dsl, hopefully simple enough --- pkg/kgo/client_test.go | 4 +-- pkg/kgo/consumer_direct_test.go | 24 +++++---------- pkg/kgo/group_test.go | 6 ++-- pkg/kgo/helpers_test.go | 54 ++++++++++++++++++++++++++++++++- pkg/kgo/txn_test.go | 5 ++- 5 files changed, 66 insertions(+), 27 deletions(-) diff --git a/pkg/kgo/client_test.go b/pkg/kgo/client_test.go index 8b29fa11..968609ee 100644 --- a/pkg/kgo/client_test.go +++ b/pkg/kgo/client_test.go @@ -113,9 +113,7 @@ func TestUnknownGroupOffsetFetchPinned(t *testing.T) { req := kmsg.NewOffsetFetchRequest() req.Group = "unknown-" + strconv.FormatInt(time.Now().UnixNano(), 10) - cl, _ := NewClient( - getSeedBrokers(), - ) + cl, _ := newTestClient() defer cl.Close() defer func() { if err := recover(); err != nil { diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index f4ddc4db..8f74fa89 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -18,8 +18,7 @@ func TestIssue325(t *testing.T) { topic, cleanup := tmpTopic(t) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( DefaultProduceTopic(topic), UnknownTopicRetries(-1), ) @@ -45,8 +44,7 @@ func TestIssue337(t *testing.T) { topic, cleanup := tmpTopicPartitions(t, 2) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( DefaultProduceTopic(topic), RecordPartitioner(ManualPartitioner()), UnknownTopicRetries(-1), @@ -92,8 +90,7 @@ func TestDirectPartitionPurge(t *testing.T) { topic, cleanup := tmpTopicPartitions(t, 2) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( DefaultProduceTopic(topic), RecordPartitioner(ManualPartitioner()), UnknownTopicRetries(-1), @@ -155,8 +152,7 @@ func TestIssue434(t *testing.T) { defer cleanup1() defer cleanup2() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( UnknownTopicRetries(-1), ConsumeTopics(fmt.Sprintf("(%s|%s)", t1, t2)), ConsumeRegex(), @@ -209,8 +205,7 @@ func TestAddRemovePartitions(t *testing.T) { t1, cleanup := tmpTopicPartitions(t, 2) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( UnknownTopicRetries(-1), RecordPartitioner(ManualPartitioner()), FetchMaxWait(100*time.Millisecond), @@ -278,8 +273,7 @@ func TestPauseIssue489(t *testing.T) { t1, cleanup := tmpTopicPartitions(t, 3) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( UnknownTopicRetries(-1), DefaultProduceTopic(t1), RecordPartitioner(ManualPartitioner()), @@ -360,8 +354,7 @@ func TestPauseIssueOct2023(t *testing.T) { defer cleanup3() ts := []string{t1, t2, t3} - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( UnknownTopicRetries(-1), ConsumeTopics(ts...), MetadataMinAge(50*time.Millisecond), @@ -438,8 +431,7 @@ func TestIssue523(t *testing.T) { g1, gcleanup := tmpGroup(t) defer gcleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( DefaultProduceTopic(t1), ConsumeTopics(".*"+t1+".*"), ConsumeRegex(), diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index 6e06da23..c0fa8d28 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -35,8 +35,7 @@ func TestGroupETL(t *testing.T) { //////////////////// go func() { - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), MaxBufferedRecords(10000), MaxBufferedBytes(50000), @@ -118,7 +117,6 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { netls := 0 // for if etlsBeforeQuit is non-negative opts := []Opt{ - getSeedBrokers(), UnknownTopicRetries(-1), // see txn_test comment WithLogger(testLogger()), ConsumerGroup(c.group), @@ -152,7 +150,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { OnPartitionsLost(func(context.Context, *Client, map[string][]int32) {}), } - cl, _ := NewClient(opts...) + cl, _ := newTestClient(opts...) defer cl.Close() defer func() { diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index f7b8e97f..d6f7fed3 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -3,6 +3,8 @@ package kgo import ( "context" "crypto/sha256" + "crypto/tls" + "crypto/x509" "encoding/hex" "errors" "fmt" @@ -36,6 +38,9 @@ var ( // cannot use EndAndBeginTransaction with EndBeginTxnUnsafe. allowUnsafe = false + // DSL syntax is ({ca|cert|key}:path),{1,3} + testCert *tls.Config + // We create topics with a different number of partitions to exercise // a few extra code paths; we index into npartitions with npartitionsAt, // an atomic that we modulo after load. @@ -45,7 +50,7 @@ var ( func init() { var err error - adm, err = NewClient(getSeedBrokers()) + adm, err = newTestClient() if err != nil { panic(fmt.Sprintf("unable to create admin client: %v", err)) } @@ -62,6 +67,53 @@ func init() { if _, exists := os.LookupEnv("KGO_TEST_UNSAFE"); exists { allowUnsafe = true } + if paths, exists := os.LookupEnv("KGO_TEST_TLS"); exists { + var caPath, certPath, keyPath string + for _, path := range strings.Split(paths, ",") { + switch { + case strings.HasPrefix(path, "ca:"): + caPath = path[3:] + case strings.HasPrefix(path, "cert:"): + certPath = path[5:] + case strings.HasPrefix(path, "key:"): + keyPath = path[4:] + default: + panic(fmt.Sprintf("invalid tls format %q", path)) + } + } + if caPath != "" { + ca, err := os.ReadFile(caPath) + if err != nil { + panic(fmt.Sprintf("unable to read ca: %v", err)) + } + testCert = &tls.Config{RootCAs: x509.NewCertPool()} + if !testCert.RootCAs.AppendCertsFromPEM(ca) { + panic("unable to append ca") + } + } + if certPath != "" || keyPath != "" { + if certPath == "" || keyPath == "" { + panic("both cert path and key path must be specified") + } + cert, err := tls.LoadX509KeyPair(certPath, keyPath) + if err != nil { + panic(fmt.Sprintf("unable to load cert/key pair: %v", err)) + } + testCert.Certificates = append(testCert.Certificates, cert) + } + } +} + +func testClientOpts(opts ...Opt) []Opt { + opts = append(opts, getSeedBrokers()) + if testCert != nil { + opts = append(opts, DialTLSConfig(testCert)) + } + return opts +} + +func newTestClient(opts ...Opt) (*Client, error) { + return NewClient(testClientOpts(opts...)...) } func getSeedBrokers() Opt { diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go index 63a6f5c8..fc1fdbe8 100644 --- a/pkg/kgo/txn_test.go +++ b/pkg/kgo/txn_test.go @@ -26,8 +26,7 @@ func TestTxnEtl(t *testing.T) { //////////////////// go func() { - cl, err := NewClient( - getSeedBrokers(), + cl, err := newTestClient( WithLogger(BasicLogger(os.Stderr, testLogLevel, func() string { return time.Now().UTC().Format("15:04:05.999") + " " })), @@ -139,7 +138,6 @@ func (c *testConsumer) transact(txnsBeforeQuit int) { defer c.wg.Done() opts := []Opt{ - getSeedBrokers(), // Kraft sometimes returns success from topic creation, and // then returns UnknownTopicXyz for a while in metadata loads. // It also returns NotLeaderXyz; we handle both problems. @@ -160,6 +158,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) { if requireStableFetch { opts = append(opts, RequireStableFetchOffsets()) } + opts = append(opts, testClientOpts()...) txnSess, _ := NewGroupTransactSession(opts...) defer txnSess.Close()