Skip to content

Commit 22bb956

Browse files
committed
feat: add channel.onClose
1 parent 8dd32ea commit 22bb956

File tree

4 files changed

+131
-10
lines changed

4 files changed

+131
-10
lines changed

src/common.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ import type { MessagePort, TransferListItem } from 'node:worker_threads'
33
/** Channel for communicating between main thread and workers */
44
export interface TinypoolChannel {
55
/** Workers subscribing to messages */
6-
onMessage(callback: (message: any) => void): void
6+
onMessage?: (callback: (message: any) => void) => void
77

88
/** Called with worker's messages */
9-
postMessage(message: any): void
9+
postMessage?: (message: any) => void
10+
11+
/** Called when channel can be closed */
12+
onClose?: () => void
1013
}
1114

1215
export interface TinypoolWorker {

src/runtime/process-worker.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,20 @@ export default class ProcessWorker implements TinypoolWorker {
6464
this.process.stdout?.unpipe(process.stdout)
6565
this.process.stderr?.unpipe(process.stderr)
6666
this.port?.close()
67+
this.channel?.onClose?.()
6768
clearTimeout(sigkillTimeout)
6869
}
6970

7071
setChannel(channel: TinypoolChannel) {
72+
// Previous channel exists in non-isolated runs
73+
if (this.channel && this.channel !== channel) {
74+
this.channel.onClose?.()
75+
}
76+
7177
this.channel = channel
7278

7379
// Mirror channel's messages to process
74-
this.channel.onMessage((message: any) => {
80+
this.channel.onMessage?.((message: any) => {
7581
this.send(message)
7682
})
7783
}
@@ -115,7 +121,7 @@ export default class ProcessWorker implements TinypoolWorker {
115121
}
116122

117123
if (!data || !data.__tinypool_worker_message__) {
118-
return this.channel?.postMessage(data)
124+
return this.channel?.postMessage?.(data)
119125
}
120126

121127
if (data.source === 'pool') {

src/runtime/thread-worker.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import { fileURLToPath } from 'node:url'
22
import { type TransferListItem, Worker } from 'node:worker_threads'
3-
import { type TinypoolWorker } from '../common'
3+
import { type TinypoolWorker, type TinypoolChannel } from '../common'
44

55
export default class ThreadWorker implements TinypoolWorker {
66
name = 'ThreadWorker'
77
runtime = 'worker_threads'
88
thread!: Worker
99
threadId!: number
10+
channel?: TinypoolChannel
1011

1112
initialize(options: Parameters<TinypoolWorker['initialize']>[0]) {
1213
this.thread = new Worker(
@@ -17,7 +18,11 @@ export default class ThreadWorker implements TinypoolWorker {
1718
}
1819

1920
async terminate() {
20-
return this.thread.terminate()
21+
const output = await this.thread.terminate()
22+
23+
this.channel?.onClose?.()
24+
25+
return output
2126
}
2227

2328
postMessage(message: any, transferListItem?: Readonly<TransferListItem[]>) {
@@ -44,9 +49,24 @@ export default class ThreadWorker implements TinypoolWorker {
4449
return this.thread.unref()
4550
}
4651

47-
setChannel() {
48-
throw new Error(
49-
"{ runtime: 'worker_threads' } doesn't support channel. Use transferListItem instead."
50-
)
52+
setChannel(channel: TinypoolChannel) {
53+
if (channel.onMessage) {
54+
throw new Error(
55+
"{ runtime: 'worker_threads' } doesn't support channel.onMessage. Use transferListItem for listening to messages instead."
56+
)
57+
}
58+
59+
if (channel.postMessage) {
60+
throw new Error(
61+
"{ runtime: 'worker_threads' } doesn't support channel.postMessage. Use transferListItem for sending to messages instead."
62+
)
63+
}
64+
65+
// Previous channel exists in non-isolated runs
66+
if (this.channel && this.channel !== channel) {
67+
this.channel.onClose?.()
68+
}
69+
70+
this.channel = channel
5171
}
5272
}

test/runtime.test.ts

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,52 @@ describe('worker_threads', () => {
4646
`)
4747
expect(result).toBe(threadId)
4848
})
49+
50+
test('channel is closed when isolated', async () => {
51+
const pool = createPool({
52+
runtime: 'worker_threads',
53+
isolateWorkers: true,
54+
minThreads: 2,
55+
maxThreads: 2,
56+
})
57+
58+
const events: string[] = []
59+
60+
await pool.run('', { channel: { onClose: () => events.push('call #1') } })
61+
expect(events).toStrictEqual(['call #1'])
62+
63+
await pool.run('', { channel: { onClose: () => events.push('call #2') } })
64+
expect(events).toStrictEqual(['call #1', 'call #2'])
65+
66+
await pool.run('', { channel: { onClose: () => events.push('call #3') } })
67+
expect(events).toStrictEqual(['call #1', 'call #2', 'call #3'])
68+
69+
await pool.destroy()
70+
expect(events).toStrictEqual(['call #1', 'call #2', 'call #3'])
71+
})
72+
73+
test('channel is closed when non-isolated', async () => {
74+
const pool = createPool({
75+
runtime: 'worker_threads',
76+
isolateWorkers: false,
77+
minThreads: 2,
78+
maxThreads: 2,
79+
})
80+
81+
const events: string[] = []
82+
83+
await pool.run('', { channel: { onClose: () => events.push('call #1') } })
84+
expect(events).toStrictEqual([])
85+
86+
await pool.run('', { channel: { onClose: () => events.push('call #2') } })
87+
expect(events).toStrictEqual(['call #1'])
88+
89+
await pool.run('', { channel: { onClose: () => events.push('call #3') } })
90+
expect(events).toStrictEqual(['call #1', 'call #2'])
91+
92+
await pool.destroy()
93+
expect(events).toStrictEqual(['call #1', 'call #2', 'call #3'])
94+
})
4995
})
5096

5197
describe('child_process', () => {
@@ -152,6 +198,52 @@ describe('child_process', () => {
152198
response: 'Hello from worker',
153199
})
154200
})
201+
202+
test('channel is closed when isolated', async () => {
203+
const pool = createPool({
204+
runtime: 'child_process',
205+
isolateWorkers: true,
206+
minThreads: 2,
207+
maxThreads: 2,
208+
})
209+
210+
const events: string[] = []
211+
212+
await pool.run('', { channel: { onClose: () => events.push('call #1') } })
213+
expect(events).toStrictEqual(['call #1'])
214+
215+
await pool.run('', { channel: { onClose: () => events.push('call #2') } })
216+
expect(events).toStrictEqual(['call #1', 'call #2'])
217+
218+
await pool.run('', { channel: { onClose: () => events.push('call #3') } })
219+
expect(events).toStrictEqual(['call #1', 'call #2', 'call #3'])
220+
221+
await pool.destroy()
222+
expect(events).toStrictEqual(['call #1', 'call #2', 'call #3'])
223+
})
224+
225+
test('channel is closed when non-isolated', async () => {
226+
const pool = createPool({
227+
runtime: 'child_process',
228+
isolateWorkers: false,
229+
minThreads: 2,
230+
maxThreads: 2,
231+
})
232+
233+
const events: string[] = []
234+
235+
await pool.run('', { channel: { onClose: () => events.push('call #1') } })
236+
expect(events).toStrictEqual([])
237+
238+
await pool.run('', { channel: { onClose: () => events.push('call #2') } })
239+
expect(events).toStrictEqual(['call #1'])
240+
241+
await pool.run('', { channel: { onClose: () => events.push('call #3') } })
242+
expect(events).toStrictEqual(['call #1', 'call #2'])
243+
244+
await pool.destroy()
245+
expect(events).toStrictEqual(['call #1', 'call #2', 'call #3'])
246+
})
155247
})
156248

157249
test('runtime can be changed after recycle', async () => {

0 commit comments

Comments
 (0)