Skip to content

Commit

Permalink
detect and recover from kernel auto restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
shibbas committed Jun 3, 2021
1 parent 7a0ad1a commit 7fc02ba
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 3 deletions.
3 changes: 3 additions & 0 deletions packages/actions/src/actionTypes/kernel_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export const LAUNCH_KERNEL_FAILED = "LAUNCH_KERNEL_FAILED";
export const SHUTDOWN_REPLY_SUCCEEDED = "SHUTDOWN_REPLY_SUCCEEDED";
export const SHUTDOWN_REPLY_TIMED_OUT = "SHUTDOWN_REPLY_TIMED_OUT";
export const DISPOSE_KERNEL = "DISPOSE_KERNEL";
export const KERNEL_AUTO_RESTARTED = "KERNEL_AUTO_RESTARTED";

export type InterruptKernel = Action <typeof INTERRUPT_KERNEL, MaybeHasContent & MaybeHasKernel>;
export type InterruptKernelSuccessful = Action <typeof INTERRUPT_KERNEL_SUCCESSFUL, MaybeHasContent & MaybeHasKernel>;
Expand All @@ -48,6 +49,7 @@ export type LaunchKernelFailed = ErrorAction<typeof LAUNCH_KERNEL_FAI
export type ShutdownReplySucceeded = Action <typeof SHUTDOWN_REPLY_SUCCEEDED, HasKernel & { content: { restart: boolean } }>;
export type ShutdownReplyTimedOut = Action <typeof SHUTDOWN_REPLY_TIMED_OUT, HasKernel>;
export type DisposeKernel = Action <typeof DISPOSE_KERNEL, HasKernel>;
export type KernelAutoRestarted = Action <typeof KERNEL_AUTO_RESTARTED, HasKernel>;

export const interruptKernel = makeActionFunction <InterruptKernel> (INTERRUPT_KERNEL);
export const interruptKernelSuccessful = makeActionFunction <InterruptKernelSuccessful> (INTERRUPT_KERNEL_SUCCESSFUL);
Expand All @@ -66,3 +68,4 @@ export const launchKernelFailed = makeErrorActionFunction <LaunchKerne
export const shutdownReplySucceeded = makeActionFunction <ShutdownReplySucceeded> (SHUTDOWN_REPLY_SUCCEEDED);
export const shutdownReplyTimedOut = makeActionFunction <ShutdownReplyTimedOut> (SHUTDOWN_REPLY_TIMED_OUT);
export const disposeKernel = makeActionFunction <DisposeKernel> (DISPOSE_KERNEL);
export const kernelAutoRestarted = makeActionFunction <KernelAutoRestarted> (KERNEL_AUTO_RESTARTED);
194 changes: 193 additions & 1 deletion packages/epics/__tests__/kernel-lifecycle.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import {
acquireKernelInfo,
launchKernelWhenNotebookSetEpic,
restartKernelEpic,
watchExecutionStateEpic
watchExecutionStateEpic,
watchForKernelAutoRestartEpic
} from "../src/kernel-lifecycle";

const buildScheduler = () =>
Expand Down Expand Up @@ -466,6 +467,197 @@ describe("watchExecutionStateEpic", () => {
});
});


describe("watchForKernelAutoRestartEpic", () => {
test("returns an empty Observable when not a jupyter host", done => {
const action$ = of({
type: actionsModule.LAUNCH_KERNEL_SUCCESSFUL,
payload: {
kernel: {
channels: of({
header: { msg_type: "status" },
content: { execution_state: "restarting" }
},
{
header: { msg_type: "status" },
content: { execution_state: "starting" }
}) as Subject<any>,
cwd: "/home/tester",
type: "websocket"
},
kernelRef: "fakeKernelRef",
contentRef: "fakeContentRef",
selectNextKernel: false
}
});
const state = {
...mockAppState({})
};
const state$ = new StateObservable<stateModule.AppState>(
new Subject(),
state
);

const obs = watchForKernelAutoRestartEpic(action$, state$);
obs.pipe(toArray()).subscribe(
// Every action that goes through should get stuck on an array
actions => {
const types = actions.map(({ type }) => type);
expect(types).toEqual([]);
},
err => done.fail(err), // It should not error in the stream
() => done()
);
});

test("returns an Observable detecting auto restarted with valid states", done => {

const action$ = of({
type: actionsModule.LAUNCH_KERNEL_SUCCESSFUL,
payload: {
kernel: {
channels: of({
header: { msg_type: "status" },
content: { execution_state: "restarting" }
},
{
header: { msg_type: "status" },
content: { execution_state: "starting" }
}) as Subject<any>,
cwd: "/home/tester",
type: "websocket"
},
kernelRef: "fakeKernelRef",
contentRef: "fakeContentRef",
selectNextKernel: false
}
});
const state = {
...mockAppState({}),
app: stateModule.makeAppRecord({
host: stateModule.makeJupyterHostRecord({})
})
};
const state$ = new StateObservable<stateModule.AppState>(
new Subject(),
state
);

const obs = watchForKernelAutoRestartEpic(action$, state$);
obs.pipe(toArray()).subscribe(
// Every action that goes through should get stuck on an array
actions => {
const types = actions.map(({ type }) => type);
expect(types).toEqual([actionsModule.KERNEL_AUTO_RESTARTED]);
},
err => done.fail(err), // It should not error in the stream
() => done()
);
});

test("returns an empty Observable when states are not valid", done => {
const action$ = of({
type: actionsModule.LAUNCH_KERNEL_SUCCESSFUL,
payload: {
kernel: {
channels: of({
header: { msg_type: "status" },
content: { execution_state: "restarting" }
},
{
header: { msg_type: "status" },
content: { execution_state: "dead" }
},
{
header: { msg_type: "status" },
content: { execution_state: "starting" }
}) as Subject<any>,
cwd: "/home/tester",
type: "websocket"
},
kernelRef: "fakeKernelRef",
contentRef: "fakeContentRef",
selectNextKernel: false
}
});
const state = {
...mockAppState({}),
app: stateModule.makeAppRecord({
host: stateModule.makeJupyterHostRecord({})
})
};
const state$ = new StateObservable<stateModule.AppState>(
new Subject(),
state
);

const obs = watchForKernelAutoRestartEpic(action$, state$);
obs.pipe(toArray()).subscribe(
// Every action that goes through should get stuck on an array
actions => {
const types = actions.map(({ type }) => type);
expect(types).toEqual([]);
},
err => done.fail(err), // It should not error in the stream
() => done()
);
});

test("on kernel error returns executeFailed action", done => {
const sent = new Subject();
const received = new Subject();
received.hasError = true;

const mockSocket = Subject.create(sent, received);

const action$ = of({
type: actionsModule.LAUNCH_KERNEL_SUCCESSFUL,
payload: {
kernel: {
channels: mockSocket,
cwd: "/home/tester",
type: "websocket"
},
kernelRef: "fakeKernelRef",
contentRef: "fakeContentRef",
selectNextKernel: false
}
});
const state = {
...mockAppState({}),
app: stateModule.makeAppRecord({
host: stateModule.makeJupyterHostRecord({})
})
};
const state$ = new StateObservable<stateModule.AppState>(
new Subject(),
state
);

const obs = watchForKernelAutoRestartEpic(action$, state$);
obs.pipe(toArray()).subscribe(
// Every action that goes through should get stuck on an array
actions => {
expect(actions).toEqual([
{
type: actionsModule.EXECUTE_FAILED,
error: true,
payload: {
code: "EXEC_WEBSOCKET_ERROR",
contentRef: "fakeContentRef",
error: new Error(
"The WebSocket connection has unexpectedly disconnected."
)
}
}
]);
},
err => done.fail(err), // It should not error in the stream
() => done()
);
});
});

describe("restartKernelEpic", () => {
test("work for outputHandling None", () => {
const contentRef = "contentRef";
Expand Down
5 changes: 4 additions & 1 deletion packages/epics/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import {
acquireKernelInfoEpic,
launchKernelWhenNotebookSetEpic,
restartKernelEpic,
watchExecutionStateEpic
watchExecutionStateEpic,
watchForKernelAutoRestartEpic
} from "./kernel-lifecycle";
import { fetchKernelspecsEpic } from "./kernelspecs";
import {
Expand Down Expand Up @@ -51,6 +52,7 @@ const allEpics = [
killKernelEpic,
acquireKernelInfoEpic,
watchExecutionStateEpic,
watchForKernelAutoRestartEpic,
restartKernelEpic,
fetchKernelspecsEpic,
fetchContentEpic,
Expand Down Expand Up @@ -81,6 +83,7 @@ export {
killKernelEpic,
acquireKernelInfoEpic,
watchExecutionStateEpic,
watchForKernelAutoRestartEpic,
launchKernelWhenNotebookSetEpic,
restartKernelEpic,
fetchKernelspecsEpic,
Expand Down
73 changes: 72 additions & 1 deletion packages/epics/src/kernel-lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import {
childOf,
createMessage,
JupyterMessage,
ofMessageType
ofMessageType,
kernelStatuses,
kernelInfoRequest
} from "@nteract/messaging";
import { sendNotification } from "@nteract/mythic-notifications";
import { AnyAction } from "redux";
Expand All @@ -17,9 +19,11 @@ import {
first,
map,
mergeMap,
pairwise,
switchMap,
take,
takeUntil,
tap,
timeout
} from "rxjs/operators";

Expand Down Expand Up @@ -79,6 +83,73 @@ export const watchExecutionStateEpic = (
)
);

/**
* Jupyter has options to automatically restart the kernel on crash for a max-retry of 5 retries.
* Monitor for the kernel to have successfully restarted (sent as a "restarting" status followed by a "starting").
* If all 5 retries fail, the kernel status is reported as "dead".
*
* @oaram {ActionObservable} action$ ActionObservable for LAUNCH_KERNEL_SUCCESSFUL action
*/
export const watchForKernelAutoRestartEpic = (
action$: Observable<
actions.NewKernelAction | actions.KillKernelSuccessful
>,
state$: StateObservable<AppState>
) =>
action$.pipe(
ofType(actions.LAUNCH_KERNEL_SUCCESSFUL),
// Only accept jupyter servers for the host with this epic
filter(() => selectors.isCurrentHostJupyter(state$.value)),
switchMap(
(action: actions.NewKernelAction | actions.KillKernelSuccessful) => {
const { kernel, kernelRef, contentRef } = (action as actions.NewKernelAction).payload;

return kernel.channels.pipe(
kernelStatuses(),
pairwise(),
filter(
([previousStatus, currentStatus]: [KernelStatus, KernelStatus]) =>
previousStatus === KernelStatus.Restarting && currentStatus === KernelStatus.Starting
),
tap(() => {
// to avoid getting stuck in the "starting" state, nudge kernel with kernel_info_request to bring the status to Idle.
// TODO: test can't seem to identify next on subject. For now, check before calling
if (kernel.channels.next) {
kernel.channels.next(kernelInfoRequest());
}
}),
map(() =>
actions.kernelAutoRestarted({
kernelRef
})
),
takeUntil(
action$.pipe(
ofType(actions.KILL_KERNEL_SUCCESSFUL),
filter(
(
killAction:
| actions.KillKernelSuccessful
| actions.NewKernelAction
) => killAction.payload.kernelRef === action.payload.kernelRef
)
)
),
catchError((error: Error) => {
return of(
actions.executeFailed({
error: new Error(
"The WebSocket connection has unexpectedly disconnected."
),
code: errors.EXEC_WEBSOCKET_ERROR,
contentRef
})
);
})
);
})
);

/**
* Send a kernel_info_request to the kernel.
*
Expand Down

0 comments on commit 7fc02ba

Please sign in to comment.