rsocket-core: the channel is cancelled every time a connection is established #291

@Jason8080

Description

This is the error shown in the console:
RequestChannelStream.ts:453 Uncaught TypeError: Cannot read properties of undefined (reading 'cancel')
at We.close ( RequestChannelStream.ts:453:21 )
at Oe.close ( ClientServerMultiple…ltiplexer.ts:161:14 )
at h.close ( Deferred.ts:47:9 )
at h.close ( WebsocketDuplexConnection.ts:70:11 )
at WebSocket.handleMessage ( WebsocketDuplexConnection.ts:102:12 )

Expected Behavior

Once the connection is established, the channel should be usable normally.

Actual Behavior

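In practice, every time the connection is established successfully, a cancel is triggered, which produces the console error above.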

Steps to Reproduce

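① Establish the connection (route: /connection): succeeds
② Send the first frame (route: /): succeeds
③ The console reports the error, i.e. the problem has already occurred

Note that the frame in ② carries only routing metadata; its data length is 0.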

import { RSocketConnector } from "@rsocket/core"
import { WebsocketClientTransport } from "@rsocket/websocket-client"
import { Buffer } from "buffer"
import { encodeCompositeMetadata, encodeRoute, WellKnownMimeType } from "rsocket-composite-metadata"
import { getToken } from "./auth"

// RSocket configuration
const WS_URL = "wss://ai.gmlee.cn/rsocket"
const SETUP_ROUTE = "/microphone/ai_translator/connection"
const CHANNEL_ROUTE = "/microphone/ai_translator"

// Audio configuration
const CHUNK_SIZE = 4096

// Translation options
export interface TranslationOptions {
  language: string
  onTranslation: (text: string) => void
  onError: (error: string) => void
  onConnectionChange: (connected: boolean) => void
}

// Connection states
export enum ConnectionState {
  DISCONNECTED = "disconnected",
  CONNECTING = "connecting",
  CONNECTED = "connected",
  ERROR = "error",
}

// Global state to track active connections and prevent multiple instances
const globalRSocketState = {
  rsocket: null as any,
  transport: null as any,
  ws: null as WebSocket | null,
  channel: null as any,
  connectionState: ConnectionState.DISCONNECTED,
  isRecording: false,
  audioContext: null as AudioContext | null,
  stream: null as MediaStream | null,
  scriptNode: null as ScriptProcessorNode | null,
  source: null as MediaStreamAudioSourceNode | null,
}

/**
 * Clean up audio resources only
 */
function cleanupAudioResources() {
  try {
    // Stop audio processing
    if (globalRSocketState.scriptNode) {
      try {
        globalRSocketState.scriptNode.disconnect()
      } catch (e) {
        console.error("Error disconnecting script node:", e)
      }
      globalRSocketState.scriptNode = null
    }

    if (globalRSocketState.source) {
      try {
        globalRSocketState.source.disconnect()
      } catch (e) {
        console.error("Error disconnecting source:", e)
      }
      globalRSocketState.source = null
    }

    if (globalRSocketState.audioContext) {
      try {
        globalRSocketState.audioContext.close()
      } catch (e) {
        console.error("Error closing audio context:", e)
      }
      globalRSocketState.audioContext = null
    }

    // Stop media stream
    if (globalRSocketState.stream) {
      try {
        globalRSocketState.stream.getTracks().forEach((track) => track.stop())
      } catch (e) {
        console.error("Error stopping media tracks:", e)
      }
      globalRSocketState.stream = null
    }

    globalRSocketState.isRecording = false
  } catch (e) {
    console.error("Error during audio cleanup:", e)
  }
}

/**
 * Clean up all resources including connection
 */
function cleanupAllResources() {
  // First clean up audio resources
  cleanupAudioResources()

  try {
    // Clear channel reference
    globalRSocketState.channel = null

    // Close WebSocket if it exists and is open
    if (globalRSocketState.ws && globalRSocketState.ws.readyState === WebSocket.OPEN) {
      try {
        // Just close the raw WebSocket directly
        globalRSocketState.ws.close()
      } catch (e) {
        console.error("Error closing WebSocket:", e)
      }
    }

    // Clear all references
    globalRSocketState.rsocket = null
    globalRSocketState.transport = null
    globalRSocketState.ws = null
    globalRSocketState.connectionState = ConnectionState.DISCONNECTED
  } catch (e) {
    console.error("Error during cleanup:", e)
  }
}

/**
 * Simple RSocket translation service
 */
class RSocketTranslationService {
  private options: TranslationOptions | null = null

  /**
   * Get current connection state
   */
  getConnectionState(): ConnectionState {
    return globalRSocketState.connectionState
  }

  /**
   * Connect to the RSocket server
   */
  async connect(options: TranslationOptions): Promise<boolean> {
    // Store options
    this.options = options

    // If already connecting or connected, return current state
    if (globalRSocketState.connectionState === ConnectionState.CONNECTING) {
      return false
    }

    if (globalRSocketState.connectionState === ConnectionState.CONNECTED && globalRSocketState.rsocket) {
      options.onConnectionChange(true)
      return true
    }

    // Start connecting
    globalRSocketState.connectionState = ConnectionState.CONNECTING
    options.onConnectionChange(false)

    try {
      console.log("Starting RSocket connection...")

      // Get authentication token
      const token = getToken()
      if (!token) {
        options.onError("Authentication required. Please login first.")
        globalRSocketState.connectionState = ConnectionState.ERROR
        options.onConnectionChange(false)
        return false
      }

      // Prepare metadata for setup frame
      const meta = {
        token,
        language: options.language,
      }

      const setupMetadata = encodeCompositeMetadata([
        [WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, encodeRoute(SETUP_ROUTE)],
        [WellKnownMimeType.APPLICATION_JSON, Buffer.from(JSON.stringify(meta))],
      ])

      // Configure RSocket connection
      const setup = {
        dataMimeType: "application/stream+json",
        metadataMimeType: WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.toString(),
        keepAlive: 60000,
        lifetime: 180000,
        payload: {
          data: undefined,
          metadata: setupMetadata,
        },
      }

      // Create WebSocket transport
      const transport = new WebsocketClientTransport({
        url: WS_URL,
        wsCreator: (url: string) => {
          const newWs = new WebSocket(url)

          newWs.onopen = () => {
            console.log("WebSocket connection opened")
          }

          newWs.onclose = (event) => {
            console.log("WebSocket connection closed", event)

            // Notify of disconnection
            if (this.options) {
              this.options.onConnectionChange(false)
            }

            globalRSocketState.connectionState = ConnectionState.DISCONNECTED

            // Clean up resources but don't try to close the WebSocket again
            cleanupAudioResources()
            globalRSocketState.channel = null
            globalRSocketState.rsocket = null
            globalRSocketState.transport = null
            globalRSocketState.ws = null
          }

          newWs.onerror = (error) => {
            console.error("WebSocket error:", error)
            if (this.options) {
              this.options.onError(`WebSocket error: Connection failed`)
            }
            globalRSocketState.connectionState = ConnectionState.ERROR
          }

          globalRSocketState.ws = newWs
          return newWs
        },
      })

      globalRSocketState.transport = transport

      // Create RSocket connector
      const connector = new RSocketConnector({
        transport: transport,
        setup: setup,
      })

      // Connect with timeout
      const connectionPromise = connector.connect()
      const timeoutPromise = new Promise<any>((_, reject) =>
        setTimeout(() => reject(new Error("RSocket connection timeout")), 10000),
      )

      const rsocket = await Promise.race([connectionPromise, timeoutPromise])
      globalRSocketState.rsocket = rsocket

      // Create channel immediately after connection
      await this.createChannel()

      globalRSocketState.connectionState = ConnectionState.CONNECTED
      options.onConnectionChange(true)
      return true
    } catch (err: any) {
      console.error("RSocket connection error:", err)
      options.onError(`Connection error: ${err.message}`)
      globalRSocketState.connectionState = ConnectionState.ERROR
      options.onConnectionChange(false)

      // Clean up any partial connection
      cleanupAllResources()
      return false
    }
  }

  /**
   * Create a new RSocket channel
   */
  async createChannel(): Promise<boolean> {
    if (!globalRSocketState.rsocket || !this.options) {
      return false
    }

    // If channel already exists, return true
    if (globalRSocketState.channel) {
      return true
    }

    try {
      const token = getToken()
      if (!token) return false

      const meta = {
        token,
        language: this.options.language,
      }

      const metadata = encodeCompositeMetadata([
        [WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, encodeRoute(CHANNEL_ROUTE)],
        [WellKnownMimeType.APPLICATION_JSON, Buffer.from(JSON.stringify(meta))],
      ])

      // Create an empty buffer for the first message
      const emptyBuffer = Buffer.alloc(0)

      // Create a new channel
      globalRSocketState.channel = globalRSocketState.rsocket.requestChannel({
        metadata,
        data: emptyBuffer,
        onNext: (payload: any) => {
          if (!globalRSocketState.rsocket) return

          const text = payload.data.toString()
          if (this.options) {
            this.options.onTranslation(text)
          }
        },
        onError: (err: Error) => {
          console.error("Channel error:", err)
          if (this.options) {
            this.options.onError(`Translation error: ${err.message}`)
          }
          globalRSocketState.channel = null

          // Try to recreate the channel after a short delay
          setTimeout(() => {
            this.createChannel().catch(console.error)
          }, 3000)
        },
        onComplete: () => {
          console.log("Channel completed")
          globalRSocketState.channel = null
        },
        // Important: Provide a cancel function to avoid the error
        cancel: () => {
          console.log("Channel cancelled")
          globalRSocketState.channel = null
        },
      })

      return true
    } catch (err: any) {
      console.error("Error creating channel:", err)
      if (this.options) {
        this.options.onError(`Channel error: ${err.message}`)
      }
      return false
    }
  }

  /**
   * Disconnect from the RSocket server
   */
  disconnect(): void {
    this.stopRecording()
    cleanupAllResources()

    if (this.options) {
      this.options.onConnectionChange(false)
    }
  }

  /**
   * Start recording and streaming audio for translation
   */
  async startRecording(): Promise<boolean> {
    if (globalRSocketState.connectionState !== ConnectionState.CONNECTED || !this.options) {
      return false
    }

    // If already recording, return true
    if (globalRSocketState.isRecording) {
      return true
    }

    try {
      // Ensure we have a channel
      if (!globalRSocketState.channel) {
        const channelCreated = await this.createChannel()
        if (!channelCreated) {
          return false
        }
      }

      // Get microphone access
      const stream = await navigator.mediaDevices.getUserMedia({
        audio: {
          channelCount: 1, // Mono
          echoCancellation: true,
          noiseSuppression: true,
        },
      })
      globalRSocketState.stream = stream

      // Create audio context
      const audioContext = new AudioContext()
      globalRSocketState.audioContext = audioContext

      const source = audioContext.createMediaStreamSource(stream)
      globalRSocketState.source = source

      const scriptNode = audioContext.createScriptProcessor(CHUNK_SIZE, 1, 1)
      globalRSocketState.scriptNode = scriptNode

      // Buffer for accumulating audio data
      let buffer = Buffer.alloc(0)

      // Process audio data
      scriptNode.onaudioprocess = (audioProcessingEvent) => {
        // Skip if no RSocket connection or channel
        if (globalRSocketState.connectionState !== ConnectionState.CONNECTED || !globalRSocketState.channel) return

        const inputData = audioProcessingEvent.inputBuffer.getChannelData(0)

        // Convert Float32Array to Int16Array (PCM)
        const pcmData = new Int16Array(inputData.length)
        for (let i = 0; i < inputData.length; i++) {
          pcmData[i] = Math.max(-1, Math.min(1, inputData[i])) * 0x7fff
        }

        // Convert Int16Array to Buffer
        const newData = Buffer.from(pcmData.buffer)
        buffer = Buffer.concat([buffer, newData])

        // Send data when buffer reaches chunk size
        while (buffer.length >= CHUNK_SIZE * 2 && globalRSocketState.channel) {
          // *2 because Int16Array uses 2 bytes per sample
          const chunk = buffer.slice(0, CHUNK_SIZE * 2)
          buffer = buffer.slice(CHUNK_SIZE * 2)

          try {
            // Send audio data through the channel
            globalRSocketState.channel.onNext({
              data: chunk,
              metadata: undefined,
            })
          } catch (err: any) {
            console.error("Error sending data:", err)
            if (this.options) {
              this.options.onError(`Data sending error: ${err.message}`)
            }
          }
        }
      }

      // Connect nodes
      source.connect(scriptNode)
      scriptNode.connect(audioContext.destination)

      globalRSocketState.isRecording = true
      return true
    } catch (err: any) {
      console.error("Recording error:", err)
      if (this.options) {
        this.options.onError(`Recording error: ${err.message}`)
      }
      return false
    }
  }

  /**
   * Stop recording and streaming audio
   */
  stopRecording(): void {
    if (!globalRSocketState.isRecording) {
      return
    }

    // Only clean up audio resources, keep the channel open
    cleanupAudioResources()
  }

  /**
   * Check if currently recording
   */
  isCurrentlyRecording(): boolean {
    return globalRSocketState.isRecording
  }

  /**
   * Check if connected to server
   */
  isConnected(): boolean {
    return globalRSocketState.connectionState === ConnectionState.CONNECTED
  }

  /**
   * Update translation language
   */
  updateLanguage(language: string): void {
    if (this.options) {
      this.options.language = language

      // If we have an active channel, close it and create a new one with the new language
      if (globalRSocketState.channel) {
        try {
          globalRSocketState.channel.onComplete()
        } catch (e) {
          console.error("Error completing channel:", e)
        }
        globalRSocketState.channel = null

        // Create a new channel with the updated language
        this.createChannel().catch(console.error)
      }
    }
  }
}

// Export singleton instance
export const rsocketTranslationService = new RSocketTranslationService()

// Add window unload handler to clean up resources
if (typeof window !== "undefined") {
  window.addEventListener("beforeunload", () => {
    cleanupAllResources()
  })
}

Possible Solution
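
One possible cause, offered as a sketch rather than a confirmed diagnosis: in the 1.0.0-alpha line of @rsocket/core, `requestChannel` appears to take four positional arguments (payload, initialRequestN, isCompleted, responderStream), while the code above passes a single object that mixes the payload with the callbacks. If that is right, the responder stream is undefined, and closing the stream would try to read `cancel` from it, which would match the stack trace. The types below (`Payload`, `ChannelResponder`, `ChannelCapable`) and the helper `openChannel` are local stand-ins written for illustration, not the library's exports, and the signature should be checked against the installed version.

```typescript
// Local stand-ins for @rsocket/core types (assumption: they mirror 1.0.0-alpha.x).
// The real Payload uses Buffer; Uint8Array keeps this sketch dependency-free.
interface Payload {
  data?: Uint8Array | null
  metadata?: Uint8Array | null
}

// Callbacks the library is assumed to invoke on the caller's side of the channel.
interface ChannelResponder {
  onNext(payload: Payload, isComplete: boolean): void
  onError(error: Error): void
  onComplete(): void
  onExtension(extendedType: number, content: Uint8Array | null | undefined, canBeIgnored: boolean): void
  request(requestN: number): void
  cancel(): void
}

// Assumed shape of the connected RSocket: four positional arguments.
interface ChannelCapable {
  requestChannel(
    payload: Payload,
    initialRequestN: number,
    isCompleted: boolean,
    responderStream: ChannelResponder,
  ): ChannelResponder
}

const MAX_REQUEST_N = 2147483647 // 2^31 - 1, the largest request-n the protocol allows

// Hypothetical helper: opens the channel with payload and callbacks passed separately.
function openChannel(
  rsocket: ChannelCapable,
  metadata: Uint8Array,
  onPayload: (payload: Payload) => void,
  onClosed: (reason: string) => void,
): ChannelResponder {
  return rsocket.requestChannel(
    { data: null, metadata }, // first frame: routing metadata only
    MAX_REQUEST_N, // how many responses we are willing to receive
    false, // isCompleted = false: more audio frames will follow
    {
      onNext: (payload) => onPayload(payload),
      onError: (error) => onClosed(`error: ${error.message}`),
      onComplete: () => onClosed("complete"),
      onExtension: () => {}, // extension frames are ignored in this sketch
      request: () => {}, // the peer asks for more frames; nothing to do here
      cancel: () => onClosed("cancel"),
    },
  )
}
```

With this shape, later audio chunks would be sent as `channel.onNext({ data: chunk }, false)` on the returned handle, and `channel.onComplete()` would end the sending side.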

Your Environment

"dependencies": {
  "@rsocket/core": "1.0.0-alpha.3",
  "@rsocket/websocket-client": "1.0.0-alpha.3",
  "buffer": "^6.0.3",
  "next": "15.3.3",
  "react": "^19.0.0",
  "react-dom": "^19.0.0",
  "rsocket-composite-metadata": "^1.0.0-alpha.3",
  "rsocket-flowable": "^0.0.29-alpha.0"
}
