Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 5a48f29

Browse files
committedMar 19, 2025··
Refactor gRPC pool connection management
Signed-off-by: kpango <kpango@vdaas.org>
1 parent 18682c3 commit 5a48f29

File tree

19 files changed

+762
-926
lines changed

19 files changed

+762
-926
lines changed
 

‎internal/config/config_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -996,13 +996,11 @@ func TestGetActualValue(t *testing.T) {
996996
func() test {
997997
fname := "version"
998998
return test{
999-
name: "return file contents when val is file://env",
999+
name: "return empty when not exists file contents",
10001000
args: args{
10011001
val: "file://" + fname,
10021002
},
1003-
want: want{
1004-
wantRes: "file://" + fname,
1005-
},
1003+
want: want{},
10061004
}
10071005
}(),
10081006
}

‎internal/config/grpc.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func (g *GRPCClient) Opts() ([]grpc.Option, error) {
183183
if g.ConnectionPool != nil {
184184
opts = append(opts,
185185
grpc.WithConnectionPoolSize(g.ConnectionPool.Size),
186-
grpc.WithOldConnCloseDuration(g.ConnectionPool.OldConnCloseDuration),
186+
grpc.WithOldConnCloseDelay(g.ConnectionPool.OldConnCloseDuration),
187187
grpc.WithResolveDNS(g.ConnectionPool.ResolveDNS),
188188
)
189189
if g.ConnectionPool.EnableRebalance {

‎internal/errors/params.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
//
2+
// Copyright (C) 2019-2025 vdaas.org vald team <vald@vdaas.org>
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// You may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
// Package errors provides error types and function
18+
package errors
19+
20+
var ErrArgumentParserNotFound = New("argument parser not found")

‎internal/net/grpc/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1066,7 +1066,7 @@ func (g *gRPCClient) Disconnect(ctx context.Context, addr string) error {
10661066
atomic.AddUint64(&g.clientCount, ^uint64(0))
10671067
if p != nil {
10681068
log.Debugf("gRPC client connection pool addr = %s will disconnect soon...", addr)
1069-
return nil, p.Disconnect()
1069+
return nil, p.Disconnect(ctx)
10701070
}
10711071
return nil, nil
10721072
})

‎internal/net/grpc/option.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,7 @@ func WithClientInterceptors(names ...string) Option {
653653
}
654654
}
655655

656-
func WithOldConnCloseDuration(dur string) Option {
656+
func WithOldConnCloseDelay(dur string) Option {
657657
return func(g *gRPCClient) {
658658
if len(dur) == 0 {
659659
return

‎internal/net/grpc/option_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -3258,7 +3258,7 @@ package grpc
32583258
// }
32593259
// }
32603260
//
3261-
// func TestWithOldConnCloseDuration(t *testing.T) {
3261+
// func TestWithOldConnCloseDelay(t *testing.T) {
32623262
// type args struct {
32633263
// dur string
32643264
// }
@@ -3335,7 +3335,7 @@ package grpc
33353335
// checkFunc = defaultCheckFunc
33363336
// }
33373337
//
3338-
// got := WithOldConnCloseDuration(test.args.dur)
3338+
// got := WithOldConnCloseDelay(test.args.dur)
33393339
// if err := checkFunc(test.want, got); err != nil {
33403340
// tt.Errorf("error = %v", err)
33413341
// }

‎internal/net/grpc/pool/option.go

+40-33
Original file line numberDiff line numberDiff line change
@@ -19,133 +19,140 @@ package pool
1919

2020
import (
2121
"github.com/vdaas/vald/internal/backoff"
22+
"github.com/vdaas/vald/internal/net"
2223
"github.com/vdaas/vald/internal/sync/errgroup"
2324
"github.com/vdaas/vald/internal/timeutil"
2425
)
2526

27+
// Option defines a functional option for configuring the pool.
2628
type Option func(*pool)
2729

30+
// Default options.
2831
var defaultOptions = []Option{
2932
WithSize(defaultPoolSize),
3033
WithStartPort(80),
3134
WithEndPort(65535),
3235
WithErrGroup(errgroup.Get()),
3336
WithDialTimeout("1s"),
34-
WithOldConnCloseDuration("2m"),
37+
WithOldConnCloseDelay("2m"),
3538
WithResolveDNS(true),
3639
}
3740

41+
// WithAddr sets the target address. It also extracts the host and port.
3842
func WithAddr(addr string) Option {
3943
return func(p *pool) {
40-
if len(addr) == 0 {
44+
if addr == "" {
4145
return
4246
}
4347
p.addr = addr
48+
var err error
49+
// Attempt to split host and port.
50+
if p.host, p.port, err = net.SplitHostPort(addr); err != nil {
51+
p.host = addr
52+
}
4453
}
4554
}
4655

56+
// WithHost sets the target host.
4757
func WithHost(host string) Option {
4858
return func(p *pool) {
49-
if len(host) == 0 {
50-
return
59+
if host != "" {
60+
p.host = host
5161
}
52-
p.host = host
5362
}
5463
}
5564

65+
// WithPort sets the target port.
5666
func WithPort(port int) Option {
5767
return func(p *pool) {
5868
if port > 0 {
59-
return
69+
p.port = uint16(port)
6070
}
61-
p.port = uint16(port)
6271
}
6372
}
6473

74+
// WithStartPort sets the starting port for scanning.
6575
func WithStartPort(port int) Option {
6676
return func(p *pool) {
6777
if port > 0 {
68-
return
78+
p.startPort = uint16(port)
6979
}
70-
p.startPort = uint16(port)
7180
}
7281
}
7382

83+
// WithEndPort sets the ending port for scanning.
7484
func WithEndPort(port int) Option {
7585
return func(p *pool) {
7686
if port > 0 {
77-
return
87+
p.endPort = uint16(port)
7888
}
79-
p.endPort = uint16(port)
8089
}
8190
}
8291

83-
func WithResolveDNS(flg bool) Option {
92+
// WithResolveDNS enables or disables DNS resolution.
93+
func WithResolveDNS(enable bool) Option {
8494
return func(p *pool) {
85-
p.resolveDNS = flg
95+
p.enableDNSLookup = enable
8696
}
8797
}
8898

99+
// WithBackoff sets the backoff strategy.
89100
func WithBackoff(bo backoff.Backoff) Option {
90101
return func(p *pool) {
91102
if bo != nil {
92-
return
103+
p.bo = bo
93104
}
94-
p.bo = bo
95105
}
96106
}
97107

108+
// WithSize sets the pool size.
98109
func WithSize(size uint64) Option {
99110
return func(p *pool) {
100111
if size < 1 {
101112
return
102113
}
103-
p.size.Store(size)
114+
p.poolSize.Store(size)
104115
}
105116
}
106117

118+
// WithDialOptions appends gRPC dial options.
107119
func WithDialOptions(opts ...DialOption) Option {
108120
return func(p *pool) {
109121
if len(opts) > 0 {
110-
if len(p.dopts) > 0 {
111-
p.dopts = append(p.dopts, opts...)
112-
} else {
113-
p.dopts = opts
114-
}
122+
p.dialOpts = append(p.dialOpts, opts...)
115123
}
116124
}
117125
}
118126

127+
// WithDialTimeout sets the dial timeout duration.
119128
func WithDialTimeout(dur string) Option {
120129
return func(p *pool) {
121-
if len(dur) == 0 {
130+
if dur == "" {
122131
return
123132
}
124-
d, err := timeutil.Parse(dur)
125-
if err != nil {
126-
return
133+
if t, err := timeutil.Parse(dur); err == nil {
134+
p.dialTimeout = t
127135
}
128-
p.dialTimeout = d
129136
}
130137
}
131138

132-
func WithOldConnCloseDuration(dur string) Option {
139+
// WithOldConnCloseDelay sets the delay before closing old connections.
140+
func WithOldConnCloseDelay(dur string) Option {
133141
return func(p *pool) {
134-
if len(dur) == 0 {
142+
if dur == "" {
135143
return
136144
}
137-
d, err := timeutil.Parse(dur)
138-
if err != nil {
139-
return
145+
if t, err := timeutil.Parse(dur); err == nil {
146+
p.oldConnCloseDelay = t
140147
}
141-
p.roccd = d
142148
}
143149
}
144150

151+
// WithErrGroup sets the errgroup for goroutine management.
145152
func WithErrGroup(eg errgroup.Group) Option {
146153
return func(p *pool) {
147154
if eg != nil {
148-
p.eg = eg
155+
p.errGroup = eg
149156
}
150157
}
151158
}

‎internal/net/grpc/pool/option_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ package pool
869869
// }
870870
// }
871871
//
872-
// func TestWithOldConnCloseDuration(t *testing.T) {
872+
// func TestWithOldConnCloseDelay(t *testing.T) {
873873
// type args struct {
874874
// dur string
875875
// }
@@ -946,7 +946,7 @@ package pool
946946
// checkFunc = defaultCheckFunc
947947
// }
948948
//
949-
// got := WithOldConnCloseDuration(test.args.dur)
949+
// got := WithOldConnCloseDelay(test.args.dur)
950950
// if err := checkFunc(test.want, got); err != nil {
951951
// tt.Errorf("error = %v", err)
952952
// }

0 commit comments

Comments
 (0)
Please sign in to comment.