Skip to content
This repository has been archived by the owner on Oct 29, 2021. It is now read-only.

Rough draft for changes for path+healing+forwarder changes. #2050

Open
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

edwarnicke
Copy link
Member

Note: This whole PR is too large, and will need to be pulled
into smaller pieces.

The purpose of this PR is to get directional feedback.

Please also note this is not quite finished yet. The goal
is to get feedback, and start breaking pieces of this off into
things which can be merged in easy stages and other folks
can start working on.

Description

Motivation and Context

How Has This Been Tested?

  • Covered by existing integration testing
  • Added integration testing to cover
  • Tested locally
  • Have not tested

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Checklist:

  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.


func NewExecutor() Executor {
rv := &executor{execCh: make(chan func(), 100)}
rv.eventLoop()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean go rv.eventLoop()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fixed :)

}
}
for id, request := range f.requestors {
if _, ok := f.reported[id]; !ok && f.reported[id] != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ok equals false then f.reported[id] equals nil. That whole line is not really clear to me, maybe there is some typo?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also correct, fixed :)

Copy link
Member

@denis-tingaikin denis-tingaikin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a few comments.
In general, I very like these simplifications.
I'll look more and I'll provide a more detailed review.

func NewExecutor() Executor {
rv := &executor{execCh: make(chan func(), 100)}
go rv.eventLoop()
runtime.SetFinalizer(rv, func(f *executor) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is an interesting solution for closing worker goroutine but looks like finalizedCh channel is not initialized and it could panic close of nil channel

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed :)


func TestBasic(t *testing.T) {
nsmgr_forwarder := chain.NewNetworkServiceServer(
authorize.NewServer(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks nice

}

// Log - return FieldLogger from context
func Log(ctx context.Context) logrus.FieldLogger {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw a similar function for forwarder chained request handlers and for nsm request chained handlers.
Should we create some base sdk/context helper and put there all popular context functions to exclude code duplication?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm tentatively leaning towards a little bit of code repetition fo what seems to me to be a lot of increased comprehensibility.


type timeoutServer struct {
connections map[string]*time.Timer
executor serialize.Executor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting is this pattern better than using map with a locker or using sync.map

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is for some uses. It preserves type safety (type checking with sync.Map is a PITA) and its guarantees are quite specific: order and one at a time. So if you have something that needs to manipulate various parts of the struct (maybe more than one) and you don't care about it happening before you go on (but it needs to happen, and in order), serializer.Executor is great.

Its not perfect for everything, but its good for a lot of things :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its also super simple to use :)

event = nil
}
f.updateExecutor.Exec(func() {
if event.GetType() == connection.ConnectionEventType_INITIAL_STATE_TRANSFER {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] probably switch event.GetType() will look better here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super good idea :)

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also switch-case variant can be more simple, for example:

Suggested change
if event.GetType() == connection.ConnectionEventType_INITIAL_STATE_TRANSFER {
switch event.GetType(){
case connection.ConnectionEventType_INITIAL_STATE_TRANSFER:
f.reported = event.GetConnections()
case connection.ConnectionEventType_UPDATE:
for _, conn := range event.GetConnections() {
f.reported[conn.GetId()] = conn
}
case connection.ConnectionEventType_DELETE:
for _, conn := range event.GetConnections() {
delete(f.reported, conn.GetId())
if f.requestors[conn.GetId()] != nil {
f.requestors[conn.GetId()]()
}
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed! :)

@@ -137,6 +138,13 @@ func DialTCPInsecure(address string, opts ...grpc.DialOption) (*grpc.ClientConn,
return dialCtx(context.Background(), address, opts...)
}

func DialUrl(u *url.URL, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
if u.Scheme == "file" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use 'unix' here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes... fixed.

}
err = rv.IsValid()
if err != nil {
return nil, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Should we use errors.WithStack here?

}
})
select {
case <-s.Context().Done():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we can get rid of using select here because we have only one channel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent point! Fixed :)

See:
https://docs.google.com/presentation/d/1QU5FEq7QloLqEjJs-MMMWvcgPzkz6j-OYk-9k2gDTjc/edit#slide=id.g73e6edae28_0_0
https://drive.google.com/file/d/12AhnjrZnqwT5w8rP7Gv91xjKMgbQjPan/view?usp=sharing

This is the minimal patch that can introduce the new API.

As Path is replacing:

repeated string network_service_managers = 6;

in the Connection Message, this PR simply replaces how that was being used with Path.
It does not introduce the new style healing.

Signed-off-by: Ed Warnicke <[email protected]>
api has *really* clean go.mod deps.

Signed-off-by: Ed Warnicke <[email protected]>
Note: This whole PR is to large, and will need to be pulled
into smaller pieces.

The purpose of this PR is to get directional feedback.

Please also note this is not *quite* finished yet.  The goal
is to get feedback, and start breaking pieces of this off into
things which can be merged in easy stages and other folks
can start working on.

Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
Signed-off-by: Ed Warnicke <[email protected]>
cancelFunc context.CancelFunc
}

func NewClient(client connection.MonitorConnectionClient, onHeal ...networkservice.NetworkServiceClient) networkservice.NetworkServiceClient {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why onHeal is an array? Each item in that array is networkservice.NetworkServiceClient which is potentially implies a chain. So onHeal in other words is an array of chains. But seems like we need only one chain to rerun the request, don't we?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I've fixed this to only take one onHeal.

eventReceiver: nil, // This is intentionally nil
recvEventExecutor: nil, // This is intentionally nil
}
rv.onHeal = append(rv.onHeal, rv)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it is not clear why here we add ourselves to onHeal array? I think healClient already a piece of passed chain?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct in the case where the client is part of a pass through Endpoint. To handle the case where the client is the origin of the chain, if a nil onHeal is passed in we add the Client itself as onHeal.

return nil, errors.Wrapf(err, "Error creating timer from Request.Connection.Path.PathSegment[%d].ExpireTime", request.GetConnection().GetPath().GetIndex())
}
t.executor.AsyncExec(func() {
if timer, ok := t.connectionTimers[request.GetConnection().GetId()]; !ok || timer.Stop() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you were about to use req here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
t.executor.AsyncExec(func() {
if timer, ok := t.connectionTimers[request.GetConnection().GetId()]; !ok || timer.Stop() {
t.connectionTimers[request.GetConnection().GetId()] = ct
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

_, _ = healer.Request(ctx, req, opts...)
}
}
f.closers[request.GetConnection().GetId()] = func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

// TODO handle deadline err
deadline, _ := ctx.Deadline()
duration := deadline.Sub(time.Now())
f.requestors[request.GetConnection().GetId()] = func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

// TODO handle deadline err
deadline, _ := ctx.Deadline()
duration := deadline.Sub(time.Now())
f.requestors[request.GetConnection().GetId()] = func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should modification of f.requestors map happens inside AsyncExec?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

_, _ = healer.Request(ctx, req, opts...)
}
}
f.closers[request.GetConnection().GetId()] = func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the same for f.closers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@matrohon matrohon mentioned this pull request Jan 15, 2020
9 tasks
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants