-
Notifications
You must be signed in to change notification settings - Fork 662
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MG-2142 - Consume Things connect/disconnect event in bootstrap #2192
base: main
Are you sure you want to change the base?
Conversation
90b9f69
to
3198bd8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have question here, The event published by connectThingEvent
and disconnectThingEvent
are same event published by thing service on disconnect
and connect
?
They are not the same. The event published by things service is a notification that a thing has been connected or disconnected from a channel. The event published by |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JeffMboya I have suggestion for handling error rollbacks
bootstrap/events/consumer/streams.go
Outdated
thingConnect = "policy.create" | ||
thingDisconnect = "policy.delete" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be group.assign
and group.unassign
But I guess we could not use this operation, because adding channel to group also use the same operation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While trying to assign parent group for channel , like below request
curl --location 'http://localhost/channels/795b0be8-f8d9-4101-a40f-859ed573c71c/groups/assign' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer <Token>' \
--data '{
"group_ids" : [ "5c6de392-6dc4-46fc-8e2c-e6f4ce4fffab" ]
}'
In events received below logs, which use group.assign
operation , the same operation is used for thing connect, so could not different between thing connection event and group assign channel events
[#8] Received on "events.magistrala.things" with reply "_INBOX.7g7jymtBXH0wOSok79kUGo.0LlN1Z1P"
*�{"group_id":"795b0be8-f8d9-4101-a40f-859ed573c71c","member_ids":["5c6de392-6dc4-46fc-8e2c-e6f4ce4fffab"],"occurred_at":1714131210669902557,"operation":"group.assign"}
Additionally,
In the event message group_id filed contains channel id
actually here we should have group id
In member_ids field , it contains
group_id , actually here we should have channel id
This issue might be a blocker for this PR
0b87686
to
9380eef
Compare
15ddd1f
to
068d129
Compare
b78c9aa
to
f6aa2fe
Compare
repo := postgres.NewConfigRepository(db, testLog) | ||
err := deleteChannels(context.Background(), repo) | ||
require.Nil(t, err, "Channels cleanup expected to succeed.") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test this occurrences using table testing:
- Connect for an already connected thing
- Connect for a disconnected thing
- Connect for an invalid thing
- Connect for a random thing
- Connect for an empty thing
386c14c
to
73f7017
Compare
8369752
to
2e5c476
Compare
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]> Add ConnectThing method to bootstrap Signed-off-by: JeffMboya <[email protected]> Consume ThingConnect in bootstrap Signed-off-by: JeffMboya <[email protected]> Consume ThingConnect in bootstrap Signed-off-by: JeffMboya <[email protected]> Consume ThingConnect in bootstrap Signed-off-by: JeffMboya <[email protected]> Consume ThingConnect event Signed-off-by: JeffMboya <[email protected]> Consume ThingsConnect event Signed-off-by: JeffMboya <[email protected]> Consume ThingsConnect event Signed-off-by: JeffMboya <[email protected]> Consume ThingsConnect event Signed-off-by: JeffMboya <[email protected]> Implement ReadStringSlice Signed-off-by: JeffMboya <[email protected]> Add memberKind and relation checks Signed-off-by: JeffMboya <[email protected]> Add memberKind and relation checks Signed-off-by: JeffMboya <[email protected]> Add memberKind and relation checks Signed-off-by: JeffMboya <[email protected]> Add memberKind and relation checks Signed-off-by: JeffMboya <[email protected]> Add memberKind and relation checks Signed-off-by: JeffMboya <[email protected]> Add TestDisconnectThing to configs_test Signed-off-by: JeffMboya <[email protected]> Update mocks Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]> Refactor: revert to thingID and channelID Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]> Refactor: add memberKind ans relation constants Signed-off-by: JeffMboya <[email protected]> Refactor: add memberKind ans relation constants Signed-off-by: JeffMboya <[email protected]> Refactor: add memberKind ans relation constants Signed-off-by: JeffMboya <[email protected]> Refactor: add memberKind ans relation constants Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
Signed-off-by: JeffMboya <[email protected]>
cte := decodeConnectThing(msg) | ||
if cte.channelID == "" { | ||
return svcerr.ErrMalformedEntity | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, check length of cte.thingID
is not zero
|
||
return connectionEvent{ | ||
channelID: read(event, "group_id", ""), | ||
thingID: ReadStringSlice(event, "member_ids"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to thingIDs
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation { | ||
e = repoerr.ErrConflict | ||
return "", repoerr.ErrConflict |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use switch statement
func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error { | ||
q := `UPDATE configs SET state = $1 WHERE EXISTS ( | ||
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3) | ||
RETURNING *` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need of returning anything since we don't use it
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` | ||
if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil { | ||
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3) | ||
RETURNING *` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
@@ -451,12 +454,33 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { | |||
return nil | |||
} | |||
|
|||
func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error { | |||
q := `UPDATE configs SET state = $1 WHERE EXISTS ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for subquery have a look at https://github.com/absmach/magistrala/blob/31d0e8c4529339a6626c9ff03a65972bf407dd89/pkg/clients/postgres/clients.go#L27C1-L54C2
What type of PR is this?
This is a feature because it adds the connectEvent struct to the consumer package.
What does this do?
This PR adds a new struct, connectEvent, to the consumer package. This struct contains the thingID and channelID, and is triggered whenever a Thing connects to the system.
Which issue(s) does this PR fix/relate to?
Have you included tests for your changes?
Yes, I have included tests for my changes.
Did you document any new/modified feature?
No