Skip to content

Commit

Permalink
rbd: get volumegroup in secondary cluster
Browse files Browse the repository at this point in the history
Currently, `GetVolumeGroup()` fetches the RBD group from the
pool using the clusterID & poolID encoded in the VolumeGroupHandle.
However, this approach may fail in a secondary mirrored cluster,
where the clusterID & poolID could differ.

This commit ensures that `GetVolumeGroup` leverages the
clusterIDMapping and RBDPoolIDMapping to locate the RBD group in the
appropriate  pool if it is not found in the pool corresponding
to the poolID encoded in the VolumeGroupHandle.

Signed-off-by: Praveen M <[email protected]>
  • Loading branch information
iPraveenParihar committed Feb 3, 2025
1 parent 72cfaaf commit f511417
Showing 1 changed file with 110 additions and 11 deletions.
121 changes: 110 additions & 11 deletions internal/rbd/group/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/ceph/go-ceph/rados"
Expand Down Expand Up @@ -63,31 +64,28 @@ type commonVolumeGroup struct {
journal journal.VolumeGroupJournal
}

func (cvg *commonVolumeGroup) initCommonVolumeGroup(
// generateVolumeGroup generates a commonVolumeGroup structure from the volumeGroup identifier.
func generateVolumeGroup(
ctx context.Context,
id string,
csiDriver string,
creds *util.Credentials,
) error {
csiID := util.CSIIdentifier{}
err := csiID.DecomposeCSIID(id)
if err != nil {
return fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
}

csiID util.CSIIdentifier,
) (*commonVolumeGroup, error) {
cvg := &commonVolumeGroup{}
mons, err := util.Mons(util.CsiConfigFile, csiID.ClusterID)
if err != nil {
return fmt.Errorf("failed to get MONs for cluster id %q: %w", csiID.ClusterID, err)
return nil, fmt.Errorf("failed to get MONs for cluster id %q: %w", csiID.ClusterID, err)
}

namespace, err := util.GetRBDRadosNamespace(util.CsiConfigFile, csiID.ClusterID)
if err != nil {
return fmt.Errorf("failed to get RADOS namespace for cluster id %q: %w", csiID.ClusterID, err)
return nil, fmt.Errorf("failed to get RADOS namespace for cluster id %q: %w", csiID.ClusterID, err)
}

pool, err := util.GetPoolName(mons, creds, csiID.LocationID)
if err != nil {
return fmt.Errorf("failed to get pool for volume group id %q: %w", id, err)
return nil, fmt.Errorf("failed to get pool for volume group id %q: %w", id, err)
}

cvg.csiDriver = csiDriver
Expand All @@ -99,6 +97,107 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
cvg.pool = pool
cvg.namespace = namespace

return cvg, nil
}

// generateVolumeGroupFromMapping checks the clusterID and poolID mapping and
// generates commonVolumeGroup structure for the mapped clusterID and poolID.
func generateVolumeGroupFromMapping(
ctx context.Context,
id string,
csiDriver string,
creds *util.Credentials,
csiID util.CSIIdentifier,
mapping *[]util.ClusterMappingInfo,
) (*commonVolumeGroup, error) {
cvg := &commonVolumeGroup{}
mcsiID := csiID
existingClusterID := csiID.ClusterID
existingPoolID := strconv.FormatInt(csiID.LocationID, 10)

for _, cm := range *mapping {
for key, val := range cm.ClusterIDMapping {
mappedClusterID := util.GetMappedID(key, val, csiID.ClusterID)
if mappedClusterID == "" {
continue
}

log.DebugLog(ctx,
"found new clusterID mapping %s for existing clusterID %s", mappedClusterID, existingClusterID)

// Add mapped clusterID to Identifier
mcsiID.ClusterID = mappedClusterID
for _, pools := range cm.RBDpoolIDMappingInfo {
for key, val := range pools {
mappedPoolID := util.GetMappedID(key, val, existingPoolID)
if mappedPoolID == "" {
continue
}
log.DebugLog(ctx,
"found new poolID mapping %s for existing poolID %s", mappedPoolID, existingPoolID)

mPID, err := strconv.ParseInt(mappedPoolID, 10, 64)
if err != nil {
return cvg, err
}
mcsiID.LocationID = mPID
cvg, err = generateVolumeGroup(ctx, id, csiDriver, creds, mcsiID)
if err != nil && !errors.Is(err, util.ErrPoolNotFound) {
return cvg, err
}
// If the pool is found, return the volume group
if cvg != nil {
return cvg, nil
}
}
}
}
}

return nil, util.ErrPoolNotFound
}

func (cvg *commonVolumeGroup) initCommonVolumeGroup(
ctx context.Context,
id string,
csiDriver string,
creds *util.Credentials,
) error {
vg := &commonVolumeGroup{}
csiID := util.CSIIdentifier{}

err := csiID.DecomposeCSIID(id)
if err != nil {
return fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
}

vg, err = generateVolumeGroup(ctx, id, csiDriver, creds, csiID)
if err != nil && !errors.Is(err, util.ErrPoolNotFound) {
return err
}

if err != nil && errors.Is(err, util.ErrPoolNotFound) {
mapping, err := util.GetClusterMappingInfo(csiID.ClusterID)
if err != nil {
return err
}
if mapping != nil {
vg, err = generateVolumeGroupFromMapping(ctx, id, csiDriver, creds, csiID, mapping)
if err != nil {
return err
}
}
}

cvg.csiDriver = vg.csiDriver
cvg.credentials = vg.credentials
cvg.id = vg.id
cvg.clusterID = vg.clusterID
cvg.objectUUID = vg.objectUUID
cvg.monitors = vg.monitors
cvg.pool = vg.pool
cvg.namespace = vg.namespace

log.DebugLog(ctx, "object for volume group %q has been initialized", cvg.id)

return nil
Expand Down

0 comments on commit f511417

Please sign in to comment.