Skip to content

Commit

Permalink
Fixed notification DB read after DB was closed (#519)
Browse files Browse the repository at this point in the history
Fixes #517 #518

Ensure the notification stream go-routine is completely done, before
closing the DB
  • Loading branch information
merlimat authored Sep 20, 2024
1 parent 15f9379 commit b6f044f
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 110 deletions.
124 changes: 14 additions & 110 deletions server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,20 @@ type leaderController struct {
headOffsetGauge metrics.Gauge
commitOffsetGauge metrics.Gauge
followerAckOffsetGauges map[string]metrics.Gauge

notificationDispatchers map[int64]*notificationDispatcher
}

func NewLeaderController(config Config, namespace string, shardId int64, rpcClient ReplicationRpcProvider, walFactory wal.Factory, kvFactory kv.Factory) (LeaderController, error) {
labels := metrics.LabelsForShard(namespace, shardId)
lc := &leaderController{
status: proto.ServingStatus_NOT_MEMBER,
namespace: namespace,
shardId: shardId,
quorumAckTracker: nil,
rpcClient: rpcClient,
followers: make(map[string]FollowerCursor),
status: proto.ServingStatus_NOT_MEMBER,
namespace: namespace,
shardId: shardId,
quorumAckTracker: nil,
rpcClient: rpcClient,
followers: make(map[string]FollowerCursor),
notificationDispatchers: make(map[int64]*notificationDispatcher),

writeLatencyHisto: metrics.NewLatencyHistogram("oxia_server_leader_write_latency",
"Latency for write operations in the leader", labels),
Expand Down Expand Up @@ -963,110 +966,7 @@ func (lc *leaderController) appendToWalStreamRequest(request *proto.WriteRequest
// ////

// GetNotifications serves a client notification stream for this shard.
// It spawns a labelled goroutine that dispatches notification batches and
// blocks until the leader controller, the dispatch context, or the client
// stream terminates, returning that context's error.
//
// NOTE(review): nothing here waits for the dispatch goroutine to finish after
// cancellation, so the DB can be closed while the goroutine is still reading
// from it — the defect (#517/#518) this commit replaces with
// notificationDispatcher.
func (lc *leaderController) GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error {
	// Create a context for handling this stream
	ctx, cancel := context.WithCancel(stream.Context())

	go common.DoWithLabels(
		ctx,
		map[string]string{
			"oxia":  "dispatch-notifications",
			"shard": fmt.Sprintf("%d", lc.shardId),
			"peer":  common.GetPeer(stream.Context()),
		},
		func() {
			// context.Canceled is the normal shutdown path; only log other errors.
			if err := lc.dispatchNotifications(ctx, req, stream); err != nil && !errors.Is(err, context.Canceled) {
				lc.log.Warn(
					"Failed to dispatch notifications",
					slog.Any("error", err),
					slog.String("peer", common.GetPeer(stream.Context())),
				)
				cancel()
			}
		},
	)

	select {
	case <-lc.ctx.Done():
		// Leader is getting closed
		cancel()
		return lc.ctx.Err()

	case <-ctx.Done():
		return ctx.Err()

	case <-stream.Context().Done():
		// The stream is getting closed
		cancel()
		return stream.Context().Err()
	}
}

// dispatchNotifications determines the first notification offset to serve for
// this stream and then iterates over stored notifications, pushing each batch
// to the client. When the request carries no StartOffsetExclusive, a first
// empty ("dummy") batch at the current commit offset is sent so the client
// knows the position its stream starts from.
func (lc *leaderController) dispatchNotifications(ctx context.Context, req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error {
	lc.log.Debug(
		"Dispatch notifications",
		slog.Any("start-offset-exclusive", req.StartOffsetExclusive),
	)

	var offsetInclusive int64
	if req.StartOffsetExclusive != nil {
		// Client is resuming: continue from the entry after the last one it saw.
		offsetInclusive = *req.StartOffsetExclusive + 1
	} else {
		// Read the tracker under the lock; it may not be initialized yet.
		lc.Lock()
		qat := lc.quorumAckTracker
		lc.Unlock()

		if qat == nil {
			return errors.New("leader is not yet ready")
		}
		commitOffset := qat.CommitOffset()

		// The client is creating a new notification stream and wants to receive the notification from the next
		// entry that will be written.
		// In order to ensure the client will be positioned on a given offset, we need to send a first "dummy"
		// notification. The client will wait for this first notification before making the notification
		// channel available to the application.
		lc.log.Debug(
			"Sending first dummy notification",
			slog.Int64("commit-offset", commitOffset),
		)
		if err := stream.Send(&proto.NotificationBatch{
			Shard:         lc.shardId,
			Offset:        commitOffset,
			Timestamp:     0,
			Notifications: nil,
		}); err != nil {
			return err
		}

		offsetInclusive = commitOffset + 1
	}

	return lc.iterateOverNotifications(ctx, stream, offsetInclusive)
}

// iterateOverNotifications reads successive notification batches from the DB,
// starting at startOffsetInclusive, and forwards each one to the client
// stream until the context is done or a read/send fails.
func (lc *leaderController) iterateOverNotifications(ctx context.Context, stream proto.OxiaClient_GetNotificationsServer, startOffsetInclusive int64) error {
	offsetInclusive := startOffsetInclusive
	for ctx.Err() == nil {
		// NOTE(review): reads lc.db directly; if the DB is closed while this
		// loop is still running, this read fails — the bug fixed by this commit.
		notifications, err := lc.db.ReadNextNotifications(ctx, offsetInclusive)
		if err != nil {
			return err
		}

		lc.log.Debug(
			"Got a new list of notification batches",
			slog.Int("list-size", len(notifications)),
		)

		for _, n := range notifications {
			if err := stream.Send(n); err != nil {
				return err
			}
		}

		// Each returned batch accounts for exactly one offset.
		offsetInclusive += int64(len(notifications))
	}

	return ctx.Err()
	// NOTE(review): the line below is diff-merge residue from the commit view —
	// it is the new one-line body of GetNotifications, not part of this
	// function (`req` is not in scope here). TODO confirm against the actual
	// post-commit file.
	return startNotificationDispatcher(lc, req, stream)
}

func (lc *leaderController) isClosed() bool {
Expand Down Expand Up @@ -1098,6 +998,10 @@ func (lc *leaderController) close() error {

err = lc.sessionManager.Close()

for _, nd := range lc.notificationDispatchers {
nd.close()
}

if lc.wal != nil {
err = multierr.Append(err, lc.wal.Close())
lc.wal = nil
Expand Down
175 changes: 175 additions & 0 deletions server/notifications_dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"fmt"
"log/slog"
"sync/atomic"

"github.com/pkg/errors"

"github.com/streamnative/oxia/common"
"github.com/streamnative/oxia/proto"
)

// notificationDispatcher streams notification batches for a single
// GetNotifications RPC. One instance is created per client stream and
// registered in the owning leaderController's notificationDispatchers map,
// so the controller can wait for it to fully stop before closing the DB.
type notificationDispatcher struct {
	lc     *leaderController                       // owning leader controller (provides db, shardId, lock)
	id     int64                                   // unique id from notificationDispatcherIdGen; key in lc.notificationDispatchers
	req    *proto.NotificationsRequest             // client request (carries the optional StartOffsetExclusive)
	stream proto.OxiaClient_GetNotificationsServer // server stream the notification batches are sent on

	ctx    context.Context // derived from stream.Context(); its cancellation stops dispatching
	cancel context.CancelFunc

	closeCh chan any // closed by the dispatch goroutine once it has fully terminated
	log     *slog.Logger
}

// notificationDispatcherIdGen generates process-wide unique ids for
// notificationDispatcher instances, used as keys in
// leaderController.notificationDispatchers.
var notificationDispatcherIdGen atomic.Int64

// startNotificationDispatcher creates a notificationDispatcher for the given
// request, registers it with lc (under lc's lock), and starts a labelled
// goroutine that streams notifications to the client. It blocks until the
// leader controller, the dispatcher context, or the client stream is closed,
// returning the corresponding context error.
//
// The goroutine closes nd.closeCh only after dispatching has fully stopped;
// nd.close() (called from leaderController.close) waits on that channel, which
// ensures the DB is never closed while a dispatcher may still be reading it.
func startNotificationDispatcher(lc *leaderController, req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error {
	nd := &notificationDispatcher{
		lc:      lc,
		id:      notificationDispatcherIdGen.Add(1),
		req:     req,
		stream:  stream,
		log:     lc.log.With(slog.String("component", "notification-dispatcher")),
		closeCh: make(chan any),
	}

	// Register the dispatcher so leaderController.close can wait for it.
	lc.Lock()
	lc.notificationDispatchers[nd.id] = nd
	lc.Unlock()

	// Create a context for handling this stream
	nd.ctx, nd.cancel = context.WithCancel(stream.Context())

	go common.DoWithLabels(
		nd.ctx,
		map[string]string{
			"oxia":  "dispatch-notifications",
			"shard": fmt.Sprintf("%d", lc.shardId),
			"peer":  common.GetPeer(stream.Context()),
		},
		func() {
			// context.Canceled is the normal shutdown path; only log other errors.
			if err := nd.dispatchNotifications(); err != nil && !errors.Is(err, context.Canceled) {
				nd.log.Warn(
					"Failed to dispatch notifications",
					slog.Any("error", err),
					slog.String("peer", common.GetPeer(stream.Context())),
				)
				nd.cancel()
			}

			// Signal full termination before deregistering, so a concurrent
			// nd.close() blocked on closeCh is released.
			close(nd.closeCh)

			// Clean up dispatcher for leader controller map
			nd.lc.Lock()
			delete(nd.lc.notificationDispatchers, nd.id)
			nd.lc.Unlock()
		},
	)

	select {
	case <-lc.ctx.Done():
		// Leader is getting closed
		nd.cancel()
		return lc.ctx.Err()

	case <-nd.ctx.Done():
		return nd.ctx.Err()

	case <-stream.Context().Done():
		// The stream is getting closed
		nd.cancel()
		return stream.Context().Err()
	}
}

// dispatchNotifications works out the first notification offset to serve and
// then streams notification batches to the client until the context ends.
//
// When the request carries StartOffsetExclusive, dispatching resumes right
// after that offset. Otherwise, a first "dummy" empty batch at the current
// commit offset is sent, so the client knows the position its notification
// stream starts from before it exposes the channel to the application.
func (nd *notificationDispatcher) dispatchNotifications() error {
	nd.log.Debug(
		"Dispatch notifications",
		slog.Any("start-offset-exclusive", nd.req.StartOffsetExclusive),
	)

	if startExclusive := nd.req.StartOffsetExclusive; startExclusive != nil {
		// Client is resuming: continue from the entry after the last one it saw.
		return nd.iterateOverNotifications(*startExclusive + 1)
	}

	// New stream: anchor it at the current commit offset. The tracker is read
	// under the controller lock since it may not be initialized yet.
	nd.lc.Lock()
	tracker := nd.lc.quorumAckTracker
	nd.lc.Unlock()

	if tracker == nil {
		return errors.New("leader is not yet ready")
	}

	commitOffset := tracker.CommitOffset()
	nd.log.Debug(
		"Sending first dummy notification",
		slog.Int64("commit-offset", commitOffset),
	)

	// The empty batch positions the client on commitOffset; the client waits
	// for this first notification before making the notification channel
	// available to the application.
	firstBatch := &proto.NotificationBatch{
		Shard:         nd.lc.shardId,
		Offset:        commitOffset,
		Timestamp:     0,
		Notifications: nil,
	}
	if err := nd.stream.Send(firstBatch); err != nil {
		return err
	}

	return nd.iterateOverNotifications(commitOffset + 1)
}

// iterateOverNotifications repeatedly reads notification batches from the DB,
// starting at startOffsetInclusive, and forwards each one to the client
// stream. It returns the context error once the dispatcher context is done,
// or the first read/send error encountered.
func (nd *notificationDispatcher) iterateOverNotifications(startOffsetInclusive int64) error {
	nextOffset := startOffsetInclusive
	for {
		// Stop as soon as the stream or the leader is shutting down.
		if err := nd.ctx.Err(); err != nil {
			return err
		}

		batches, err := nd.lc.db.ReadNextNotifications(nd.ctx, nextOffset)
		if err != nil {
			return err
		}

		nd.log.Debug(
			"Got a new list of notification batches",
			slog.Int("list-size", len(batches)),
		)

		for _, batch := range batches {
			if sendErr := nd.stream.Send(batch); sendErr != nil {
				return sendErr
			}
		}

		// Each returned batch accounts for exactly one offset.
		nextOffset += int64(len(batches))
	}
}

// close blocks until the dispatch goroutine has fully terminated (signalled by
// closeCh being closed). It is called from leaderController.close before the
// DB is closed, guaranteeing no notification read is still in flight.
func (nd *notificationDispatcher) close() {
	// Wait for dispatcher stream to be fully closed
	<-nd.closeCh
}

0 comments on commit b6f044f

Please sign in to comment.