-
Notifications
You must be signed in to change notification settings - Fork 147
Rough draft for changes for path+healing+forwarder changes. #2050
base: master
Are you sure you want to change the base?
Conversation
If you are going to check this out and walk through it, I recommend starting with: and |
new/sdk/tools/serialize/serialize.go
Outdated
|
||
func NewExecutor() Executor { | ||
rv := &executor{execCh: make(chan func(), 100)} | ||
rv.eventLoop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean go rv.eventLoop()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, fixed :)
} | ||
} | ||
for id, request := range f.requestors { | ||
if _, ok := f.reported[id]; !ok && f.reported[id] != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If ok
equals false
then f.reported[id]
equals nil
. That whole line is not really clear to me, maybe there is some typo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also correct, fixed :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a few comments.
In general, I very like these simplifications.
I'll look more and I'll provide a more detailed review.
func NewExecutor() Executor { | ||
rv := &executor{execCh: make(chan func(), 100)} | ||
go rv.eventLoop() | ||
runtime.SetFinalizer(rv, func(f *executor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is an interesting solution for closing worker goroutine but looks like finalizedCh
channel is not initialized and it could panic close of nil channel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed :)
|
||
func TestBasic(t *testing.T) { | ||
nsmgr_forwarder := chain.NewNetworkServiceServer( | ||
authorize.NewServer(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks nice
} | ||
|
||
// Log - return FieldLogger from context | ||
func Log(ctx context.Context) logrus.FieldLogger { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw a similar function for forwarder chained request handlers and for nsm request chained handlers.
Should we create some base sdk/context helper and put there all popular context functions to exclude code duplication?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm tentatively leaning towards a little bit of code repetition fo what seems to me to be a lot of increased comprehensibility.
|
||
type timeoutServer struct { | ||
connections map[string]*time.Timer | ||
executor serialize.Executor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting is this pattern better than using map with a locker or using sync.map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is for some uses. It preserves type safety (type checking with sync.Map is a PITA) and its guarantees are quite specific: order and one at a time. So if you have something that needs to manipulate various parts of the struct (maybe more than one) and you don't care about it happening before you go on (but it needs to happen, and in order), serializer.Executor is great.
Its not perfect for everything, but its good for a lot of things :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its also super simple to use :)
event = nil | ||
} | ||
f.updateExecutor.Exec(func() { | ||
if event.GetType() == connection.ConnectionEventType_INITIAL_STATE_TRANSFER { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] probably switch event.GetType()
will look better here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super good idea :)
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also switch-case variant can be more simple, for example:
if event.GetType() == connection.ConnectionEventType_INITIAL_STATE_TRANSFER { | |
switch event.GetType(){ | |
case connection.ConnectionEventType_INITIAL_STATE_TRANSFER: | |
f.reported = event.GetConnections() | |
case connection.ConnectionEventType_UPDATE: | |
for _, conn := range event.GetConnections() { | |
f.reported[conn.GetId()] = conn | |
} | |
case connection.ConnectionEventType_DELETE: | |
for _, conn := range event.GetConnections() { | |
delete(f.reported, conn.GetId()) | |
if f.requestors[conn.GetId()] != nil { | |
f.requestors[conn.GetId()]() | |
} | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed! :)
pkg/tools/socket.go
Outdated
@@ -137,6 +138,13 @@ func DialTCPInsecure(address string, opts ...grpc.DialOption) (*grpc.ClientConn, | |||
return dialCtx(context.Background(), address, opts...) | |||
} | |||
|
|||
func DialUrl(u *url.URL, opts ...grpc.DialOption) (*grpc.ClientConn, error) { | |||
if u.Scheme == "file" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use 'unix' here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes... fixed.
} | ||
err = rv.IsValid() | ||
if err != nil { | ||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] Should we use errors.WithStack
here?
} | ||
}) | ||
select { | ||
case <-s.Context().Done(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we can get rid of using select here because we have only one channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent point! Fixed :)
See: https://docs.google.com/presentation/d/1QU5FEq7QloLqEjJs-MMMWvcgPzkz6j-OYk-9k2gDTjc/edit#slide=id.g73e6edae28_0_0 https://drive.google.com/file/d/12AhnjrZnqwT5w8rP7Gv91xjKMgbQjPan/view?usp=sharing This is the minimal patch that can introduce the new API. As Path is replacing: repeated string network_service_managers = 6; in the Connection Message, this PR simply replaces how that was being used with Path. It does not introduce the new style healing. Signed-off-by: Ed Warnicke <[email protected]>
api has *really* clean go.mod deps. Signed-off-by: Ed Warnicke <[email protected]>
Note: This whole PR is to large, and will need to be pulled into smaller pieces. The purpose of this PR is to get directional feedback. Please also note this is not *quite* finished yet. The goal is to get feedback, and start breaking pieces of this off into things which can be merged in easy stages and other folks can start working on. Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
cancelFunc context.CancelFunc | ||
} | ||
|
||
func NewClient(client connection.MonitorConnectionClient, onHeal ...networkservice.NetworkServiceClient) networkservice.NetworkServiceClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why onHeal
is an array? Each item in that array is networkservice.NetworkServiceClient
which is potentially implies a chain. So onHeal
in other words is an array of chains. But seems like we need only one chain to rerun the request, don't we?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I've fixed this to only take one onHeal.
eventReceiver: nil, // This is intentionally nil | ||
recvEventExecutor: nil, // This is intentionally nil | ||
} | ||
rv.onHeal = append(rv.onHeal, rv) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And it is not clear why here we add ourselves to onHeal
array? I think healClient
already a piece of passed chain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct in the case where the client is part of a pass through Endpoint. To handle the case where the client is the origin of the chain, if a nil onHeal is passed in we add the Client itself as onHeal.
return nil, errors.Wrapf(err, "Error creating timer from Request.Connection.Path.PathSegment[%d].ExpireTime", request.GetConnection().GetPath().GetIndex()) | ||
} | ||
t.executor.AsyncExec(func() { | ||
if timer, ok := t.connectionTimers[request.GetConnection().GetId()]; !ok || timer.Stop() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you were about to use req
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
} | ||
t.executor.AsyncExec(func() { | ||
if timer, ok := t.connectionTimers[request.GetConnection().GetId()]; !ok || timer.Stop() { | ||
t.connectionTimers[request.GetConnection().GetId()] = ct |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
_, _ = healer.Request(ctx, req, opts...) | ||
} | ||
} | ||
f.closers[request.GetConnection().GetId()] = func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
// TODO handle deadline err | ||
deadline, _ := ctx.Deadline() | ||
duration := deadline.Sub(time.Now()) | ||
f.requestors[request.GetConnection().GetId()] = func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
// TODO handle deadline err | ||
deadline, _ := ctx.Deadline() | ||
duration := deadline.Sub(time.Now()) | ||
f.requestors[request.GetConnection().GetId()] = func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should modification of f.requestors
map happens inside AsyncExec
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
_, _ = healer.Request(ctx, req, opts...) | ||
} | ||
} | ||
f.closers[request.GetConnection().GetId()] = func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the same for f.closers
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Note: This whole PR is too large, and will need to be pulled
into smaller pieces.
The purpose of this PR is to get directional feedback.
Please also note this is not quite finished yet. The goal
is to get feedback, and start breaking pieces of this off into
things which can be merged in easy stages and other folks
can start working on.
Description
Motivation and Context
How Has This Been Tested?
Types of changes
Checklist: