Updates in the logs #95

Merged: 13 commits, merged Jan 16, 2025
30 changes: 15 additions & 15 deletions drsm/api.go
@@ -50,7 +50,7 @@ type DrsmInterface interface {
}

func InitDRSM(sharedPoolName string, myid PodId, db DbInfo, opt *Options) (DrsmInterface, error) {
logger.DrsmLog.Debugln("client id:", myid)
logger.DrsmLog.Infoln("client id:", myid)

d := &Drsm{
sharedPoolName: sharedPoolName,
@@ -68,21 +68,23 @@ func (d *Drsm) AllocateInt32ID() (int32, error) {
mutex.Lock()
defer mutex.Unlock()
if d.mode == ResourceDemux {
logger.DrsmLog.Debugln("demux mode can not allocate Resource index")
logger.DrsmLog.Errorln("demux mode can not allocate Resource index")
err := fmt.Errorf("demux mode does not allow Resource Id allocation")
return 0, err
}
for _, c := range d.localChunkTbl {
if len(c.FreeIds) > 0 {
- return c.AllocateIntID(), nil
+ return c.AllocateIntID()
}
}
// None of the Chunk has freeIds. Allocate new Chunk
c, err := d.GetNewChunk()
if err != nil {
err := fmt.Errorf("ids not available")
logger.DrsmLog.Errorln("failed to allocate new Chunk")
err := fmt.Errorf("failed to allocate new Chunk")
return 0, err
}
- return c.AllocateIntID(), nil
+ return c.AllocateIntID()
}

func (d *Drsm) ReleaseInt32ID(id int32) error {
@@ -107,8 +109,8 @@ func (d *Drsm) ReleaseInt32ID(id int32) error {
return nil
}
}
err := fmt.Errorf("unknown Id")
return err
logger.DrsmLog.Errorf("failed to release id - %v", id)
return fmt.Errorf("unknown Id")
}

func (d *Drsm) FindOwnerInt32ID(id int32) (*PodId, error) {
@@ -120,24 +122,22 @@ func (d *Drsm) FindOwnerInt32ID(id int32) (*PodId, error) {
podId := chunk.GetOwner()
return podId, nil
}
err := fmt.Errorf("unknown Id")
return nil, err
logger.DrsmLog.Errorf("failed to find POD owner for Id - %v ", id)
return nil, fmt.Errorf("unknown Id")
}

func (d *Drsm) AcquireIp(pool string) (string, error) {
if d.mode == ResourceDemux {
logger.DrsmLog.Debugln("demux mode can not allocate Ip")
err := fmt.Errorf("demux mode does not allow Resource allocation")
return "", err
logger.DrsmLog.Errorln("demux mode can not allocate Ip")
return "", fmt.Errorf("demux mode does not allow Resource allocation")
}
return d.acquireIp(pool)
}

func (d *Drsm) ReleaseIp(pool, ip string) error {
if d.mode == ResourceDemux {
logger.DrsmLog.Debugln("demux mode can not Release Resource")
err := fmt.Errorf("demux mode does not allow Resource Release")
return err
logger.DrsmLog.Errorln("demux mode can not Release Resource")
return fmt.Errorf("demux mode does not allow Resource Release")
}
return d.releaseIp(pool, ip)
}
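For context on the calling side of the api.go changes above, here is a minimal usage sketch. It is a sketch only: `myPodId` and `dbInfo` are hypothetical values built from the package's `PodId` and `DbInfo` types, and error handling is abbreviated.

```go
// Sketch, not the package's code: pool name and config values are illustrative.
d, err := InitDRSM("sharedPool", myPodId, dbInfo, &Options{})
if err != nil {
	log.Fatalf("InitDRSM failed: %v", err)
}

id, err := d.AllocateInt32ID() // scans local chunks; allocates a new chunk when all are full
if err != nil {
	// e.g. demux mode, or no new chunk could be allocated
	log.Printf("id allocation failed: %v", err)
	return
}
defer func() {
	if err := d.ReleaseInt32ID(id); err != nil {
		log.Printf("release failed: %v", err)
	}
}()
```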
21 changes: 13 additions & 8 deletions drsm/chunk.go
@@ -23,7 +23,7 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
// We got to allocate new Chunk. We should select
// probable chunk number

logger.DrsmLog.Debugln("allocate new chunk")
logger.DrsmLog.Infoln("allocate new chunk")
// 14 bits --- 1,2,4,8,16
var cn int32 = 1
for {
@@ -35,7 +35,7 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
if found {
continue
}
logger.DrsmLog.Debugln("found chunk Id block", cn)
logger.DrsmLog.Debugln("found free chunk Id block", cn)
break
}
// Let's confirm if this gets updated in DB
@@ -62,21 +62,24 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
d.localChunkTbl[cn] = c

// add Ids to freeIds
+ // why we are not adding in global table right away???
return c, nil
}

- func (c *chunk) AllocateIntID() int32 {
+ func (c *chunk) AllocateIntID() (int32, error) {
if len(c.FreeIds) == 0 {
logger.DrsmLog.Debugln("freeIds in chunk 0")
return 0
err := fmt.Errorf("freeIds in chunk 0")
logger.DrsmLog.Errorf("%v", err)
return 0, err
}
id := c.FreeIds[len(c.FreeIds)-1]
c.FreeIds = c.FreeIds[:len(c.FreeIds)-1]
- return (c.Id << 10) | id
+ return (c.Id << 10) | id, nil
}

func (c *chunk) ReleaseIntID(id int32) {
i := id & 0x3ff
+ // not efficient but we are doing cross checks
for _, freeid := range c.FreeIds {
if freeid == i {
logger.DrsmLog.Warnf("id %v is already freed", freeid)
@@ -95,8 +98,9 @@ func (c *chunk) ReleaseIntID(id int32) {
}
}

- func getChunIdFromDocId(id string) int32 {
- logger.DrsmLog.Infof("id received: %v value", id)
+ // chunkid-123456
+ func getChunkIdFromDocId(id string) int32 {
+ logger.DrsmLog.Debugf("id received: %v value", id)
z := strings.Split(id, "-")
if len(z) == 2 && z[0] == "chunkid" {
cid, _ := strconv.ParseInt(z[1], 10, 32)
@@ -106,6 +110,7 @@ func getChunIdFromDocId(id string) int32 {
return 0
}

+ // check the id format and if its matching chunkid doc format then return true
func isChunkDoc(id string) bool {
z := strings.Split(id, "-")
if len(z) == 2 && z[0] == "chunkid" {
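A note on the arithmetic in `AllocateIntID` and `ReleaseIntID` above: an allocated value packs the chunk number into the high bits and a 10-bit per-chunk offset into the low bits, which is why release masks with `0x3ff`. A standalone sketch of that layout (illustrative, not the package's code):

```go
package main

import "fmt"

// pack mirrors AllocateIntID: chunk number in the high bits,
// 10-bit per-chunk offset in the low bits.
func pack(chunkId, offset int32) int32 {
	return (chunkId << 10) | offset
}

// unpack mirrors ReleaseIntID: mask the low 10 bits to recover the
// offset, shift right to recover the owning chunk number.
func unpack(id int32) (chunkId, offset int32) {
	return id >> 10, id & 0x3ff
}

func main() {
	id := pack(5, 123)
	c, o := unpack(id)
	fmt.Println(id, c, o) // 5243 5 123
}
```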
16 changes: 9 additions & 7 deletions drsm/claim.go
@@ -11,9 +11,9 @@ import (
)

func (d *Drsm) podDownDetected() {
- fmt.Println("started Pod Down goroutine")
+ logger.DrsmLog.Infoln("started Pod Down goroutine")
for p := range d.podDown {
logger.DrsmLog.Infoln("pod Down detected", p)
logger.DrsmLog.Infof("pod Down detected %v", p)
// Given Pod find out current Chunks owned by this POD
pd := d.podMap[p]
for k := range pd.podChunks {
@@ -22,13 +22,14 @@ func (d *Drsm) podDownDetected() {
d.globalChunkTblMutex.Unlock()
logger.DrsmLog.Debugf("found: %v chunk: %v", found, c)
if found {
- go c.claimChunk(d)
+ go c.claimChunk(d, pd.PodId.PodName)
}
}
}
}

- func (c *chunk) claimChunk(d *Drsm) {
+ func (c *chunk) claimChunk(d *Drsm, curOwner string) {
+ // Need optimization
if d.mode != ResourceClient {
logger.DrsmLog.Infoln("claimChunk ignored demux mode")
return
@@ -37,15 +38,16 @@
logger.DrsmLog.Debugln("claimChunk started")
docId := fmt.Sprintf("chunkid-%d", c.Id)
update := bson.M{"_id": docId, "type": "chunk", "podId": d.clientId.PodName, "podInstance": d.clientId.PodInstance, "podIp": d.clientId.PodIp}
- filter := bson.M{"_id": docId, "podId": c.Owner.PodName}
+ filter := bson.M{"_id": docId, "podId": curOwner}
updated := d.mongo.RestfulAPIPutOnly(d.sharedPoolName, filter, update)
if updated == nil {
// TODO : don't add to local pool yet. We can add it only if scan is done.
logger.DrsmLog.Debugln("claimChunk success")
logger.DrsmLog.Infof("claimChunk %v success", c.Id)
c.Owner.PodName = d.clientId.PodName
c.Owner.PodIp = d.clientId.PodIp
go c.scanChunk(d)
} else {
logger.DrsmLog.Debugln("claimChunk failure")
// no problem, some other POD successfully claimed this chunk
logger.DrsmLog.Infof("claimChunk %v failure", c.Id)
}
}
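The claim above is effectively an optimistic compare-and-swap: the filter pins the chunk document to its current owner, so when several pods race for a crashed pod's chunks only one update matches and the rest land in the failure branch. A minimal sketch of the same pattern written directly against the official mongo-driver (collection and field names here are assumptions; `RestfulAPIPutOnly` is presumed to wrap something similar):

```go
import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// claimChunkDoc tries to move ownership of a chunk document from
// curOwner to me. Only one racing pod can match the filter, so at
// most one caller sees MatchedCount == 1.
func claimChunkDoc(ctx context.Context, coll *mongo.Collection, docId, curOwner, me string) (bool, error) {
	filter := bson.M{"_id": docId, "podId": curOwner}
	update := bson.M{"$set": bson.M{"podId": me}}
	res, err := coll.UpdateOne(ctx, filter, update)
	if err != nil {
		return false, err
	}
	return res.MatchedCount == 1, nil
}
```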
15 changes: 9 additions & 6 deletions drsm/updates.go
@@ -148,12 +148,13 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
d.addChunk(full)
}
case "update":
+ // chunk ownership changed..update chunk owner
// logger.DrsmLog.Debugln("update operations")
if isChunkDoc(s.DId.Id) {
// update on chunkId..
// looks like chunk owner getting change
owner := s.Update.UpdFields.PodId
- c := getChunIdFromDocId(s.DId.Id)
+ c := getChunkIdFromDocId(s.DId.Id)
d.globalChunkTblMutex.Lock()
cp := d.globalChunkTbl[c]
d.globalChunkTblMutex.Unlock()
@@ -169,7 +170,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
logger.DrsmLog.Debugln("delete operations")
if !isChunkDoc(s.DId.Id) {
// not chunk type doc. So its POD doc.
- // delete olnly gets document id
+ // delete only gets document id
pod, found := d.podMap[s.DId.Id]
if pod != nil {
logger.DrsmLog.Infof("Stream(Delete): Pod %v and found %v. Chunks owned by crashed pod = %v", pod, found, pod.podChunks)
@@ -180,6 +181,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
}
}

+ // periodic task
func (d *Drsm) punchLiveness() {
// write to DB - signature every 5 second
ticker := time.NewTicker(5 * time.Second)
Expand Down Expand Up @@ -217,6 +219,7 @@ func (d *Drsm) punchLiveness() {
}
}

+ // periodic task
func (d *Drsm) checkAllChunks() {
// go through all pods to see if any pod is showing same old counter
// Mark it down locally
@@ -233,7 +236,7 @@ func (d *Drsm) checkAllChunks() {
var s FullStream
bsonBytes, _ := bson.Marshal(v)
bson.Unmarshal(bsonBytes, &s)
logger.DrsmLog.Infof("individual Chunk bson Element %v", s)
logger.DrsmLog.Debugf("individual Chunk bson Element %v", s)
d.addChunk(&s)
}
}
@@ -249,8 +252,8 @@ func (d *Drsm) addChunk(full *FullStream) {
if did == "" {
did = full.ChunkId
}
logger.DrsmLog.Infof("received Chunk Doc: %v", full)
cid := getChunIdFromDocId(did)
logger.DrsmLog.Debugf("received Chunk Doc: %v", full)
cid := getChunkIdFromDocId(did)
o := PodId{PodName: full.PodId, PodInstance: full.PodInstance, PodIp: full.PodIp}
c := &chunk{Id: cid, Owner: o}
c.resourceValidCb = d.resourceValidCb
@@ -261,7 +264,7 @@ func (d *Drsm) addChunk(full *FullStream) {
d.globalChunkTbl[cid] = c
d.globalChunkTblMutex.Unlock()

logger.DrsmLog.Infof("chunk id %v, podChunks %v", cid, pod.podChunks)
logger.DrsmLog.Debugf("chunk id %v, podChunks %v", cid, pod.podChunks)
}

func (d *Drsm) addPod(full *FullStream) *podData {
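On the two periodic tasks tagged above: `punchLiveness` is a heartbeat that rewrites this pod's signature every 5 seconds, and `checkAllChunks` watches for peers whose counter has stopped moving. A generic sketch of that heartbeat loop, with the DB write stubbed out as a callback (an assumption; the real code writes a signature document through the mongo API):

```go
import (
	"context"
	"log"
	"time"
)

// heartbeat publishes an increasing counter every interval. A peer that
// reads the same counter across consecutive checks can mark this pod
// down and start claiming its chunks.
func heartbeat(ctx context.Context, interval time.Duration, publish func(counter int64) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var counter int64
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			counter++
			if err := publish(counter); err != nil {
				log.Printf("liveness punch failed: %v", err)
			}
		}
	}
}
```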