From e70812bfef1a22aea21b44b9b43a0d160f72b8b0 Mon Sep 17 00:00:00 2001 From: Ajay Lotan Thakur Date: Thu, 16 Jan 2025 12:16:57 -0500 Subject: [PATCH] Updates in the logs (#95) * Updates in the logs Signed-off-by: Ajay Lotan Thakur * Bump github.com/gin-contrib/cors in /mongoapi/dbtestapp (#94) Signed-off-by: Ajay Lotan Thakur * Fixing API errors Signed-off-by: Ajay Lotan Thakur * Update drsm/api.go Co-authored-by: gab-arrobo Signed-off-by: Ajay Lotan Thakur * Update drsm/api.go Co-authored-by: gab-arrobo Signed-off-by: Ajay Lotan Thakur * Update drsm/api.go Co-authored-by: gab-arrobo Signed-off-by: Ajay Lotan Thakur * Update drsm/chunk.go Co-authored-by: gab-arrobo Signed-off-by: Ajay Lotan Thakur * Update drsm/claim.go Co-authored-by: gab-arrobo Signed-off-by: Ajay Lotan Thakur * Fix typo and update logging level Signed-off-by: Ajay Lotan Thakur * feat: add MongoDB functions to handle transactions (#96) * add MongoDB functions to handle transactions Signed-off-by: Patricia Reinoso * rename to supports transactions Signed-off-by: Patricia Reinoso --------- Signed-off-by: Patricia Reinoso Signed-off-by: Ajay Lotan Thakur * Update drsm/api.go Co-authored-by: gab-arrobo Signed-off-by: Ajay Lotan Thakur --------- Signed-off-by: Ajay Lotan Thakur --- drsm/api.go | 30 +++++++++++++++--------------- drsm/chunk.go | 21 +++++++++++++-------- drsm/claim.go | 16 +++++++++------- drsm/updates.go | 15 +++++++++------ 4 files changed, 46 insertions(+), 36 deletions(-) diff --git a/drsm/api.go b/drsm/api.go index f90ec87..296e121 100644 --- a/drsm/api.go +++ b/drsm/api.go @@ -50,7 +50,7 @@ type DrsmInterface interface { } func InitDRSM(sharedPoolName string, myid PodId, db DbInfo, opt *Options) (DrsmInterface, error) { - logger.DrsmLog.Debugln("client id:", myid) + logger.DrsmLog.Infoln("client id:", myid) d := &Drsm{ sharedPoolName: sharedPoolName, @@ -68,21 +68,23 @@ func (d *Drsm) AllocateInt32ID() (int32, error) { mutex.Lock() defer mutex.Unlock() if d.mode == ResourceDemux { - logger.DrsmLog.Debugln("demux mode can not allocate Resource index") + logger.DrsmLog.Errorln("demux mode can not allocate Resource index") err := fmt.Errorf("demux mode does not allow Resource Id allocation") return 0, err } for _, c := range d.localChunkTbl { if len(c.FreeIds) > 0 { - return c.AllocateIntID(), nil + return c.AllocateIntID() } } + // None of the Chunk has freeIds. Allocate new Chunk c, err := d.GetNewChunk() if err != nil { - err := fmt.Errorf("ids not available") + logger.DrsmLog.Errorln("failed to allocate new Chunk") + err := fmt.Errorf("failed to allocate new Chunk") return 0, err } - return c.AllocateIntID(), nil + return c.AllocateIntID() } func (d *Drsm) ReleaseInt32ID(id int32) error { @@ -107,8 +109,8 @@ func (d *Drsm) ReleaseInt32ID(id int32) error { return nil } } - err := fmt.Errorf("unknown Id") - return err + logger.DrsmLog.Errorf("failed to release id - %v", id) + return fmt.Errorf("unknown Id") } func (d *Drsm) FindOwnerInt32ID(id int32) (*PodId, error) { @@ -120,24 +122,22 @@ func (d *Drsm) FindOwnerInt32ID(id int32) (*PodId, error) { podId := chunk.GetOwner() return podId, nil } - err := fmt.Errorf("unknown Id") - return nil, err + logger.DrsmLog.Errorf("failed to find POD owner for Id - %v ", id) + return nil, fmt.Errorf("unknown Id") } func (d *Drsm) AcquireIp(pool string) (string, error) { if d.mode == ResourceDemux { - logger.DrsmLog.Debugln("demux mode can not allocate Ip") - err := fmt.Errorf("demux mode does not allow Resource allocation") - return "", err + logger.DrsmLog.Errorln("demux mode can not allocate Ip") + return "", fmt.Errorf("demux mode does not allow Resource allocation") } return d.acquireIp(pool) } func (d *Drsm) ReleaseIp(pool, ip string) error { if d.mode == ResourceDemux { - logger.DrsmLog.Debugln("demux mode can not Release Resource") - err := fmt.Errorf("demux mode does not allow Resource Release") - return err + logger.DrsmLog.Errorln("demux mode can not Release Resource") + return fmt.Errorf("demux mode does not allow Resource Release") } return d.releaseIp(pool, ip) } diff --git a/drsm/chunk.go b/drsm/chunk.go index 7fc9026..6a3a4c8 100644 --- a/drsm/chunk.go +++ b/drsm/chunk.go @@ -23,7 +23,7 @@ func (d *Drsm) GetNewChunk() (*chunk, error) { // We got to allocate new Chunk. We should select // probable chunk number - logger.DrsmLog.Debugln("allocate new chunk") + logger.DrsmLog.Infoln("allocate new chunk") // 14 bits --- 1,2,4,8,16 var cn int32 = 1 for { @@ -35,7 +35,7 @@ func (d *Drsm) GetNewChunk() (*chunk, error) { if found { continue } - logger.DrsmLog.Debugln("found chunk Id block", cn) + logger.DrsmLog.Debugln("found free chunk Id block", cn) break } // Let's confirm if this gets updated in DB @@ -62,21 +62,24 @@ func (d *Drsm) GetNewChunk() (*chunk, error) { d.localChunkTbl[cn] = c // add Ids to freeIds + // why we are not adding in global table right away??? return c, nil } -func (c *chunk) AllocateIntID() int32 { +func (c *chunk) AllocateIntID() (int32, error) { if len(c.FreeIds) == 0 { - logger.DrsmLog.Debugln("freeIds in chunk 0") - return 0 + err := fmt.Errorf("freeIds in chunk 0") + logger.DrsmLog.Errorf("%v", err) + return 0, err } id := c.FreeIds[len(c.FreeIds)-1] c.FreeIds = c.FreeIds[:len(c.FreeIds)-1] - return (c.Id << 10) | id + return (c.Id << 10) | id, nil } func (c *chunk) ReleaseIntID(id int32) { i := id & 0x3ff + // not efficient but we are doing cross checks for _, freeid := range c.FreeIds { if freeid == i { logger.DrsmLog.Warnf("id %v is already freed", freeid) @@ -95,8 +98,9 @@ func (c *chunk) ReleaseIntID(id int32) { } } -func getChunIdFromDocId(id string) int32 { - logger.DrsmLog.Infof("id received: %v value", id) +// chunkid-123456 +func getChunkIdFromDocId(id string) int32 { + logger.DrsmLog.Debugf("id received: %v value", id) z := strings.Split(id, "-") if len(z) == 2 && z[0] == "chunkid" { cid, _ := strconv.ParseInt(z[1], 10, 32) @@ -106,6 +110,7 @@ func getChunIdFromDocId(id string) int32 { return 0 } +// check the id format and if its matching chunkid doc format then return true func isChunkDoc(id string) bool { z := strings.Split(id, "-") if len(z) == 2 && z[0] == "chunkid" { diff --git a/drsm/claim.go b/drsm/claim.go index 86ca9b4..78f7b61 100644 --- a/drsm/claim.go +++ b/drsm/claim.go @@ -11,9 +11,9 @@ import ( ) func (d *Drsm) podDownDetected() { - fmt.Println("started Pod Down goroutine") + logger.DrsmLog.Infoln("started Pod Down goroutine") for p := range d.podDown { - logger.DrsmLog.Infoln("pod Down detected", p) + logger.DrsmLog.Infof("pod Down detected %v", p) // Given Pod find out current Chunks owned by this POD pd := d.podMap[p] for k := range pd.podChunks { @@ -22,13 +22,14 @@ func (d *Drsm) podDownDetected() { d.globalChunkTblMutex.Unlock() logger.DrsmLog.Debugf("found: %v chunk: %v", found, c) if found { - go c.claimChunk(d) + go c.claimChunk(d, pd.PodId.PodName) } } } } -func (c *chunk) claimChunk(d *Drsm) { +func (c *chunk) claimChunk(d *Drsm, curOwner string) { + // Need optimization if d.mode != ResourceClient { logger.DrsmLog.Infoln("claimChunk ignored demux mode") return @@ -37,15 +38,16 @@ func (c *chunk) claimChunk(d *Drsm) { logger.DrsmLog.Debugln("claimChunk started") docId := fmt.Sprintf("chunkid-%d", c.Id) update := bson.M{"_id": docId, "type": "chunk", "podId": d.clientId.PodName, "podInstance": d.clientId.PodInstance, "podIp": d.clientId.PodIp} - filter := bson.M{"_id": docId, "podId": c.Owner.PodName} + filter := bson.M{"_id": docId, "podId": curOwner} updated := d.mongo.RestfulAPIPutOnly(d.sharedPoolName, filter, update) if updated == nil { // TODO : don't add to local pool yet. We can add it only if scan is done. - logger.DrsmLog.Debugln("claimChunk success") + logger.DrsmLog.Infof("claimChunk %v success", c.Id) c.Owner.PodName = d.clientId.PodName c.Owner.PodIp = d.clientId.PodIp go c.scanChunk(d) } else { - logger.DrsmLog.Debugln("claimChunk failure") + // no problem, some other POD successfully claimed this chunk + logger.DrsmLog.Infof("claimChunk %v failure", c.Id) } } diff --git a/drsm/updates.go b/drsm/updates.go index 96863d7..e49eaa7 100644 --- a/drsm/updates.go +++ b/drsm/updates.go @@ -148,12 +148,13 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan d.addChunk(full) } case "update": + // chunk ownership changed..update chunk owner // logger.DrsmLog.Debugln("update operations") if isChunkDoc(s.DId.Id) { // update on chunkId.. // looks like chunk owner getting change owner := s.Update.UpdFields.PodId - c := getChunIdFromDocId(s.DId.Id) + c := getChunkIdFromDocId(s.DId.Id) d.globalChunkTblMutex.Lock() cp := d.globalChunkTbl[c] d.globalChunkTblMutex.Unlock() @@ -169,7 +170,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan logger.DrsmLog.Debugln("delete operations") if !isChunkDoc(s.DId.Id) { // not chunk type doc. So its POD doc. - // delete olnly gets document id + // delete only gets document id pod, found := d.podMap[s.DId.Id] if pod != nil { logger.DrsmLog.Infof("Stream(Delete): Pod %v and found %v. Chunks owned by crashed pod = %v", pod, found, pod.podChunks) @@ -180,6 +181,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan } } +// periodic task func (d *Drsm) punchLiveness() { // write to DB - signature every 5 second ticker := time.NewTicker(5 * time.Second) @@ -217,6 +219,7 @@ func (d *Drsm) punchLiveness() { } } +// periodic task func (d *Drsm) checkAllChunks() { // go through all pods to see if any pod is showing same old counter // Mark it down locally @@ -233,7 +236,7 @@ func (d *Drsm) checkAllChunks() { var s FullStream bsonBytes, _ := bson.Marshal(v) bson.Unmarshal(bsonBytes, &s) - logger.DrsmLog.Infof("individual Chunk bson Element %v", s) + logger.DrsmLog.Debugf("individual Chunk bson Element %v", s) d.addChunk(&s) } } @@ -249,8 +252,8 @@ func (d *Drsm) addChunk(full *FullStream) { if did == "" { did = full.ChunkId } - logger.DrsmLog.Infof("received Chunk Doc: %v", full) - cid := getChunIdFromDocId(did) + logger.DrsmLog.Debugf("received Chunk Doc: %v", full) + cid := getChunkIdFromDocId(did) o := PodId{PodName: full.PodId, PodInstance: full.PodInstance, PodIp: full.PodIp} c := &chunk{Id: cid, Owner: o} c.resourceValidCb = d.resourceValidCb @@ -261,7 +264,7 @@ func (d *Drsm) addChunk(full *FullStream) { d.globalChunkTbl[cid] = c d.globalChunkTblMutex.Unlock() - logger.DrsmLog.Infof("chunk id %v, podChunks %v", cid, pod.podChunks) + logger.DrsmLog.Debugf("chunk id %v, podChunks %v", cid, pod.podChunks) } func (d *Drsm) addPod(full *FullStream) *podData {