Skip to content

Commit

Permalink
[YUNIKORN-2834] [shim] Add non-YuniKorn allocation tracking logic
Browse files Browse the repository at this point in the history
  • Loading branch information
pbacsko committed Oct 2, 2024
1 parent f1e7920 commit 6152e8c
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 237 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ go 1.22.0
toolchain go1.22.5

require (
github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3 h1:ySu0cpFSYFGNtf+PZw4ulzO+cWOyJMYJs+AjmwGWM80=
github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3/go.mod h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d h1:awo2goBrw25P1aFNZgYJ0q7V+5ycMqMhvI60B75OzQg=
github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
Expand Down
58 changes: 19 additions & 39 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,8 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {

if !common.Equals(prevCapacity, newCapacity) {
// update capacity
if capacity, occupied, ok := ctx.schedulerCache.UpdateCapacity(node.Name, newCapacity); ok {
if err := ctx.updateNodeResources(node, capacity, occupied); err != nil {
log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))
}
} else {
log.Log(log.ShimContext).Warn("Failed to update cached node capacity", zap.String("nodeName", node.Name))
if err := ctx.updateNodeResources(node, newCapacity, nil); err != nil {
log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err))
}
}
}
Expand Down Expand Up @@ -370,7 +366,11 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.AddOccupiedResource)
allocReq := common.CreateAllocationForForeignPod(pod)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(allocReq); err != nil {
log.Log(log.ShimContext).Error("failed to add foreign allocation to the core",
zap.Error(err))
}
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for assigned orphaned pod",
Expand All @@ -394,8 +394,12 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need to subtract the occupied resource and re-sync with the scheduler-core
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource)
ctx.schedulerCache.RemovePod(pod)
releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil {
log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core",
zap.Error(err))
}
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for terminated orphaned pod",
Expand Down Expand Up @@ -441,38 +445,14 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
}

func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
if oldPod == nil {
// if pod is not in scheduler cache, no node updates are needed
log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name))
return
releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition)
if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil {
log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core",
zap.Error(err))
}

// conditions for release:
// 1. pod is already assigned to a node
// 2. pod was not in a terminal state before
// 3. pod references a known node
if !utils.IsPodTerminated(oldPod) {
if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) {
log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("podStatusBefore", string(oldPod.Status.Phase)),
zap.String("podStatusCurrent", string(pod.Status.Phase)))
// this means pod is terminated
// we need to subtract the occupied resource and re-sync with the scheduler-core
ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource)
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied resource update for removed orphaned pod",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("nodeName", pod.Spec.NodeName))
}
ctx.schedulerCache.RemovePod(pod)
}
log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
}

func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace string, podName string, resource *si.Resource, opt schedulercache.UpdateType) {

Check failure on line 458 in pkg/cache/context.go

View workflow job for this annotation

GitHub Actions / build

func `(*Context).updateNodeOccupiedResources` is unused (unused)
Expand Down Expand Up @@ -1560,7 +1540,7 @@ func (ctx *Context) decommissionNode(node *v1.Node) error {
}

func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, occupied *si.Resource) error {
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied)
request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, nil)
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}

Expand Down
Loading

0 comments on commit 6152e8c

Please sign in to comment.