-
Notifications
You must be signed in to change notification settings - Fork 100
Description
这是控制台的错误
RequestChannelStream.ts:453 Uncaught TypeError: Cannot read properties of undefined (reading 'cancel')
at We.close ( RequestChannelStream.ts:453:21 )
at Oe.close ( ClientServerMultiple…ltiplexer.ts:161:14 )
at h.close ( Deferred.ts:47:9 )
at h.close ( WebsocketDuplexConnection.ts:70:11 )
at WebSocket.handleMessage ( WebsocketDuplexConnection.ts:102:12 )
Expected Behavior
希望建立连接后能够正常使用channel
Actual Behavior
实际上每次成功建立连接之后都会触发cancel即控制台错误
Steps to Reproduce
①建立连接(路由): /connection 成功
②发送首帧数据(路由): / 成功
③控制台报错 即: 问题已经产生
值得一提的是②中只有路由元数据, 数据的长度是0
import { RSocketConnector } from "@rsocket/core"
import { WebsocketClientTransport } from "@rsocket/websocket-client"
import { Buffer } from "buffer"
import { encodeCompositeMetadata, encodeRoute, WellKnownMimeType } from "rsocket-composite-metadata"
import { getToken } from "./auth"
// RSocket configuration
const WS_URL = "wss://ai.gmlee.cn/rsocket"
const SETUP_ROUTE = "/microphone/ai_translator/connection"
const CHANNEL_ROUTE = "/microphone/ai_translator"
// Audio configuration
const CHUNK_SIZE = 4096
// Translation options
export interface TranslationOptions {
language: string
onTranslation: (text: string) => void
onError: (error: string) => void
onConnectionChange: (connected: boolean) => void
}
// Connection states
export enum ConnectionState {
DISCONNECTED = "disconnected",
CONNECTING = "connecting",
CONNECTED = "connected",
ERROR = "error",
}
// Global state to track active connections and prevent multiple instances
const globalRSocketState = {
rsocket: null as any,
transport: null as any,
ws: null as WebSocket | null,
channel: null as any,
connectionState: ConnectionState.DISCONNECTED,
isRecording: false,
audioContext: null as AudioContext | null,
stream: null as MediaStream | null,
scriptNode: null as ScriptProcessorNode | null,
source: null as MediaStreamAudioSourceNode | null,
}
/**
* Clean up audio resources only
*/
function cleanupAudioResources() {
try {
// Stop audio processing
if (globalRSocketState.scriptNode) {
try {
globalRSocketState.scriptNode.disconnect()
} catch (e) {
console.error("Error disconnecting script node:", e)
}
globalRSocketState.scriptNode = null
}
if (globalRSocketState.source) {
try {
globalRSocketState.source.disconnect()
} catch (e) {
console.error("Error disconnecting source:", e)
}
globalRSocketState.source = null
}
if (globalRSocketState.audioContext) {
try {
globalRSocketState.audioContext.close()
} catch (e) {
console.error("Error closing audio context:", e)
}
globalRSocketState.audioContext = null
}
// Stop media stream
if (globalRSocketState.stream) {
try {
globalRSocketState.stream.getTracks().forEach((track) => track.stop())
} catch (e) {
console.error("Error stopping media tracks:", e)
}
globalRSocketState.stream = null
}
globalRSocketState.isRecording = false
} catch (e) {
console.error("Error during audio cleanup:", e)
}
}
/**
* Clean up all resources including connection
*/
function cleanupAllResources() {
// First clean up audio resources
cleanupAudioResources()
try {
// Clear channel reference
globalRSocketState.channel = null
// Close WebSocket if it exists and is open
if (globalRSocketState.ws && globalRSocketState.ws.readyState === WebSocket.OPEN) {
try {
// Just close the raw WebSocket directly
globalRSocketState.ws.close()
} catch (e) {
console.error("Error closing WebSocket:", e)
}
}
// Clear all references
globalRSocketState.rsocket = null
globalRSocketState.transport = null
globalRSocketState.ws = null
globalRSocketState.connectionState = ConnectionState.DISCONNECTED
} catch (e) {
console.error("Error during cleanup:", e)
}
}
/**
* Simple RSocket translation service
*/
class RSocketTranslationService {
private options: TranslationOptions | null = null
/**
* Get current connection state
*/
getConnectionState(): ConnectionState {
return globalRSocketState.connectionState
}
/**
* Connect to the RSocket server
*/
async connect(options: TranslationOptions): Promise<boolean> {
// Store options
this.options = options
// If already connecting or connected, return current state
if (globalRSocketState.connectionState === ConnectionState.CONNECTING) {
return false
}
if (globalRSocketState.connectionState === ConnectionState.CONNECTED && globalRSocketState.rsocket) {
options.onConnectionChange(true)
return true
}
// Start connecting
globalRSocketState.connectionState = ConnectionState.CONNECTING
options.onConnectionChange(false)
try {
console.log("Starting RSocket connection...")
// Get authentication token
const token = getToken()
if (!token) {
options.onError("Authentication required. Please login first.")
globalRSocketState.connectionState = ConnectionState.ERROR
options.onConnectionChange(false)
return false
}
// Prepare metadata for setup frame
const meta = {
token,
language: options.language,
}
const setupMetadata = encodeCompositeMetadata([
[WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, encodeRoute(SETUP_ROUTE)],
[WellKnownMimeType.APPLICATION_JSON, Buffer.from(JSON.stringify(meta))],
])
// Configure RSocket connection
const setup = {
dataMimeType: "application/stream+json",
metadataMimeType: WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.toString(),
keepAlive: 60000,
lifetime: 180000,
payload: {
data: undefined,
metadata: setupMetadata,
},
}
// Create WebSocket transport
const transport = new WebsocketClientTransport({
url: WS_URL,
wsCreator: (url: string) => {
const newWs = new WebSocket(url)
newWs.onopen = () => {
console.log("WebSocket connection opened")
}
newWs.onclose = (event) => {
console.log("WebSocket connection closed", event)
// Notify of disconnection
if (this.options) {
this.options.onConnectionChange(false)
}
globalRSocketState.connectionState = ConnectionState.DISCONNECTED
// Clean up resources but don't try to close the WebSocket again
cleanupAudioResources()
globalRSocketState.channel = null
globalRSocketState.rsocket = null
globalRSocketState.transport = null
globalRSocketState.ws = null
}
newWs.onerror = (error) => {
console.error("WebSocket error:", error)
if (this.options) {
this.options.onError(`WebSocket error: Connection failed`)
}
globalRSocketState.connectionState = ConnectionState.ERROR
}
globalRSocketState.ws = newWs
return newWs
},
})
globalRSocketState.transport = transport
// Create RSocket connector
const connector = new RSocketConnector({
transport: transport,
setup: setup,
})
// Connect with timeout
const connectionPromise = connector.connect()
const timeoutPromise = new Promise<any>((_, reject) =>
setTimeout(() => reject(new Error("RSocket connection timeout")), 10000),
)
const rsocket = await Promise.race([connectionPromise, timeoutPromise])
globalRSocketState.rsocket = rsocket
// Create channel immediately after connection
await this.createChannel()
globalRSocketState.connectionState = ConnectionState.CONNECTED
options.onConnectionChange(true)
return true
} catch (err: any) {
console.error("RSocket connection error:", err)
options.onError(`Connection error: ${err.message}`)
globalRSocketState.connectionState = ConnectionState.ERROR
options.onConnectionChange(false)
// Clean up any partial connection
cleanupAllResources()
return false
}
}
/**
* Create a new RSocket channel
*/
async createChannel(): Promise<boolean> {
if (!globalRSocketState.rsocket || !this.options) {
return false
}
// If channel already exists, return true
if (globalRSocketState.channel) {
return true
}
try {
const token = getToken()
if (!token) return false
const meta = {
token,
language: this.options.language,
}
const metadata = encodeCompositeMetadata([
[WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, encodeRoute(CHANNEL_ROUTE)],
[WellKnownMimeType.APPLICATION_JSON, Buffer.from(JSON.stringify(meta))],
])
// Create an empty buffer for the first message
const emptyBuffer = Buffer.alloc(0)
// Create a new channel
globalRSocketState.channel = globalRSocketState.rsocket.requestChannel({
metadata,
data: emptyBuffer,
onNext: (payload: any) => {
if (!globalRSocketState.rsocket) return
const text = payload.data.toString()
if (this.options) {
this.options.onTranslation(text)
}
},
onError: (err: Error) => {
console.error("Channel error:", err)
if (this.options) {
this.options.onError(`Translation error: ${err.message}`)
}
globalRSocketState.channel = null
// Try to recreate the channel after a short delay
setTimeout(() => {
this.createChannel().catch(console.error)
}, 3000)
},
onComplete: () => {
console.log("Channel completed")
globalRSocketState.channel = null
},
// Important: Provide a cancel function to avoid the error
cancel: () => {
console.log("Channel cancelled")
globalRSocketState.channel = null
},
})
return true
} catch (err: any) {
console.error("Error creating channel:", err)
if (this.options) {
this.options.onError(`Channel error: ${err.message}`)
}
return false
}
}
/**
* Disconnect from the RSocket server
*/
disconnect(): void {
this.stopRecording()
cleanupAllResources()
if (this.options) {
this.options.onConnectionChange(false)
}
}
/**
* Start recording and streaming audio for translation
*/
async startRecording(): Promise<boolean> {
if (globalRSocketState.connectionState !== ConnectionState.CONNECTED || !this.options) {
return false
}
// If already recording, return true
if (globalRSocketState.isRecording) {
return true
}
try {
// Ensure we have a channel
if (!globalRSocketState.channel) {
const channelCreated = await this.createChannel()
if (!channelCreated) {
return false
}
}
// Get microphone access
const stream = await navigator.mediaDevices.getUserMedia({
audio: {
channelCount: 1, // Mono
echoCancellation: true,
noiseSuppression: true,
},
})
globalRSocketState.stream = stream
// Create audio context
const audioContext = new AudioContext()
globalRSocketState.audioContext = audioContext
const source = audioContext.createMediaStreamSource(stream)
globalRSocketState.source = source
const scriptNode = audioContext.createScriptProcessor(CHUNK_SIZE, 1, 1)
globalRSocketState.scriptNode = scriptNode
// Buffer for accumulating audio data
let buffer = Buffer.alloc(0)
// Process audio data
scriptNode.onaudioprocess = (audioProcessingEvent) => {
// Skip if no RSocket connection or channel
if (globalRSocketState.connectionState !== ConnectionState.CONNECTED || !globalRSocketState.channel) return
const inputData = audioProcessingEvent.inputBuffer.getChannelData(0)
// Convert Float32Array to Int16Array (PCM)
const pcmData = new Int16Array(inputData.length)
for (let i = 0; i < inputData.length; i++) {
pcmData[i] = Math.max(-1, Math.min(1, inputData[i])) * 0x7fff
}
// Convert Int16Array to Buffer
const newData = Buffer.from(pcmData.buffer)
buffer = Buffer.concat([buffer, newData])
// Send data when buffer reaches chunk size
while (buffer.length >= CHUNK_SIZE * 2 && globalRSocketState.channel) {
// *2 because Int16Array uses 2 bytes per sample
const chunk = buffer.slice(0, CHUNK_SIZE * 2)
buffer = buffer.slice(CHUNK_SIZE * 2)
try {
// Send audio data through the channel
globalRSocketState.channel.onNext({
data: chunk,
metadata: undefined,
})
} catch (err: any) {
console.error("Error sending data:", err)
if (this.options) {
this.options.onError(`Data sending error: ${err.message}`)
}
}
}
}
// Connect nodes
source.connect(scriptNode)
scriptNode.connect(audioContext.destination)
globalRSocketState.isRecording = true
return true
} catch (err: any) {
console.error("Recording error:", err)
if (this.options) {
this.options.onError(`Recording error: ${err.message}`)
}
return false
}
}
/**
* Stop recording and streaming audio
*/
stopRecording(): void {
if (!globalRSocketState.isRecording) {
return
}
// Only clean up audio resources, keep the channel open
cleanupAudioResources()
}
/**
* Check if currently recording
*/
isCurrentlyRecording(): boolean {
return globalRSocketState.isRecording
}
/**
* Check if connected to server
*/
isConnected(): boolean {
return globalRSocketState.connectionState === ConnectionState.CONNECTED
}
/**
* Update translation language
*/
updateLanguage(language: string): void {
if (this.options) {
this.options.language = language
// If we have an active channel, close it and create a new one with the new language
if (globalRSocketState.channel) {
try {
globalRSocketState.channel.onComplete()
} catch (e) {
console.error("Error completing channel:", e)
}
globalRSocketState.channel = null
// Create a new channel with the updated language
this.createChannel().catch(console.error)
}
}
}
}
// Export singleton instance
export const rsocketTranslationService = new RSocketTranslationService()
// Add window unload handler to clean up resources
if (typeof window !== "undefined") {
window.addEventListener("beforeunload", () => {
cleanupAllResources()
})
}
Possible Solution
Your Environment
"dependencies": {
"@rsocket/core": "1.0.0-alpha.3",
"@rsocket/websocket-client": "1.0.0-alpha.3",
"buffer": "^6.0.3",
"next": "15.3.3",
"react": "^19.0.0",
"react-dom": "^19.0.0",
"rsocket-composite-metadata": "^1.0.0-alpha.3",
"rsocket-flowable": "^0.0.29-alpha.0"
}