Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor gRPC pool connection management #2895

Open
wants to merge 1 commit into
base: refactor/test-e2e/add-v2-e2e-strategic-testing
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,13 +996,11 @@ func TestGetActualValue(t *testing.T) {
func() test {
fname := "version"
return test{
name: "return file contents when val is file://env",
name: "return empty when not exists file contents",
args: args{
val: "file://" + fname,
},
want: want{
wantRes: "file://" + fname,
},
want: want{},
}
}(),
}
Expand Down
2 changes: 1 addition & 1 deletion internal/config/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (g *GRPCClient) Opts() ([]grpc.Option, error) {
if g.ConnectionPool != nil {
opts = append(opts,
grpc.WithConnectionPoolSize(g.ConnectionPool.Size),
grpc.WithOldConnCloseDuration(g.ConnectionPool.OldConnCloseDuration),
grpc.WithOldConnCloseDelay(g.ConnectionPool.OldConnCloseDuration),
grpc.WithResolveDNS(g.ConnectionPool.ResolveDNS),
)
if g.ConnectionPool.EnableRebalance {
Expand Down
20 changes: 20 additions & 0 deletions internal/errors/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//
// Copyright (C) 2019-2025 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package errors provides error types and function
package errors

var ErrArgumentParserNotFound = New("argument parser not found")
2 changes: 1 addition & 1 deletion internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
atomic.AddUint64(&g.clientCount, ^uint64(0))
if p != nil {
log.Debugf("gRPC client connection pool addr = %s will disconnect soon...", addr)
return nil, p.Disconnect()
return nil, p.Disconnect(ctx)
}
return nil, nil
})
Expand Down
2 changes: 1 addition & 1 deletion internal/net/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ func WithClientInterceptors(names ...string) Option {
}
}

func WithOldConnCloseDuration(dur string) Option {
func WithOldConnCloseDelay(dur string) Option {
return func(g *gRPCClient) {
if len(dur) == 0 {
return
Expand Down
4 changes: 2 additions & 2 deletions internal/net/grpc/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3258,7 +3258,7 @@ package grpc
// }
// }
//
// func TestWithOldConnCloseDuration(t *testing.T) {
// func TestWithOldConnCloseDelay(t *testing.T) {
// type args struct {
// dur string
// }
Expand Down Expand Up @@ -3335,7 +3335,7 @@ package grpc
// checkFunc = defaultCheckFunc
// }
//
// got := WithOldConnCloseDuration(test.args.dur)
// got := WithOldConnCloseDelay(test.args.dur)
// if err := checkFunc(test.want, got); err != nil {
// tt.Errorf("error = %v", err)
// }
Expand Down
73 changes: 40 additions & 33 deletions internal/net/grpc/pool/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,133 +19,140 @@ package pool

import (
"github.com/vdaas/vald/internal/backoff"
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/sync/errgroup"
"github.com/vdaas/vald/internal/timeutil"
)

// Option defines a functional option for configuring the pool.
type Option func(*pool)

// Default options.
var defaultOptions = []Option{
WithSize(defaultPoolSize),
WithStartPort(80),
WithEndPort(65535),
WithErrGroup(errgroup.Get()),
WithDialTimeout("1s"),
WithOldConnCloseDuration("2m"),
WithOldConnCloseDelay("2m"),
WithResolveDNS(true),
}

// WithAddr sets the target address. It also extracts the host and port.
func WithAddr(addr string) Option {
return func(p *pool) {
if len(addr) == 0 {
if addr == "" {
return
}
p.addr = addr
var err error
// Attempt to split host and port.
if p.host, p.port, err = net.SplitHostPort(addr); err != nil {
p.host = addr
}
}
}

// WithHost sets the target host.
func WithHost(host string) Option {
return func(p *pool) {
if len(host) == 0 {
return
if host != "" {
p.host = host
}
p.host = host
}
}

// WithPort sets the target port.
func WithPort(port int) Option {
return func(p *pool) {
if port > 0 {
return
p.port = uint16(port)
}
p.port = uint16(port)
}
}

// WithStartPort sets the starting port for scanning.
func WithStartPort(port int) Option {
return func(p *pool) {
if port > 0 {
return
p.startPort = uint16(port)
}
p.startPort = uint16(port)
}
}

// WithEndPort sets the ending port for scanning.
func WithEndPort(port int) Option {
return func(p *pool) {
if port > 0 {
return
p.endPort = uint16(port)
}
p.endPort = uint16(port)
}
}

func WithResolveDNS(flg bool) Option {
// WithResolveDNS enables or disables DNS resolution.
func WithResolveDNS(enable bool) Option {
return func(p *pool) {
p.resolveDNS = flg
p.enableDNSLookup = enable
}
}

// WithBackoff sets the backoff strategy.
func WithBackoff(bo backoff.Backoff) Option {
return func(p *pool) {
if bo != nil {
return
p.bo = bo
}
p.bo = bo
}
}

// WithSize sets the pool size.
func WithSize(size uint64) Option {
return func(p *pool) {
if size < 1 {
return
}
p.size.Store(size)
p.poolSize.Store(size)
}
}

// WithDialOptions appends gRPC dial options.
func WithDialOptions(opts ...DialOption) Option {
return func(p *pool) {
if len(opts) > 0 {
if len(p.dopts) > 0 {
p.dopts = append(p.dopts, opts...)
} else {
p.dopts = opts
}
p.dialOpts = append(p.dialOpts, opts...)
}
}
}

// WithDialTimeout sets the dial timeout duration.
func WithDialTimeout(dur string) Option {
return func(p *pool) {
if len(dur) == 0 {
if dur == "" {
return
}
d, err := timeutil.Parse(dur)
if err != nil {
return
if t, err := timeutil.Parse(dur); err == nil {
p.dialTimeout = t
}
p.dialTimeout = d
}
}

func WithOldConnCloseDuration(dur string) Option {
// WithOldConnCloseDelay sets the delay before closing old connections.
func WithOldConnCloseDelay(dur string) Option {
return func(p *pool) {
if len(dur) == 0 {
if dur == "" {
return
}
d, err := timeutil.Parse(dur)
if err != nil {
return
if t, err := timeutil.Parse(dur); err == nil {
p.oldConnCloseDelay = t
}
p.roccd = d
}
}

// WithErrGroup sets the errgroup for goroutine management.
func WithErrGroup(eg errgroup.Group) Option {
return func(p *pool) {
if eg != nil {
p.eg = eg
p.errGroup = eg
}
}
}
4 changes: 2 additions & 2 deletions internal/net/grpc/pool/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ package pool
// }
// }
//
// func TestWithOldConnCloseDuration(t *testing.T) {
// func TestWithOldConnCloseDelay(t *testing.T) {
// type args struct {
// dur string
// }
Expand Down Expand Up @@ -946,7 +946,7 @@ package pool
// checkFunc = defaultCheckFunc
// }
//
// got := WithOldConnCloseDuration(test.args.dur)
// got := WithOldConnCloseDelay(test.args.dur)
// if err := checkFunc(test.want, got); err != nil {
// tt.Errorf("error = %v", err)
// }
Expand Down
Loading
Loading