[YUNIKORN-3036] fix race prevention regression #1014

Closed · wants to merge 1 commit
54 changes: 30 additions & 24 deletions pkg/scheduler/partition.go
@@ -1416,7 +1416,9 @@ func (pc *PartitionContext) calculateNodesResourceUsage() map[string][]int {
return mapResult
}

func (pc *PartitionContext) generateReleased(release *si.AllocationRelease, app *objects.Application) []*objects.Allocation {
// processAllocationRelease processes the releases from the RM and removes the allocation(s) as requested.
// Updates the application which can trigger an application state change.
func (pc *PartitionContext) processAllocationRelease(release *si.AllocationRelease, app *objects.Application) []*objects.Allocation {
released := make([]*objects.Allocation, 0)
// when allocationKey is not specified, remove all allocations from the app
allocationKey := release.GetAllocationKey()
@@ -1448,7 +1450,7 @@ func (pc *PartitionContext) generateReleased(release *si.AllocationRelease, app

// removeAllocation removes the referenced allocation(s) from the applications and nodes
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) { //nolint:funlen
func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) {
if release == nil {
return nil, nil
}
@@ -1468,25 +1470,20 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
return nil, nil
}

// temp store for allocations manipulated
released := pc.generateReleased(release, app)
var confirmed *objects.Allocation
// **** DO NOT MOVE **** this must be called before any allocations are released.
// Processing a removal while in the Completing state could race with the state change. The race occurs between
// removing the allocation and updating the queue after node processing. If the state change removes the queue link
// before we get to updating the queue after the node we leave the resources as allocated on the queue. The queue
// will always exist at this point. Retrieving the queue now sidesteps this.
queue := app.GetQueue()

// all releases are collected: placeholder count needs updating for all placeholder releases
// regardless of what happens later
phReleases := 0
for _, r := range released {
if r.IsPlaceholder() {
phReleases++
}
}
if phReleases > 0 {
pc.decPhAllocationCount(phReleases)
}
released := pc.processAllocationRelease(release, app)
pc.updatePhAllocationCount(released)

// for each allocation to release, update the node and queue.
total := resources.NewResource()
totalPreempting := resources.NewResource()
var confirmed *objects.Allocation
// for each allocation to release, update the node and queue.
for _, alloc := range released {
node := pc.GetNode(alloc.GetNodeID())
if node == nil {
@@ -1537,13 +1534,6 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
}
}

// Processing a removal while in the Completing state could race with the state change.
// The race occurs between removing the allocation and updating the queue after node processing.
// If the state change removes the queue link before we get to updating the queue after the node we
// leave the resources as allocated on the queue. The queue cannot be removed yet at this point as
// there are still allocations left. So retrieve the queue early to sidestep the race.
queue := app.GetQueue()

if resources.StrictlyGreaterThanZero(total) {
if err := queue.DecAllocatedResource(total); err != nil {
log.Log(log.SchedPartition).Warn("failed to release resources from queue",
@@ -1581,6 +1571,22 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
return released, confirmed
}

// updatePhAllocationCount checks the released allocations and updates the partition context counter of allocated
// placeholders.
func (pc *PartitionContext) updatePhAllocationCount(released []*objects.Allocation) {
// all releases are collected: placeholder count needs updating for all placeholder releases
// regardless of what happens later
phReleases := 0
for _, a := range released {
if a.IsPlaceholder() {
phReleases++
}
}
if phReleases > 0 {
pc.decPhAllocationCount(phReleases)
}
}

func (pc *PartitionContext) removeForeignAllocation(allocID string) {
pc.Lock()
defer pc.Unlock()
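The ordering constraint in the hunk above (retrieve the queue before releasing any allocations) is the core of the race fix. The following minimal Go sketch is not part of this change; it uses hypothetical Queue/App types rather than YuniKorn's own to illustrate why capturing the queue reference early keeps the queue's allocated resources consistent even if the application completes concurrently:

```go
// Minimal sketch, assuming hypothetical stand-in types, of the ordering the
// "DO NOT MOVE" comment in removeAllocation enforces.
package main

import "fmt"

// Queue tracks resources counted as allocated.
type Queue struct {
	allocated int
}

// App links to its queue; in the real scheduler a concurrent state change
// (Completing -> Completed) can clear this link.
type App struct {
	queue *Queue
}

// GetQueue returns the queue the application is linked to, or nil once the
// link has been removed.
func (a *App) GetQueue() *Queue {
	return a.queue
}

// removeAllocation captures the queue reference before any allocations are
// released. If the reference were fetched only after node processing, a racing
// state change could have cleared the link and the queue would keep the
// released resources counted as allocated forever.
func removeAllocation(app *App, released int) {
	queue := app.GetQueue() // capture early: the queue still exists at this point

	// ... release the allocations from the nodes here; a concurrent state
	// change may clear app.queue while this runs ...

	if queue != nil {
		queue.allocated -= released // still updates the right queue
	}
}

func main() {
	q := &Queue{allocated: 5}
	app := &App{queue: q}
	removeAllocation(app, 2)
	fmt.Println(q.allocated) // prints 3
}
```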
71 changes: 17 additions & 54 deletions pkg/scheduler/partition_test.go
@@ -1541,9 +1541,7 @@ func TestGetQueue(t *testing.T) {
func TestTryAllocate(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
@@ -1620,12 +1618,7 @@ func TestTryAllocate(t *testing.T) {
func TestRequiredNodeReservation(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
assert.Assert(t, partition != nil, "partition create failed")
node := partition.nodes.GetNode(nodeID1)
if node == nil {
t.Fatal("node-1 should have been created")
@@ -1706,9 +1699,7 @@ func TestRequiredNodeReservation(t *testing.T) {
// allocate ask request with required node having non daemon set reservations
func TestRequiredNodeCancelOtherReservations(t *testing.T) {
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
@@ -1786,9 +1777,7 @@ func TestRequiredNodeCancelOtherReservations(t *testing.T) {
// allocate ask request with required node having daemon set reservations
func TestRequiredNodeCancelDSReservations(t *testing.T) {
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
@@ -1871,9 +1860,7 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) {
func TestRequiredNodeNotExist(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
@@ -1908,9 +1895,7 @@ func TestRequiredNodeNotExist(t *testing.T) {
// basic ds scheduling on specific node in first allocate run itself (without any need for reservation)
func TestRequiredNodeAllocation(t *testing.T) {
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result.Request.String())
}
@@ -2076,9 +2061,7 @@ func TestPreemptionForRequiredNodeReservedAlloc(t *testing.T) {

func TestPreemptionForRequiredNodeMultipleAttemptsAvoided(t *testing.T) {
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")

app, testHandler := newApplicationWithHandler(appID1, "default", "root.parent.sub-leaf")
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "8"})
@@ -2158,9 +2141,7 @@ func getExpectedQueuesLimitsForPreemptionWithRequiredNode() map[string]map[strin
// setup the partition with existing allocations so we can test preemption
func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application, *objects.Application, *objects.Allocation, *objects.Allocation) {
partition := createPreemptionQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
@@ -2220,9 +2201,7 @@ func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application, *ob
// setup the partition in a state that we need for multiple tests
func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.Application) {
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
@@ -2300,9 +2279,7 @@ func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.A
func TestTryAllocateLarge(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
@@ -2333,9 +2310,7 @@ func TestTryAllocateLarge(t *testing.T) {
func TestAllocReserveNewNode(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned result: %s", result)
}
@@ -2404,9 +2379,7 @@ func TestAllocReserveNewNode(t *testing.T) {
func TestTryAllocateReserve(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryReservedAllocate(); result != nil {
t.Fatalf("empty cluster reserved allocate returned allocation: %s", result)
}
@@ -2478,9 +2451,7 @@ func TestTryAllocateReserve(t *testing.T) {
func TestTryAllocateWithReserved(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if alloc := partition.tryReservedAllocate(); alloc != nil {
t.Fatalf("empty cluster reserved allocate returned allocation: %v", alloc)
}
@@ -2502,9 +2473,7 @@ func TestTryAllocateWithReserved(t *testing.T) {

// reserve one node: scheduling should happen on the other
node2 := partition.GetNode(nodeID2)
if node2 == nil {
t.Fatal("expected node-2 to be returned got nil")
}
assert.Assert(t, node2 != nil, "expected node-2 to be returned got nil")
partition.reserve(app, node2, ask)
if app.NodeReservedForAsk(allocKey) != nodeID2 {
t.Fatal("reservation failure for alloc-1 and node-2")
@@ -2533,9 +2502,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
func TestScheduleRemoveReservedAsk(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
@@ -2623,9 +2590,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
// update the config with nodes registered and make sure that the root max and guaranteed are not changed
func TestUpdateRootQueue(t *testing.T) {
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "20"})
assert.NilError(t, err, "resource creation failed")
assert.Assert(t, resources.Equals(res, partition.totalPartitionResource), "partition resource not set as expected")
@@ -3927,9 +3892,7 @@ func TestGetNodeSortingPolicyWhenNewPartitionFromConfig(t *testing.T) {
func TestTryAllocateMaxRunning(t *testing.T) {
const resType = "vcore"
partition := createQueuesNodes(t)
if partition == nil {
t.Fatal("partition create failed")
}
assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s", result)
}
4 changes: 1 addition & 3 deletions pkg/scheduler/tests/recovery_test.go
@@ -675,9 +675,7 @@ func TestAppRecovery(t *testing.T) {
mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)

app := serviceContext.Scheduler.GetClusterContext().GetApplication(appID1, "[rm:123]default")
if app == nil {
t.Fatal("application not found after recovery")
}
assert.Assert(t, app != nil, "application not found after recovery")
assert.Equal(t, app.ApplicationID, appID1)
assert.Equal(t, app.GetQueuePath(), "root.a")
}