Skip to content

Commit

Permalink
Updates in the logs (#95)
Browse files Browse the repository at this point in the history
* Updates in the logs

Signed-off-by: Ajay Lotan Thakur <[email protected]>

* Bump github.com/gin-contrib/cors in /mongoapi/dbtestapp (#94)

Signed-off-by: Ajay Lotan Thakur <[email protected]>

* Fixing API errors

Signed-off-by: Ajay Lotan Thakur <[email protected]>

* Update drsm/api.go

Co-authored-by: gab-arrobo <[email protected]>
Signed-off-by: Ajay Lotan Thakur <[email protected]>

* Update drsm/api.go

Co-authored-by: gab-arrobo <[email protected]>
Signed-off-by: Ajay Lotan Thakur <[email protected]>

* Update drsm/api.go

Co-authored-by: gab-arrobo <[email protected]>
Signed-off-by: Ajay Lotan Thakur <[email protected]>

* Update drsm/chunk.go

Co-authored-by: gab-arrobo <[email protected]>
Signed-off-by: Ajay Lotan Thakur <[email protected]>

* Update drsm/claim.go

Co-authored-by: gab-arrobo <[email protected]>
Signed-off-by: Ajay Lotan Thakur <[email protected]>

* Fix typo and update logging level

Signed-off-by: Ajay Lotan Thakur <[email protected]>

* feat: add MongoDB functions to handle transactions (#96)

* add MongoDB functions to handle transactions

Signed-off-by: Patricia Reinoso <[email protected]>

* rename to supports transactions

Signed-off-by: Patricia Reinoso <[email protected]>

---------

Signed-off-by: Patricia Reinoso <[email protected]>
Signed-off-by: Ajay Lotan Thakur <[email protected]>

* Update drsm/api.go

Co-authored-by: gab-arrobo <[email protected]>
Signed-off-by: Ajay Lotan Thakur <[email protected]>

---------

Signed-off-by: Ajay Lotan Thakur <[email protected]>
  • Loading branch information
thakurajayL authored Jan 16, 2025
1 parent 390b6b9 commit e70812b
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 36 deletions.
30 changes: 15 additions & 15 deletions drsm/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type DrsmInterface interface {
}

func InitDRSM(sharedPoolName string, myid PodId, db DbInfo, opt *Options) (DrsmInterface, error) {
logger.DrsmLog.Debugln("client id:", myid)
logger.DrsmLog.Infoln("client id:", myid)

d := &Drsm{
sharedPoolName: sharedPoolName,
Expand All @@ -68,21 +68,23 @@ func (d *Drsm) AllocateInt32ID() (int32, error) {
mutex.Lock()
defer mutex.Unlock()
if d.mode == ResourceDemux {
logger.DrsmLog.Debugln("demux mode can not allocate Resource index")
logger.DrsmLog.Errorln("demux mode can not allocate Resource index")
err := fmt.Errorf("demux mode does not allow Resource Id allocation")
return 0, err
}
for _, c := range d.localChunkTbl {
if len(c.FreeIds) > 0 {
return c.AllocateIntID(), nil
return c.AllocateIntID()
}
}
// None of the Chunk has freeIds. Allocate new Chunk
c, err := d.GetNewChunk()
if err != nil {
err := fmt.Errorf("ids not available")
logger.DrsmLog.Errorln("failed to allocate new Chunk")
err := fmt.Errorf("failed to allocate new Chunk")
return 0, err
}
return c.AllocateIntID(), nil
return c.AllocateIntID()
}

func (d *Drsm) ReleaseInt32ID(id int32) error {
Expand All @@ -107,8 +109,8 @@ func (d *Drsm) ReleaseInt32ID(id int32) error {
return nil
}
}
err := fmt.Errorf("unknown Id")
return err
logger.DrsmLog.Errorf("failed to release id - %v", id)
return fmt.Errorf("unknown Id")
}

func (d *Drsm) FindOwnerInt32ID(id int32) (*PodId, error) {
Expand All @@ -120,24 +122,22 @@ func (d *Drsm) FindOwnerInt32ID(id int32) (*PodId, error) {
podId := chunk.GetOwner()
return podId, nil
}
err := fmt.Errorf("unknown Id")
return nil, err
logger.DrsmLog.Errorf("failed to find POD owner for Id - %v ", id)
return nil, fmt.Errorf("unknown Id")
}

func (d *Drsm) AcquireIp(pool string) (string, error) {
if d.mode == ResourceDemux {
logger.DrsmLog.Debugln("demux mode can not allocate Ip")
err := fmt.Errorf("demux mode does not allow Resource allocation")
return "", err
logger.DrsmLog.Errorln("demux mode can not allocate Ip")
return "", fmt.Errorf("demux mode does not allow Resource allocation")
}
return d.acquireIp(pool)
}

func (d *Drsm) ReleaseIp(pool, ip string) error {
if d.mode == ResourceDemux {
logger.DrsmLog.Debugln("demux mode can not Release Resource")
err := fmt.Errorf("demux mode does not allow Resource Release")
return err
logger.DrsmLog.Errorln("demux mode can not Release Resource")
return fmt.Errorf("demux mode does not allow Resource Release")
}
return d.releaseIp(pool, ip)
}
Expand Down
21 changes: 13 additions & 8 deletions drsm/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
// We got to allocate new Chunk. We should select
// probable chunk number

logger.DrsmLog.Debugln("allocate new chunk")
logger.DrsmLog.Infoln("allocate new chunk")
// 14 bits --- 1,2,4,8,16
var cn int32 = 1
for {
Expand All @@ -35,7 +35,7 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
if found {
continue
}
logger.DrsmLog.Debugln("found chunk Id block", cn)
logger.DrsmLog.Debugln("found free chunk Id block", cn)
break
}
// Let's confirm if this gets updated in DB
Expand All @@ -62,21 +62,24 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
d.localChunkTbl[cn] = c

// add Ids to freeIds
// why we are not adding in global table right away???
return c, nil
}

func (c *chunk) AllocateIntID() int32 {
func (c *chunk) AllocateIntID() (int32, error) {
if len(c.FreeIds) == 0 {
logger.DrsmLog.Debugln("freeIds in chunk 0")
return 0
err := fmt.Errorf("freeIds in chunk 0")
logger.DrsmLog.Errorf("%v", err)
return 0, err
}
id := c.FreeIds[len(c.FreeIds)-1]
c.FreeIds = c.FreeIds[:len(c.FreeIds)-1]
return (c.Id << 10) | id
return (c.Id << 10) | id, nil
}

func (c *chunk) ReleaseIntID(id int32) {
i := id & 0x3ff
// not efficient but we are doing cross checks
for _, freeid := range c.FreeIds {
if freeid == i {
logger.DrsmLog.Warnf("id %v is already freed", freeid)
Expand All @@ -95,8 +98,9 @@ func (c *chunk) ReleaseIntID(id int32) {
}
}

func getChunIdFromDocId(id string) int32 {
logger.DrsmLog.Infof("id received: %v value", id)
// chunkid-123456
func getChunkIdFromDocId(id string) int32 {
logger.DrsmLog.Debugf("id received: %v value", id)
z := strings.Split(id, "-")
if len(z) == 2 && z[0] == "chunkid" {
cid, _ := strconv.ParseInt(z[1], 10, 32)
Expand All @@ -106,6 +110,7 @@ func getChunIdFromDocId(id string) int32 {
return 0
}

// check the id format and if its matching chunkid doc format then return true
func isChunkDoc(id string) bool {
z := strings.Split(id, "-")
if len(z) == 2 && z[0] == "chunkid" {
Expand Down
16 changes: 9 additions & 7 deletions drsm/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

func (d *Drsm) podDownDetected() {
fmt.Println("started Pod Down goroutine")
logger.DrsmLog.Infoln("started Pod Down goroutine")
for p := range d.podDown {
logger.DrsmLog.Infoln("pod Down detected", p)
logger.DrsmLog.Infof("pod Down detected %v", p)
// Given Pod find out current Chunks owned by this POD
pd := d.podMap[p]
for k := range pd.podChunks {
Expand All @@ -22,13 +22,14 @@ func (d *Drsm) podDownDetected() {
d.globalChunkTblMutex.Unlock()
logger.DrsmLog.Debugf("found: %v chunk: %v", found, c)
if found {
go c.claimChunk(d)
go c.claimChunk(d, pd.PodId.PodName)
}
}
}
}

func (c *chunk) claimChunk(d *Drsm) {
func (c *chunk) claimChunk(d *Drsm, curOwner string) {
// Need optimization
if d.mode != ResourceClient {
logger.DrsmLog.Infoln("claimChunk ignored demux mode")
return
Expand All @@ -37,15 +38,16 @@ func (c *chunk) claimChunk(d *Drsm) {
logger.DrsmLog.Debugln("claimChunk started")
docId := fmt.Sprintf("chunkid-%d", c.Id)
update := bson.M{"_id": docId, "type": "chunk", "podId": d.clientId.PodName, "podInstance": d.clientId.PodInstance, "podIp": d.clientId.PodIp}
filter := bson.M{"_id": docId, "podId": c.Owner.PodName}
filter := bson.M{"_id": docId, "podId": curOwner}
updated := d.mongo.RestfulAPIPutOnly(d.sharedPoolName, filter, update)
if updated == nil {
// TODO : don't add to local pool yet. We can add it only if scan is done.
logger.DrsmLog.Debugln("claimChunk success")
logger.DrsmLog.Infof("claimChunk %v success", c.Id)
c.Owner.PodName = d.clientId.PodName
c.Owner.PodIp = d.clientId.PodIp
go c.scanChunk(d)
} else {
logger.DrsmLog.Debugln("claimChunk failure")
// no problem, some other POD successfully claimed this chunk
logger.DrsmLog.Infof("claimChunk %v failure", c.Id)
}
}
15 changes: 9 additions & 6 deletions drsm/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,13 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
d.addChunk(full)
}
case "update":
// chunk ownership changed..update chunk owner
// logger.DrsmLog.Debugln("update operations")
if isChunkDoc(s.DId.Id) {
// update on chunkId..
// looks like chunk owner getting change
owner := s.Update.UpdFields.PodId
c := getChunIdFromDocId(s.DId.Id)
c := getChunkIdFromDocId(s.DId.Id)
d.globalChunkTblMutex.Lock()
cp := d.globalChunkTbl[c]
d.globalChunkTblMutex.Unlock()
Expand All @@ -169,7 +170,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
logger.DrsmLog.Debugln("delete operations")
if !isChunkDoc(s.DId.Id) {
// not chunk type doc. So its POD doc.
// delete olnly gets document id
// delete only gets document id
pod, found := d.podMap[s.DId.Id]
if pod != nil {
logger.DrsmLog.Infof("Stream(Delete): Pod %v and found %v. Chunks owned by crashed pod = %v", pod, found, pod.podChunks)
Expand All @@ -180,6 +181,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
}
}

// periodic task
func (d *Drsm) punchLiveness() {
// write to DB - signature every 5 second
ticker := time.NewTicker(5 * time.Second)
Expand Down Expand Up @@ -217,6 +219,7 @@ func (d *Drsm) punchLiveness() {
}
}

// periodic task
func (d *Drsm) checkAllChunks() {
// go through all pods to see if any pod is showing same old counter
// Mark it down locally
Expand All @@ -233,7 +236,7 @@ func (d *Drsm) checkAllChunks() {
var s FullStream
bsonBytes, _ := bson.Marshal(v)
bson.Unmarshal(bsonBytes, &s)
logger.DrsmLog.Infof("individual Chunk bson Element %v", s)
logger.DrsmLog.Debugf("individual Chunk bson Element %v", s)
d.addChunk(&s)
}
}
Expand All @@ -249,8 +252,8 @@ func (d *Drsm) addChunk(full *FullStream) {
if did == "" {
did = full.ChunkId
}
logger.DrsmLog.Infof("received Chunk Doc: %v", full)
cid := getChunIdFromDocId(did)
logger.DrsmLog.Debugf("received Chunk Doc: %v", full)
cid := getChunkIdFromDocId(did)
o := PodId{PodName: full.PodId, PodInstance: full.PodInstance, PodIp: full.PodIp}
c := &chunk{Id: cid, Owner: o}
c.resourceValidCb = d.resourceValidCb
Expand All @@ -261,7 +264,7 @@ func (d *Drsm) addChunk(full *FullStream) {
d.globalChunkTbl[cid] = c
d.globalChunkTblMutex.Unlock()

logger.DrsmLog.Infof("chunk id %v, podChunks %v", cid, pod.podChunks)
logger.DrsmLog.Debugf("chunk id %v, podChunks %v", cid, pod.podChunks)
}

func (d *Drsm) addPod(full *FullStream) *podData {
Expand Down

0 comments on commit e70812b

Please sign in to comment.