Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: cleanup some parts in the ControllerServer #5101

Open
wants to merge 7 commits into
base: devel
Choose a base branch
from
155 changes: 55 additions & 100 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"

csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -208,7 +209,7 @@ func (cs *ControllerServer) parseVolCreateRequest(
return nil, status.Error(codes.InvalidArgument, err.Error())
}

rbdVol.RequestName = req.GetName()
rbdVol.requestName = req.GetName()

// Volume Size - Default is 1 GiB
volSizeBytes := int64(oneGB)
Expand Down Expand Up @@ -276,7 +277,7 @@ func (rbdVol *rbdVolume) ToCSI(ctx context.Context) (*csi.Volume, error) {
func buildCreateVolumeResponse(
ctx context.Context,
req *csi.CreateVolumeRequest,
rbdVol *rbdVolume,
rbdVol types.Volume,
) (*csi.CreateVolumeResponse, error) {
volume, err := rbdVol.ToCSI(ctx)
if err != nil {
Expand Down Expand Up @@ -772,7 +773,11 @@ func (cs *ControllerServer) createBackingImage(
}
}

log.DebugLog(ctx, "created image %s backed for request name %s", rbdVol, rbdVol.RequestName)
requestName, err := rbdVol.GetRequestName(ctx)
if err != nil {
requestName = "<unset request name>"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nixpanic why is this special case not an error case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is only used for logging, and was not an error before this change.

}
log.DebugLog(ctx, "created image %s backed for request name %s", rbdVol, requestName)

defer func() {
if err != nil {
Expand Down Expand Up @@ -884,12 +889,12 @@ func (cs *ControllerServer) checkErrAndUndoReserve(
// If error is ErrImageNotFound then we failed to find the image, but found the imageOMap
// to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
// unreserve for the same
if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
if acquired := cs.VolumeLocks.TryAcquire(rbdVol.requestName); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, rbdVol.requestName)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.requestName)
}
defer cs.VolumeLocks.Release(rbdVol.RequestName)
defer cs.VolumeLocks.Release(rbdVol.requestName)

if err = undoVolReservation(ctx, rbdVol, cr); err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand Down Expand Up @@ -953,31 +958,40 @@ func (cs *ControllerServer) DeleteVolume(
return &csi.DeleteVolumeResponse{}, nil
}

rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer func() {
if rbdVol != nil {
rbdVol.Destroy(ctx)
}
}()
mgr := NewManager(cs.Driver.GetInstanceID(), nil, req.GetSecrets())
defer mgr.Destroy(ctx)

var rbdVol *rbdVolume
vol, err := mgr.GetVolumeByID(ctx, volumeID)
if vol != nil {
defer vol.Destroy(ctx)

rbdVol = vol.(*rbdVolume) // FIXME: temporary cast until rbdVolume is cleaned up
}
if err != nil {
return cs.checkErrAndUndoReserve(ctx, err, volumeID, rbdVol, cr)
}

// lock out parallel create requests against the same volume name as we
// clean up the image and associated omaps for the same
if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
requestName, err := rbdVol.GetRequestName(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if acquired := cs.VolumeLocks.TryAcquire(requestName); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, requestName)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName)
}
defer cs.VolumeLocks.Release(rbdVol.RequestName)
defer cs.VolumeLocks.Release(requestName)

return cleanupRBDImage(ctx, rbdVol, cr)
return rbdVol.cleanupRBDImage(ctx, cr)
}

// cleanupRBDImage removes the rbd image and OMAP metadata associated with it.
func cleanupRBDImage(ctx context.Context,
rbdVol *rbdVolume, cr *util.Credentials,
func (rbdVol *rbdVolume) cleanupRBDImage(ctx context.Context,
cr *util.Credentials,
) (*csi.DeleteVolumeResponse, error) {
info, err := rbdVol.GetMirroringInfo(ctx)
if err != nil {
Expand Down Expand Up @@ -1012,7 +1026,7 @@ func cleanupRBDImage(ctx context.Context,
if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() {
if err = undoVolReservation(ctx, rbdVol, cr); err != nil {
log.ErrorLog(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)",
rbdVol.RequestName, rbdVol.RbdImageName, err)
rbdVol.requestName, rbdVol.RbdImageName, err)

return nil, status.Error(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -1057,7 +1071,7 @@ func cleanupRBDImage(ctx context.Context,

if err = undoVolReservation(ctx, rbdVol, cr); err != nil {
log.ErrorLog(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)",
rbdVol.RequestName, rbdVol.RbdImageName, err)
rbdVol.requestName, rbdVol.RbdImageName, err)

return nil, status.Error(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -1146,7 +1160,7 @@ func (cs *ControllerServer) CreateSnapshot(
rbdSnap.RbdImageName = rbdVol.RbdImageName
rbdSnap.VolSize = rbdVol.VolSize
rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
rbdSnap.RequestName = req.GetName()
rbdSnap.requestName = req.GetName()

if acquired := cs.SnapshotLocks.TryAcquire(req.GetName()); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, req.GetName())
Expand Down Expand Up @@ -1250,7 +1264,7 @@ func cloneFromSnapshot(
if err != nil {
uErr := undoSnapshotCloning(ctx, rbdVol, rbdSnap, vol, cr)
if uErr != nil {
log.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", rbdSnap.RequestName, uErr)
log.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", rbdSnap.requestName, uErr)
}

return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -1269,7 +1283,7 @@ func cloneFromSnapshot(
} else if err != nil {
uErr := undoSnapshotCloning(ctx, rbdVol, rbdSnap, vol, cr)
if uErr != nil {
log.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", rbdSnap.RequestName, uErr)
log.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", rbdSnap.requestName, uErr)
}

return nil, status.Error(codes.Internal, err.Error())
Expand Down Expand Up @@ -1412,12 +1426,6 @@ func (cs *ControllerServer) DeleteSnapshot(
return nil, err
}

cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()

snapshotID := req.GetSnapshotId()
if snapshotID == "" {
return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
Expand All @@ -1431,14 +1439,17 @@ func (cs *ControllerServer) DeleteSnapshot(
defer cs.SnapshotLocks.Release(snapshotID)

// lock out snapshotID for restore operation
if err = cs.OperationLocks.GetDeleteLock(snapshotID); err != nil {
if err := cs.OperationLocks.GetDeleteLock(snapshotID); err != nil {
log.ErrorLog(ctx, err.Error())

return nil, status.Error(codes.Aborted, err.Error())
}
defer cs.OperationLocks.ReleaseDeleteLock(snapshotID)

rbdSnap, err := genSnapFromSnapID(ctx, snapshotID, cr, req.GetSecrets())
mgr := NewManager(cs.Driver.GetInstanceID(), nil, req.GetSecrets())
defer mgr.Destroy(ctx)

rbdSnap, err := mgr.GetSnapshotByID(ctx, snapshotID)
if err != nil {
// if error is ErrPoolNotFound, the pool is already deleted we don't
// need to worry about deleting snapshot or omap data, return success
Expand All @@ -1462,7 +1473,7 @@ func (cs *ControllerServer) DeleteSnapshot(
if errors.Is(err, ErrImageNotFound) {
log.UsefulLog(ctx, "cleaning up leftovers of snapshot %s: %v", snapshotID, err)

err = cleanUpImageAndSnapReservation(ctx, rbdSnap, cr)
err = rbdSnap.Delete(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand All @@ -1476,15 +1487,20 @@ func (cs *ControllerServer) DeleteSnapshot(

// safeguard against parallel create or delete requests against the same
// name
if acquired := cs.SnapshotLocks.TryAcquire(rbdSnap.RequestName); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, rbdSnap.RequestName)
requestName, err := rbdSnap.GetRequestName(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdSnap.RequestName)
if acquired := cs.SnapshotLocks.TryAcquire(requestName); !acquired {
log.ErrorLog(ctx, util.SnapshotOperationAlreadyExistsFmt, requestName)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName)
}
defer cs.SnapshotLocks.Release(rbdSnap.RequestName)
defer cs.SnapshotLocks.Release(requestName)

// Deleting snapshot and cloned volume
log.DebugLog(ctx, "deleting cloned rbd volume %s", rbdSnap.RbdSnapName)
log.DebugLog(ctx, "deleting cloned rbd volume %s", rbdSnap)

err = rbdSnap.Delete(ctx)
if err != nil {
Expand All @@ -1496,34 +1512,6 @@ func (cs *ControllerServer) DeleteSnapshot(
return &csi.DeleteSnapshotResponse{}, nil
}

// cleanUpImageAndSnapReservation cleans up the image from the trash and
// snapshot reservation in rados OMAP.
func cleanUpImageAndSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
rbdVol := rbdSnap.toVolume()
err := rbdVol.Connect(cr)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
defer rbdVol.Destroy(ctx)

// cleanup the image from trash if the error is image not found.
err = rbdVol.ensureImageCleanup(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete rbd image: %q with error: %v", rbdVol.Pool, rbdVol.VolName, err)

return status.Error(codes.Internal, err.Error())
}
err = undoSnapReservation(ctx, rbdSnap, cr)
if err != nil {
log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap %q",
rbdSnap.RequestName, rbdSnap, err)

return status.Error(codes.Internal, err.Error())
}

return nil
}

// ControllerExpandVolume expand RBD Volumes on demand based on resizer request.
func (cs *ControllerServer) ControllerExpandVolume(
ctx context.Context,
Expand Down Expand Up @@ -1611,36 +1599,3 @@ func (cs *ControllerServer) ControllerExpandVolume(
NodeExpansionRequired: nodeExpansion,
}, nil
}

// ControllerPublishVolume is a dummy publish implementation to mimic a successful attach operation being a NOOP.
func (cs *ControllerServer) ControllerPublishVolume(
ctx context.Context,
req *csi.ControllerPublishVolumeRequest,
) (*csi.ControllerPublishVolumeResponse, error) {
if req.GetVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
}
if req.GetNodeId() == "" {
return nil, status.Error(codes.InvalidArgument, "Node ID cannot be empty")
}
if req.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}

return &csi.ControllerPublishVolumeResponse{
// the dummy response carry an empty map in its response.
PublishContext: map[string]string{},
}, nil
}

// ControllerUnPublishVolume is a dummy unpublish implementation to mimic a successful attach operation being a NOOP.
func (cs *ControllerServer) ControllerUnpublishVolume(
ctx context.Context,
req *csi.ControllerUnpublishVolumeRequest,
) (*csi.ControllerUnpublishVolumeResponse, error) {
if req.GetVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
}

return &csi.ControllerUnpublishVolumeResponse{}, nil
}
Loading
Loading