Skip to content

Commit b0f41a1

Browse files
committed
feat: add channel.onClose
1 parent 8dd32ea commit b0f41a1

File tree

3 files changed

+39
-10
lines changed

3 files changed

+39
-10
lines changed

src/common.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ import type { MessagePort, TransferListItem } from 'node:worker_threads'
33
/** Channel for communicating between main thread and workers */
44
export interface TinypoolChannel {
55
/** Workers subscribing to messages */
6-
onMessage(callback: (message: any) => void): void
6+
onMessage?: (callback: (message: any) => void) => void
77

88
/** Called with worker's messages */
9-
postMessage(message: any): void
9+
postMessage?: (message: any) => void
10+
11+
/** Called when channel can be closed */
12+
onClose?: () => void
1013
}
1114

1215
export interface TinypoolWorker {

src/runtime/process-worker.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,20 @@ export default class ProcessWorker implements TinypoolWorker {
6464
this.process.stdout?.unpipe(process.stdout)
6565
this.process.stderr?.unpipe(process.stderr)
6666
this.port?.close()
67+
this.channel?.onClose?.()
6768
clearTimeout(sigkillTimeout)
6869
}
6970

7071
setChannel(channel: TinypoolChannel) {
72+
// Previous channel exists in non-isolated runs
73+
if (this.channel && this.channel !== channel) {
74+
this.channel.onClose?.()
75+
}
76+
7177
this.channel = channel
7278

7379
// Mirror channel's messages to process
74-
this.channel.onMessage((message: any) => {
80+
this.channel.onMessage?.((message: any) => {
7581
this.send(message)
7682
})
7783
}
@@ -115,7 +121,7 @@ export default class ProcessWorker implements TinypoolWorker {
115121
}
116122

117123
if (!data || !data.__tinypool_worker_message__) {
118-
return this.channel?.postMessage(data)
124+
return this.channel?.postMessage?.(data)
119125
}
120126

121127
if (data.source === 'pool') {

src/runtime/thread-worker.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import { fileURLToPath } from 'node:url'
22
import { type TransferListItem, Worker } from 'node:worker_threads'
3-
import { type TinypoolWorker } from '../common'
3+
import { type TinypoolWorker, type TinypoolChannel } from '../common'
44

55
export default class ThreadWorker implements TinypoolWorker {
66
name = 'ThreadWorker'
77
runtime = 'worker_threads'
88
thread!: Worker
99
threadId!: number
10+
channel?: TinypoolChannel
1011

1112
initialize(options: Parameters<TinypoolWorker['initialize']>[0]) {
1213
this.thread = new Worker(
@@ -17,7 +18,11 @@ export default class ThreadWorker implements TinypoolWorker {
1718
}
1819

1920
async terminate() {
20-
return this.thread.terminate()
21+
const output = await this.thread.terminate()
22+
23+
this.channel?.onClose?.()
24+
25+
return output
2126
}
2227

2328
postMessage(message: any, transferListItem?: Readonly<TransferListItem[]>) {
@@ -44,9 +49,24 @@ export default class ThreadWorker implements TinypoolWorker {
4449
return this.thread.unref()
4550
}
4651

47-
setChannel() {
48-
throw new Error(
49-
"{ runtime: 'worker_threads' } doesn't support channel. Use transferListItem instead."
50-
)
52+
setChannel(channel: TinypoolChannel) {
53+
if (channel.onMessage) {
54+
throw new Error(
55+
"{ runtime: 'worker_threads' } doesn't support channel.onMessage. Use transferListItem for listening to messages instead."
56+
)
57+
}
58+
59+
if (channel.postMessage) {
60+
throw new Error(
61+
"{ runtime: 'worker_threads' } doesn't support channel.postMessage. Use transferListItem for sending to messages instead."
62+
)
63+
}
64+
65+
// Previous channel exists in non-isolated runs
66+
if (this.channel && this.channel !== channel) {
67+
this.channel.onClose?.()
68+
}
69+
70+
this.channel = channel
5171
}
5272
}

0 commit comments

Comments
 (0)