[YUNIKORN-1795] Remove the scheduling cycle in the shim
pbacsko committed Jan 16, 2024
1 parent 57f11dc commit 589af92
Showing 13 changed files with 549 additions and 137 deletions.
148 changes: 73 additions & 75 deletions pkg/cache/application.go
@@ -59,6 +59,7 @@ type Application struct {
placeholderTimeoutInSec int64
schedulingStyle string
originatingTask *Task // Original Pod which creates the requests
accepted bool
}

const transitionErr = "no transition"
@@ -332,80 +333,6 @@ func (app *Application) TriggerAppSubmission() error {
return app.handle(NewSubmitApplicationEvent(app.applicationID))
}

// Schedule is called in every scheduling interval,
// we are not using dispatcher here because we want to
// make state transition in sync mode in order to prevent
// generating too many duplicate events. However, it must
// ensure none of these calls is expensive; usually they
// do nothing more than trigger the state transition.
// return true if the app needs scheduling or false if not
func (app *Application) Schedule() bool {
switch app.GetApplicationState() {
case ApplicationStates().New:
ev := NewSubmitApplicationEvent(app.GetApplicationID())
if err := app.handle(ev); err != nil {
log.Log(log.ShimCacheApplication).Warn("failed to handle SUBMIT app event",
zap.Error(err))
}
case ApplicationStates().Accepted:
// once the app is accepted by the scheduler core,
// the next step is to send requests for scheduling
// the app state could be transited to Reserving or Running
// depends on if the app has gang members
app.postAppAccepted()
case ApplicationStates().Reserving:
// during the Reserving state, only the placeholders
// can be scheduled
app.scheduleTasks(func(t *Task) bool {
return t.placeholder
})
if len(app.GetNewTasks()) == 0 {
return false
}
case ApplicationStates().Running:
// during the Running state, only the regular pods
// can be scheduled
app.scheduleTasks(func(t *Task) bool {
return !t.placeholder
})
if len(app.GetNewTasks()) == 0 {
return false
}
default:
log.Log(log.ShimCacheApplication).Debug("skipping scheduling application",
zap.String("appState", app.GetApplicationState()),
zap.String("appID", app.GetApplicationID()),
zap.String("appState", app.GetApplicationState()))
return false
}
return true
}

func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) {
for _, task := range app.GetNewTasks() {
if taskScheduleCondition(task) {
// for each new task, we do a sanity check before moving the state to Pending_Schedule
if err := task.sanityCheckBeforeScheduling(); err == nil {
// note, if we directly trigger submit task event, it may spawn too many duplicate
// events, because a task might be submitted multiple times before its state transits to PENDING.
if handleErr := task.handle(
NewSimpleTaskEvent(task.applicationID, task.taskID, InitTask)); handleErr != nil {
// something goes wrong when transit task to PENDING state,
// this should not happen because we already checked the state
// before calling the transition. Nowhere to go, just log the error.
log.Log(log.ShimCacheApplication).Warn("init task failed", zap.Error(err))
}
} else {
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
zap.Error(err))
}
}
}
}

func (app *Application) handleSubmitApplicationEvent() error {
log.Log(log.ShimCacheApplication).Info("handle app submission",
zap.Stringer("app", app),
@@ -469,10 +396,11 @@ func (app *Application) postAppAccepted() {
// app could have allocated tasks upon a recovery, and in that case,
// the reserving phase has already passed, no need to trigger that again.
var ev events.SchedulingEvent
numAllocatedTasks := len(app.getTasks(TaskStates().Allocated))
log.Log(log.ShimCacheApplication).Debug("postAppAccepted on cached app",
zap.String("appID", app.applicationID),
zap.Int("numTaskGroups", len(app.taskGroups)),
zap.Int("numAllocatedTasks", len(app.GetAllocatedTasks())))
zap.Int("numAllocatedTasks", numAllocatedTasks))
if app.skipReservationStage() {
ev = NewRunApplicationEvent(app.applicationID)
log.Log(log.ShimCacheApplication).Info("Skip the reservation stage",
@@ -485,6 +413,24 @@
dispatcher.Dispatch(ev)
}

func (app *Application) postAppRunning() {
tasks := make([]*Task, 0, len(app.taskMap))
for _, task := range app.taskMap {
if !task.IsPlaceholder() {
tasks = append(tasks, task)
}
}
// sort tasks by creation time & schedule them
sort.Slice(tasks, func(i, j int) bool {
l := tasks[i]
r := tasks[j]
return l.createTime.Before(r.createTime)
})
for _, task := range tasks {
task.Schedule()
}
}

func (app *Application) onReserving() {
// happens after recovery - if placeholders already exist, we need to send
// an event to trigger Application state change in the core
@@ -659,3 +605,55 @@ func (app *Application) SetPlaceholderTimeout(timeout int64) {
defer app.lock.Unlock()
app.placeholderTimeoutInSec = timeout
}

func (app *Application) addTaskAndSchedule(task *Task) {
app.lock.Lock()
defer app.lock.Unlock()
if _, ok := app.taskMap[task.taskID]; ok {
// skip adding duplicate task
return
}

app.taskMap[task.taskID] = task

if app.canScheduleTask(task) {
task.Schedule()
}
}

func (app *Application) canScheduleTask(task *Task) bool {
// skip - not yet accepted by the core
if !app.accepted {
return false
}

// can submit if gang scheduling is not used
if len(app.taskGroups) == 0 {
return true
}

// placeholder, or regular task and we're past reservation
ph := task.IsPlaceholder()
currentState := app.sm.Current()
return ph || (!ph && currentState != ApplicationStates().Reserving)
}

func (app *Application) GetNewTasksWithFailedAttempt() []*Task {
app.lock.RLock()
defer app.lock.RUnlock()

taskList := make([]*Task, 0, len(app.taskMap))
for _, task := range app.taskMap {
if task.GetTaskState() == TaskStates().New && task.IsFailedAttempt() {
taskList = append(taskList, task)
}
}

// sort the tasks by creation time
sort.Slice(taskList, func(i, j int) bool {
l := taskList[i]
r := taskList[j]
return l.createTime.Before(r.createTime)
})

return taskList
}
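
With the shim-side scheduling cycle removed, a task whose Schedule() attempt fails stays in the New state and is flagged as a failed attempt (see IsFailedAttempt above); GetNewTasksWithFailedAttempt returns those tasks ordered by creation time so they can be retried in submission order. The code that sets the flag (task.go) and the retry caller are among the changed files not shown on this page, so the following is only a sketch of how the accessor is presumably meant to be consumed inside pkg/cache (retryFailedAttempts is an illustrative name, not part of this commit):

    // retryFailedAttempts re-drives tasks whose earlier scheduling attempt
    // failed, oldest first, by pushing them through the task state machine
    // again. Hypothetical helper for illustration only.
    func retryFailedAttempts(app *Application) {
        for _, task := range app.GetNewTasksWithFailedAttempt() {
            task.Schedule()
        }
    }
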
9 changes: 9 additions & 0 deletions pkg/cache/application_state.go
@@ -503,10 +503,19 @@ func newAppState() *fsm.FSM { //nolint:funlen
zap.String("destination", event.Dst),
zap.String("event", event.Event))
},
states.Accepted: func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.accepted = true
app.postAppAccepted()
},
states.Reserving: func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.onReserving()
},
states.Running: func(ctx context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.postAppRunning()
},
SubmitApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
event.Err = app.handleSubmitApplicationEvent()
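
The per-state entries added above rely on the callback naming rules of github.com/looplab/fsm, which the shim's state machines are built on (the context-aware callback signatures in this diff match its v1 API): a callback keyed by a bare state name runs after the machine enters that state, which is how entering Accepted now invokes postAppAccepted and entering Running invokes postAppRunning. A minimal standalone sketch of that mechanism, with toy state and event names rather than YuniKorn's:

    package main

    import (
        "context"
        "fmt"

        "github.com/looplab/fsm"
    )

    func main() {
        // a callback keyed by the bare state name "Accepted" is shorthand for
        // "enter_Accepted": it fires right after the machine enters that state
        m := fsm.NewFSM(
            "New",
            fsm.Events{
                {Name: "submit", Src: []string{"New"}, Dst: "Submitted"},
                {Name: "accept", Src: []string{"Submitted"}, Dst: "Accepted"},
            },
            fsm.Callbacks{
                "Accepted": func(_ context.Context, e *fsm.Event) {
                    fmt.Printf("entered %s via event %s\n", e.Dst, e.Event)
                },
            },
        )
        _ = m.Event(context.Background(), "submit")
        _ = m.Event(context.Background(), "accept") // prints: entered Accepted via event accept
    }
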
106 changes: 91 additions & 15 deletions pkg/cache/application_test.go
@@ -731,9 +731,6 @@ func TestTryReserve(t *testing.T) {
err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
assert.NilError(t, err)

// run app schedule
app.Schedule()

// since this app has taskGroups defined,
// once the app is accepted, it is expected to see this app goes to Reserving state
assertAppState(t, app, ApplicationStates().Reserving, 3*time.Second)
@@ -787,15 +784,6 @@ func TestTryReservePostRestart(t *testing.T) {
},
})

// submit the app
err := app.handle(NewSubmitApplicationEvent(app.applicationID))
assert.NilError(t, err)
assertAppState(t, app, ApplicationStates().Submitted, 3*time.Second)

// accepted the app
err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
assert.NilError(t, err)

// simulate some tasks are recovered during the restart
// create 3 pods, 1 of them is Allocated and the other 2 are New
resources := make(map[v1.ResourceName]resource.Quantity)
@@ -860,8 +848,11 @@
assert.Equal(t, len(app.getTasks(TaskStates().Allocated)), 1)
assert.Equal(t, len(app.getTasks(TaskStates().New)), 2)

// run app schedule
app.Schedule()
// submit app & trigger state transition to Accepted
err := app.TriggerAppSubmission()
assert.NilError(t, err)
err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
assert.NilError(t, err)

// since this app has Allocated tasks, the Reserving state will be skipped
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
@@ -1160,7 +1151,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) {
app := context.GetApplication("app00001")
assert.Assert(t, app != nil)
assert.Equal(t, app.GetApplicationID(), "app00001")
assert.Equal(t, app.GetApplicationState(), ApplicationStates().New)
assert.Equal(t, app.GetApplicationState(), ApplicationStates().Submitted)
assert.Equal(t, app.GetQueue(), "root.a")
assert.Equal(t, len(app.GetNewTasks()), 1)

@@ -1284,6 +1275,91 @@ func TestApplication_onReservationStateChange(t *testing.T) {
assertAppState(t, app, ApplicationStates().Running, 1*time.Second)
}

func TestAddTaskAndSchedule(t *testing.T) {
context := initContextForTest()
dispatcher.RegisterEventHandler("TestAppHandler", dispatcher.EventTypeApp, context.ApplicationEventHandler())
dispatcher.Start()
defer dispatcher.Stop()

pod := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-test-00001",
UID: "UID-00001",
},
}

// can't schedule - app is not accepted
app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
task := NewTask("task01", app, context, pod)
app.addTaskAndSchedule(task)
assert.Equal(t, task.sm.Current(), TaskStates().New)
assert.Assert(t, !task.failedAttempt)

// can schedule task - no gang scheduling
app = NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
app.accepted = true
task = NewTask("task01", app, context, pod)
app.addTaskAndSchedule(task)

// can schedule task - placeholder
app = NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
app.accepted = true
app.taskGroups = []TaskGroup{
{
Name: "group1",
MinMember: 3,
},
}
task = NewTaskPlaceholder("task01", app, context, pod)
app.addTaskAndSchedule(task)
assert.Assert(t, !task.IsFailedAttempt())

// can schedule task - state is no longer Reserving
app = NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
app.accepted = true
app.taskGroups = []TaskGroup{
{
Name: "group1",
MinMember: 3,
},
}
task = NewTask("task01", app, context, pod)
app.sm.SetState(ApplicationStates().Running)
app.addTaskAndSchedule(task)
assert.Assert(t, !task.IsFailedAttempt())
}

func TestGetNewTasksWithFailedAttempt(t *testing.T) {
context := initContextForTest()
app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())

task1 := NewTask("task01", app, context, &v1.Pod{})
task1.setFailedAttempt(true)
task1.createTime = time.UnixMilli(100)
task2 := NewTask("task02", app, context, &v1.Pod{})
task3 := NewTask("task03", app, context, &v1.Pod{})
task3.setFailedAttempt(true)
task3.createTime = time.UnixMilli(50)
task4 := NewTask("task04", app, context, &v1.Pod{})
task4.setFailedAttempt(true)
task4.createTime = time.UnixMilli(10)

app.addTask(task1)
app.addTask(task2)
app.addTask(task3)
app.addTask(task4)

tasks := app.GetNewTasksWithFailedAttempt()
assert.Equal(t, 3, len(tasks))
assert.Equal(t, "task04", tasks[0].taskID)
assert.Equal(t, "task03", tasks[1].taskID)
assert.Equal(t, "task01", tasks[2].taskID)
}

func (ctx *Context) addApplicationToContext(app *Application) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
17 changes: 16 additions & 1 deletion pkg/cache/context.go
@@ -65,6 +65,7 @@ type Context struct {
configMaps []*v1.ConfigMap // cached yunikorn configmaps
lock *sync.RWMutex // lock
txnID atomic.Uint64 // transaction ID counter
newApp bool // whether an application has been added since the last check
}

// NewContext create a new context for the scheduler using a default (empty) configuration
@@ -322,6 +323,10 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {
app = ctx.addApplication(&AddApplicationRequest{
Metadata: appMeta,
})
err := app.TriggerAppSubmission()
if err != nil {
log.Log(log.ShimContext).Error("BUG: application submission failed")
}

}

// get task metadata
@@ -1017,6 +1022,7 @@ func (ctx *Context) addApplication(request *AddApplicationRequest) *Application

// add into cache
ctx.applications[app.applicationID] = app
ctx.newApp = true
log.Log(log.ShimContext).Info("app added",
zap.String("appID", app.applicationID))

@@ -1111,7 +1117,7 @@ func (ctx *Context) addTask(request *AddTaskRequest) *Task {
}
}
task := NewFromTaskMeta(request.Metadata.TaskID, app, ctx, request.Metadata, originator)
app.addTask(task)
app.addTaskAndSchedule(task)
log.Log(log.ShimContext).Info("task added",
zap.String("appID", app.applicationID),
zap.String("taskID", task.taskID),
@@ -1709,6 +1715,15 @@ func (ctx *Context) finalizePods(existingPods []*v1.Pod) error {
return nil
}

func (ctx *Context) HasNewApplication() bool {
ctx.lock.Lock()
defer ctx.lock.Unlock()
v := ctx.newApp
ctx.newApp = false

return v
}

// for a given pod, return an allocation if found
func getExistingAllocation(pod *v1.Pod) *si.Allocation {
// skip terminated pods
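
HasNewApplication completes the newApp flag added above: it reports whether any application has been added since the previous call and clears the flag under the context lock, so each batch of additions is observed exactly once. Its consumer is presumably in one of the changed files not shown on this page; a minimal hypothetical sketch of the intended check-and-reset usage (pollOnce is an illustrative name, not from the diff):

    // pollOnce illustrates the check-and-reset contract of HasNewApplication:
    // the first call after addApplication() returns true and resets the flag,
    // so subsequent calls return false until another application is added.
    func pollOnce(ctx *Context, onNewApps func()) {
        if ctx.HasNewApplication() {
            onNewApps() // e.g. trigger whatever bookkeeping new applications require
        }
    }
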