Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(messaging): automatic chunked messaging for ports #540

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
183 changes: 183 additions & 0 deletions api/messaging/src/chunked-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import type {
Chunk,
ChunkCollectionID,
InitChunk,
MessageEventCallback,
PortName
} from "./types"
import { getExtRuntime } from "./utils"

const maxChunkSize = 4_200_000

/**
* Split large data into multiple chunks to
* bypass the browser's limit on runtime messages.
*/
export function createChunksFromData(data: unknown): Chunk[] {
// serialize data to buffer
const jsonObj = JSON.stringify(data)
const serialized = new TextEncoder().encode(jsonObj)

// split serialized data
const bytes: number[][] = []

for (let i = 0; i < serialized.length; i++) {
const chunk = Math.floor(i / maxChunkSize)

if (!bytes[chunk]) bytes[chunk] = []

bytes[chunk].push(serialized[i])
}

// create a chunk collection ID
const collectionID = Math.floor(Math.random() * 100)

// create chunks
const chunks: Chunk[] = bytes.map((byteGroup, i) => ({
name: "__PLASMO_MESSAGING_CHUNK__",
type: i === byteGroup.length - 1 ? "end" : i === 0 ? "init" : "data",
index: i,
chunkCollectionId: collectionID,
data: byteGroup
}))

// add total chunk length
const initChunk = chunks.find((chunk) => chunk.type === "init") as InitChunk

initChunk.totalChunks = chunks.length
initChunk.dataLength = serialized.length

return chunks
}

/**
* Reconstruct split data from "createChunksFromData()"
*/
export function buildDataFromChunks<T = unknown>(chunks: Chunk[]): T {
// find the init chunk
const initChunk = chunks.find((chunk) => chunk.type === "init") as InitChunk

// validate init chunk and check if
// the chunks are complete
if (
!initChunk ||
initChunk.totalChunks !== chunks.length ||
typeof initChunk.dataLength === "undefined"
) {
throw new Error(
"Failed to validate init chunk: incomplete chunk array / no data length / no init chunk"
)
}

// initialize the encoded data
const encoded = new Uint8Array(initChunk.dataLength)

// sort chunks by their index
// this is to make sure we are
// setting the encoded bytes in
// the correct order
chunks.sort((a, b) => a.index - b.index)

// set bytes
for (let i = 0; i < chunks.length; i++) {
encoded.set(chunks[i].data, chunks[i - 1]?.data?.length || 0)
}

// decode the data
const serialized = new TextDecoder().decode(encoded)
const obj: T = JSON.parse(serialized)

return obj
}

/**
* Advanced chunked streaming port extending the default
* chrome.runtime.Port
*/
export const createChunkedStreamPort = (
name: PortName
): chrome.runtime.Port => {
// connect to the port
const port = getExtRuntime().connect({ name })

// chunk map
const chunkMap = new Map<ChunkCollectionID, Chunk[]>()

// intercepted event listeners map
// Map format: key - original handler, value - interceptor
const listenerMap = new Map<MessageEventCallback, MessageEventCallback>()

// setup interceptor
return {
...port,
postMessage(message: unknown) {
// split chunks
const chunks = createChunksFromData(message)

// get if chunks are needed
// if not, just send the message
if (chunks.length >= 1) {
return port.postMessage(message)
}

// send chunks
for (let i = 0; i < chunks.length; i++) {
port.postMessage(chunks[i])
}
},
onMessage: {
...port.onMessage,
addListener(callback: MessageEventCallback) {
// interceptor for the chunks
const interceptor: MessageEventCallback = (message: Chunk, port) => {
// only handle chunks
if (message?.name !== "__PLASMO_MESSAGING_CHUNK__") {
return callback(message, port)
}

// check if a group exists for this
// chunk in the chunkMap
let group = chunkMap.get(message.chunkCollectionId)

// if the group exists, add chunk to it
// otherwise create the group
if (!!group) group.push(message)
else chunkMap.set(message.chunkCollectionId, [message])

// update group (in case it was undefined before)
group = chunkMap.get(message.chunkCollectionId)

// check if all chunks have been received
const initChunk = group.find(
(chunk) => chunk.type === "init"
) as InitChunk

if (group.length !== initChunk.totalChunks) return

// check if the listener is present
if (!listenerMap.get(callback)) return

// build message data
const data = buildDataFromChunks(group)

// call original listener to handle
// the reconstructed message
return callback(data, port)
}

// add listener
port.onMessage.addListener(interceptor)

// map listener
listenerMap.set(callback, interceptor)
},
removeListener(callback: MessageEventCallback) {
// remove listener from the original port
port.onMessage.removeListener(listenerMap.get(callback))

// remove listener from listener map
listenerMap.delete(callback)
}
}
}
}
12 changes: 6 additions & 6 deletions api/messaging/src/hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ export const useMessage = <RequestBody, ResponseBody>(
}
}

export const usePort: PlasmoMessaging.PortHook = (name) => {
export const usePort: PlasmoMessaging.PortHook = (portKey) => {
const portRef = useRef<chrome.runtime.Port>()
const reconnectRef = useRef(0)
const [data, setData] = useState()

useEffect(() => {
if (!name) {
if (!portKey) {
return null
}

const { port, disconnect } = portListen(
name,
portKey,
(msg) => {
setData(msg)
},
Expand All @@ -50,19 +50,19 @@ export const usePort: PlasmoMessaging.PortHook = (name) => {
portRef.current = port
return disconnect
}, [
name,
portKey,
reconnectRef.current // This is needed to force a new port ref
])

return {
data,
send: (body) => {
portRef.current.postMessage({
name,
name: portKey,
body
})
},
listen: (handler) => portListen(name, handler)
listen: (handler) => portListen(portKey, handler)
}
}

Expand Down
3 changes: 2 additions & 1 deletion api/messaging/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ export type {
PortName,
PortsMetadata,
MessagesMetadata,
OriginContext
OriginContext,
PortKey
} from "./types"

/**
Expand Down
28 changes: 18 additions & 10 deletions api/messaging/src/port.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
import type { PortName } from "./index"
import { createChunkedStreamPort } from "./chunked-stream"
import type { PortKey, PortName } from "./index"
import { getExtRuntime } from "./utils"

const portMap = new Map<PortName, chrome.runtime.Port>()

export const getPort = (name: PortName) => {
const port = portMap.get(name)
export const getPort = (portKey: PortKey) => {
const portName = typeof portKey === "string" ? portKey : portKey.name
const isChunked = typeof portKey !== "string" && portKey.isChunked

const port = portMap.get(portName)

if (!!port) {
return port
}
const newPort = getExtRuntime().connect({ name })
portMap.set(name, newPort)
const newPort = isChunked
? createChunkedStreamPort(portName)
: getExtRuntime().connect({ name: portName })

portMap.set(portName, newPort)
return newPort
}

export const removePort = (name: PortName) => {
portMap.delete(name)
export const removePort = (portKey: PortKey) => {
portMap.delete(typeof portKey === "string" ? portKey : portKey.name)
}

export const listen = <ResponseBody = any>(
name: PortName,
portKey: PortKey,
handler: (msg: ResponseBody) => Promise<void> | void,
onReconnect?: () => void
) => {
const port = getPort(name)
const port = getPort(portKey)

function reconnectHandler() {
removePort(name)
removePort(portKey)
onReconnect?.()
}

Expand Down
41 changes: 40 additions & 1 deletion api/messaging/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ export namespace PlasmoMessaging {
}

export interface PortHook {
<TRequestBody = Record<string, any>, TResponseBody = any>(name: PortName): {
<TRequestBody = Record<string, any>, TResponseBody = any>(
portKey: PortKey
): {
data?: TResponseBody
send: (payload: TRequestBody) => void
listen: <T = TResponseBody>(
Expand All @@ -111,3 +113,40 @@ export type OriginContext =
| "sandbox-page"
| "content-script"
| "window"

export type PortKey =
| PortName
| {
name: PortName
// Enable chunking of port data stream. This split the data into smaller chunk and stream them through the port, overcoming the port bandwidth limitation.
isChunked?: boolean
}

export type ChunkCollectionID = number

export type MessageEventCallback = (
message: unknown,
port: chrome.runtime.Port
) => void

export interface Chunk {
name: "__PLASMO_MESSAGING_CHUNK__"
type: "init" | "end" | "data"
index: number
chunkCollectionId: ChunkCollectionID
data: number[]
}

export interface InitChunk extends Chunk {
type: "init"
dataLength: number
totalChunks: number
}

export interface DataChunk extends Chunk {
type: "data"
}

export interface EndChunk extends Chunk {
type: "end"
}