@@ -53,18 +53,24 @@ func init() {
5353 balancer .Register (builder )
5454}
5555
56+ // Represents gRPC server internally
57+ type server struct {
58+ ip string
59+ nodeName string
60+ }
61+
5662// Represents the callback argument to the dispatch methods
5763type DispatchHandler func (ctx context.Context , conn * grpc.ClientConn )
5864
5965// Represents interest in pod ips that are part of a Kubernetes service
6066type Subscription struct {
61- ipCh chan string
62- cleanup func ()
67+ serverCh chan server
68+ cleanup func ()
6369}
6470
6571// Ends subscription
6672func (sub * Subscription ) Unsubscribe () {
67- close (sub .ipCh )
73+ close (sub .serverCh )
6874 sub .cleanup ()
6975}
7076
@@ -78,22 +84,108 @@ type Dispatcher struct {
7884 informerReg cache.ResourceEventHandlerRegistration
7985 resolver * manual.Resolver
8086 conn * grpc.ClientConn
81- ips mapset.Set [string ]
87+ servers mapset.Set [server ]
8288 mu sync.Mutex
8389 eventbus eventbus.Bus
8490 stopCh chan struct {}
8591}
8692
93+ // Sends query to matching server at query-time
94+ func (d * Dispatcher ) Unicast (ctx context.Context , nodeName string , fn DispatchHandler ) {
95+ d .mu .Lock ()
96+ currentServers := d .servers .ToSlice ()
97+ d .mu .Unlock ()
98+
99+ // Get ip for a server at `nodeName`
100+ var ip string
101+ for _ , server := range currentServers {
102+ if server .nodeName == nodeName {
103+ ip = server .ip
104+ }
105+ }
106+
107+ // Exit if server not found
108+ if ip == "" {
109+ return
110+ }
111+
112+ doneCh := make (chan struct {})
113+ go func () {
114+ defer close (doneCh )
115+ connCtx := context .WithValue (ctx , dispatcherAddrCtxKey , fmt .Sprintf ("%s:%s" , ip , d .connectArgs .Port ))
116+ fn (connCtx , d .conn )
117+ }()
118+
119+ // wait for func or ctx to finish whichever comes first
120+ select {
121+ case <- ctx .Done ():
122+ case <- doneCh :
123+ }
124+ }
125+
126+ // Sends query to matching server at query-time and all subsequent servers when
127+ // they become available until Unsubscribe() is called
128+ func (d * Dispatcher ) UnicastSubscribe (ctx context.Context , nodeName string , fn DispatchHandler ) (* Subscription , error ) {
129+ serverCh := make (chan server )
130+
131+ // server handler
132+ handleNewServers := func (newServers []server ) {
133+ for _ , server := range newServers {
134+ if server .nodeName == nodeName {
135+ serverCh <- server
136+ }
137+ }
138+ }
139+
140+ // worker
141+ go func () {
142+ for {
143+ select {
144+ case <- ctx .Done ():
145+ return
146+ case server , ok := <- serverCh :
147+ if ! ok {
148+ // unsubscribe was called
149+ return
150+ }
151+
152+ // execute dispatch handler in goroutine
153+ connCtx := context .WithValue (ctx , dispatcherAddrCtxKey , fmt .Sprintf ("%s:%s" , server .ip , d .connectArgs .Port ))
154+ go fn (connCtx , d .conn )
155+ }
156+ }
157+ }()
158+
159+ // get current ips and subscribe to new ones in a lock
160+ d .mu .Lock ()
161+ currentServers := d .servers .ToSlice ()
162+ err := d .eventbus .SubscribeAsync ("add:servers" , handleNewServers , false )
163+ if err != nil {
164+ d .mu .Unlock ()
165+ return nil , err
166+ }
167+ d .mu .Unlock ()
168+
169+ handleNewServers (currentServers )
170+
171+ return & Subscription {
172+ serverCh : serverCh ,
173+ cleanup : func () {
174+ d .eventbus .Unsubscribe ("add:servers" , handleNewServers )
175+ },
176+ }, nil
177+ }
178+
87179// Sends queries to all available ips at query-time
88180func (d * Dispatcher ) Fanout (ctx context.Context , fn DispatchHandler ) {
89181 var wg sync.WaitGroup
90182
91183 d .mu .Lock ()
92- ips := d .ips .ToSlice ()
184+ servers := d .servers .ToSlice ()
93185 d .mu .Unlock ()
94186
95- for _ , ip := range ips {
96- connCtx := context .WithValue (ctx , dispatcherAddrCtxKey , fmt .Sprintf ("%s:%s" , ip , d .connectArgs .Port ))
187+ for _ , server := range servers {
188+ connCtx := context .WithValue (ctx , dispatcherAddrCtxKey , fmt .Sprintf ("%s:%s" , server . ip , d .connectArgs .Port ))
97189 wg .Add (1 )
98190 go func (lclCtx context.Context ) {
99191 defer wg .Done ()
@@ -117,12 +209,12 @@ func (d *Dispatcher) Fanout(ctx context.Context, fn DispatchHandler) {
117209// Sends queries to all available ips at query-time and all subsequent ips when
118210// they become available until Unsubscribe() is called
119211func (d * Dispatcher ) FanoutSubscribe (ctx context.Context , fn DispatchHandler ) (* Subscription , error ) {
120- ipCh := make (chan string )
212+ serverCh := make (chan server )
121213
122- // ip handler
123- handleNewIps := func (newIps []string ) {
124- for _ , ip := range newIps {
125- ipCh <- ip
214+ // server handler
215+ handleNewServers := func (newServers []server ) {
216+ for _ , server := range newServers {
217+ serverCh <- server
126218 }
127219 }
128220
@@ -132,35 +224,35 @@ func (d *Dispatcher) FanoutSubscribe(ctx context.Context, fn DispatchHandler) (*
132224 select {
133225 case <- ctx .Done ():
134226 return
135- case ip , ok := <- ipCh :
227+ case server , ok := <- serverCh :
136228 if ! ok {
137229 // unsubscribe was called
138230 return
139231 }
140232
141233 // execute dispatch handler in goroutine
142- connCtx := context .WithValue (ctx , dispatcherAddrCtxKey , fmt .Sprintf ("%s:%s" , ip , d .connectArgs .Port ))
234+ connCtx := context .WithValue (ctx , dispatcherAddrCtxKey , fmt .Sprintf ("%s:%s" , server . ip , d .connectArgs .Port ))
143235 go fn (connCtx , d .conn )
144236 }
145237 }
146238 }()
147239
148240 // get current ips and subscribe to new ones in a lock
149241 d .mu .Lock ()
150- currentIps := d .ips .ToSlice ()
151- err := d .eventbus .SubscribeAsync ("add:addrs " , handleNewIps , false )
242+ currentServers := d .servers .ToSlice ()
243+ err := d .eventbus .SubscribeAsync ("add:servers " , handleNewServers , false )
152244 if err != nil {
153245 d .mu .Unlock ()
154246 return nil , err
155247 }
156248 d .mu .Unlock ()
157249
158- handleNewIps ( currentIps )
250+ handleNewServers ( currentServers )
159251
160252 return & Subscription {
161- ipCh : ipCh ,
253+ serverCh : serverCh ,
162254 cleanup : func () {
163- d .eventbus .Unsubscribe ("add:addrs " , handleNewIps )
255+ d .eventbus .Unsubscribe ("add:servers " , handleNewServers )
164256 },
165257 }, nil
166258}
@@ -210,14 +302,14 @@ func (d *Dispatcher) Shutdown() error {
210302
211303// Handle add
212304func (d * Dispatcher ) handleAddEndpointSlice (es * discoveryv1.EndpointSlice ) {
213- newIps := getIpsFromEndpointSlice (es )
214- d .updateState (newIps , nil )
305+ newServers := getServersFromEndpointSlice (es )
306+ d .updateState (newServers , nil )
215307}
216308
217309// Handle updates
218310func (d * Dispatcher ) handleUpdateEndpointSlice (esOld * discoveryv1.EndpointSlice , esNew * discoveryv1.EndpointSlice ) {
219- oldIps := mapset .NewSet (getIpsFromEndpointSlice (esOld )... )
220- newIps := mapset .NewSet (getIpsFromEndpointSlice (esNew )... )
311+ oldIps := mapset .NewSet (getServersFromEndpointSlice (esOld )... )
312+ newIps := mapset .NewSet (getServersFromEndpointSlice (esNew )... )
221313
222314 toDelete := oldIps .Difference (newIps )
223315 toAdd := newIps .Difference (oldIps )
@@ -226,16 +318,16 @@ func (d *Dispatcher) handleUpdateEndpointSlice(esOld *discoveryv1.EndpointSlice,
226318}
227319
228320// Adds and deletes ips, updates clientconn state, publishes change to eventbus
229- func (d * Dispatcher ) updateState (toAdd []string , toDelete []string ) {
321+ func (d * Dispatcher ) updateState (toAdd []server , toDelete []server ) {
230322 d .mu .Lock ()
231323
232324 // update local state
233325 if len (toDelete ) > 0 {
234- d .ips .RemoveAll (toDelete ... )
326+ d .servers .RemoveAll (toDelete ... )
235327 }
236328
237329 if len (toAdd ) > 0 {
238- d .ips .Append (toAdd ... )
330+ d .servers .Append (toAdd ... )
239331 }
240332
241333 // exit if no changes
@@ -245,11 +337,11 @@ func (d *Dispatcher) updateState(toAdd []string, toDelete []string) {
245337 }
246338
247339 // update clientconn state
248- ips := d .ips .ToSlice ()
340+ servers := d .servers .ToSlice ()
249341
250- addrs := make ([]resolver.Address , len (ips ))
251- for i , ip := range ips {
252- addrs [i ] = resolver.Address {Addr : fmt .Sprintf ("%s:%s" , ip , d .connectArgs .Port )}
342+ addrs := make ([]resolver.Address , len (servers ))
343+ for i , server := range servers {
344+ addrs [i ] = resolver.Address {Addr : fmt .Sprintf ("%s:%s" , server . ip , d .connectArgs .Port )}
253345 }
254346
255347 d .resolver .UpdateState (resolver.State {Addresses : addrs })
@@ -258,7 +350,7 @@ func (d *Dispatcher) updateState(toAdd []string, toDelete []string) {
258350
259351 // publish change
260352 if len (toAdd ) > 0 {
261- d .eventbus .Publish ("add:addrs " , toAdd )
353+ d .eventbus .Publish ("add:servers " , toAdd )
262354 }
263355}
264356
@@ -331,7 +423,7 @@ func NewDispatcher(connectUrl string, options ...DispatcherOption) (*Dispatcher,
331423 informer : informer ,
332424 resolver : resolver ,
333425 conn : conn ,
334- ips : mapset .NewSet [string ](),
426+ servers : mapset .NewSet [server ](),
335427 eventbus : eventbus .New (),
336428 }, nil
337429}
@@ -379,12 +471,18 @@ func parseConnectUrl(connectUrl string) (*connectArgs, error) {
379471 }, nil
380472}
381473
382- func getIpsFromEndpointSlice (es * discoveryv1.EndpointSlice ) []string {
383- var ips []string
474+ func getServersFromEndpointSlice (es * discoveryv1.EndpointSlice ) []server {
475+ var servers []server
384476 for _ , endpoint := range es .Endpoints {
385477 if * endpoint .Conditions .Serving {
386- ips = append (ips , endpoint .Addresses ... )
478+ for _ , addr := range endpoint .Addresses {
479+ s := server {
480+ nodeName : * endpoint .NodeName ,
481+ ip : addr ,
482+ }
483+ servers = append (servers , s )
484+ }
387485 }
388486 }
389- return ips
487+ return servers
390488}
0 commit comments