Signed-off-by: eric <eric117649@gmail.com>
Implements RFC 6455 WebSocket framing over TCP so YMQ can traverse WebSocket-aware proxies and firewalls without an external library.

- Add WebSocketAddress struct and ws:// / wss:// parsing to Address
- Implement WebSocketStream: HTTP/1.1 Upgrade handshake (client + server), binary frame encode/decode, SHA-1 + Base64 inline for Sec-WebSocket-Accept
- Extend Client variant to include WebSocketStream alongside TCP and IPC
- Wire WebSocket accept path into AcceptServer (binds TCP, upgrades on connect)
- Wire WebSocket connect path into ConnectClient (TCP connect then HTTP upgrade)
- Document libwebsockets in library_tool.sh for future wss:// support
- Add address parsing tests and an end-to-end ClientServerHandshake test

Closes finos#728
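For reference, the Sec-WebSocket-Accept value mentioned above is derived as RFC 6455 describes: SHA-1 over the client key concatenated with a fixed GUID, then Base64. The sketch below shows that derivation in Python using the standard library; it is not the hand-rolled C++ from this change, and the function name compute_websocket_accept is a hypothetical stand-in.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake.
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_websocket_accept(client_key: str) -> str:
    """Return the Sec-WebSocket-Accept value for a Sec-WebSocket-Key.

    Hypothetical helper name; the C++ code in this PR implements SHA-1 and
    Base64 inline instead of calling a library.
    """
    digest = hashlib.sha1((client_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


# Sample key/accept pair taken from the RFC 6455 handshake example.
print(compute_websocket_accept("dGhlIHNhbXBsZSBub25jZQ=="))
```

The server sends this value back in its 101 response, and the client compares it against its own computation before treating the upgrade as complete.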
Introduces WebSocketSocket, a blocking Socket implementation backed by RFC 6455 WebSocket framing over TCP, and wires it into the parameterised YMQSocketTest suite alongside the existing tcp/ipc variants. All 12 socket test cases now run under ws://, including raw-client / raw-server combinations and edge-case tests (huge header, incomplete identity, big message, slow network).
- Handle fragmented messages: FIN bit parsed, continuation frames assembled in _fragmentBuffer before delivery
- Respond to PING with PONG; echo CLOSE frame on receipt (RFC 6455 §5.5)
- Reject frames with payload > 64 MiB to prevent OOM (UV_EPROTO)
- Cap upgrade-phase read buffer at 64 KiB to prevent unbounded growth
- Validate HTTP upgrade: GET method, Connection header, Sec-WebSocket-Version: 13
- Send WebSocket CLOSE frame in shutdown() before TCP FIN
- Replace assert() in finishClientUpgrade/finishServerUpgrade with error returns
- Fix extractHeader to tolerate headers with no space after colon (RFC 7230 OWS)
- Add 10 new tests covering each fix
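The fragmentation and size-limit items above can be illustrated with a small Python sketch of RFC 6455 frame decoding. This is only an approximation under stated assumptions: all function names are hypothetical, ValueError stands in for the UV_EPROTO error, and control frames (PING, PONG, CLOSE) are not handled here.

```python
import struct
from typing import List, Optional, Tuple

MAX_PAYLOAD = 64 * 1024 * 1024  # 64 MiB limit named in the commit message
OP_CONTINUATION, OP_BINARY = 0x0, 0x2


def encode_frame(payload: bytes, opcode: int = OP_BINARY, fin: bool = True) -> bytes:
    """Encode an unmasked frame (the server-to-client direction)."""
    first = (0x80 if fin else 0x00) | opcode
    n = len(payload)
    if n < 126:
        header = struct.pack("!BB", first, n)
    elif n < (1 << 16):
        header = struct.pack("!BBH", first, 126, n)
    else:
        header = struct.pack("!BBQ", first, 127, n)
    return header + payload


def decode_frame(buf: bytes) -> Optional[Tuple[bool, int, bytes, int]]:
    """Return (fin, opcode, payload, bytes_consumed), or None if incomplete."""
    if len(buf) < 2:
        return None
    fin = bool(buf[0] & 0x80)
    opcode = buf[0] & 0x0F
    masked = bool(buf[1] & 0x80)
    length = buf[1] & 0x7F
    pos = 2
    if length == 126:
        if len(buf) < pos + 2:
            return None
        (length,) = struct.unpack_from("!H", buf, pos)
        pos += 2
    elif length == 127:
        if len(buf) < pos + 8:
            return None
        (length,) = struct.unpack_from("!Q", buf, pos)
        pos += 8
    if length > MAX_PAYLOAD:
        # Reject before buffering anything; stand-in for UV_EPROTO.
        raise ValueError("frame payload exceeds 64 MiB")
    mask = b""
    if masked:
        if len(buf) < pos + 4:
            return None
        mask = buf[pos:pos + 4]
        pos += 4
    if len(buf) < pos + length:
        return None
    payload = buf[pos:pos + length]
    if masked:
        payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
    return fin, opcode, payload, pos + length


def reassemble(stream: bytes) -> List[bytes]:
    """Join data frames into messages; a message ends at a frame with FIN set."""
    messages: List[bytes] = []
    fragments = bytearray()
    while True:
        frame = decode_frame(stream)
        if frame is None:
            break
        fin, _opcode, payload, consumed = frame
        stream = stream[consumed:]
        fragments += payload
        if fin:
            messages.append(bytes(fragments))
            fragments.clear()
    return messages
```

The length check happens as soon as the header is parsed, so an oversized frame is rejected before any payload bytes are accumulated.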
Moves sha1, base64Encode, generateWebSocketKey, computeWebSocketAccept, and extractHeader out of anonymous namespaces in websocket_stream.cpp and websocket_socket.cpp into a shared websocket_utils.h/.cpp. Adds a comment explaining why these are hand-rolled rather than pulled from a library.
The WebSocket transport is now implemented without libwebsockets; remove the library script support and compiled/downloaded artifacts.
The YMQ transport was crashing during tests (specifically in WebSocket transport) when a socket was disconnected exactly as an operation was being submitted or during shutdown. This was caused by the use of UV_EXIT_ON_ERROR for network errors that should be handled as disconnections rather than fatal system failures.

Changes:
- Added UV_ENOTCONN to the handled errors in onRead and onWriteDone. It is now treated as a graceful/aborted disconnection rather than a crash.
- Updated shutdownClient to ignore UV_ENOTCONN. If a socket is already disconnected, shutting it down is now a no-op.
- Refactored processSendOperation to remove UV_EXIT_ON_ERROR from write calls. It now handles immediate failures by notifying the caller through the provided callback.
- Introduced a shared_ptr to manage SendMessageCallback ownership during chunked writes, ensuring the callback is executed exactly once even if an intermediate chunk fails.

Safety Rationale:
These changes distinguish between network volatility (expected in async IO) and actual logic bugs. Fatal system errors (like EINVAL or EBADF) still trigger UV_EXIT_ON_ERROR, while connectivity-related errors are propagated to the upper layers of the application to be handled by the socket's state machine.
This fixes the Cap'n Proto version mismatch error in macOS CI, where the library version (1.0.1) did not match the version of the committed generated files (1.2.0). Generated files are now correctly ignored in src/protocol/ and will be regenerated during the build process.
Browser/Pyodide environments cannot load the _ymq C extension. This adds a pure-Python implementation in scaler.io.ymq._ymq_wasm that mirrors the C extension's surface (Bytes, Message, Address, IOContext, ConnectorSocket, errors) on top of js.WebSocket and uses the same YMQ wire protocol as the native build (4-byte magic + 8-byte LE length-prefixed frames). scaler.io.ymq.__init__ dispatches to the shim when sys.platform == 'emscripten'; native imports are unchanged. BinderSocket and ConnectorSocket.bind raise NotImplementedError since browsers cannot accept inbound connections. Includes 34 unit tests covering Bytes/Message/Address surface, handshake, framing (single/split/concatenated), zero-length, recv-before-callback buffering, send-before-open queueing, shutdown, and remote close. Tests use a fake WebSocket so they run in any CPython env without a browser.
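The split/concatenated/zero-length framing cases listed above can be sketched with a small incremental decoder. This is a minimal illustration that assumes each frame is an 8-byte little-endian length followed by the payload; the 4-byte magic exchange is omitted, and the names YMQFrameDecoder and encode_ymq_frame are hypothetical rather than taken from _ymq_wasm.

```python
import struct
from typing import List


def encode_ymq_frame(payload: bytes) -> bytes:
    """Prefix the payload with its length as an 8-byte little-endian integer."""
    return struct.pack("<Q", len(payload)) + payload


class YMQFrameDecoder:
    """Accumulates WebSocket chunks and yields complete frames.

    Hypothetical class; the shim's real buffering logic may differ.
    """

    HEADER_SIZE = 8

    def __init__(self) -> None:
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> List[bytes]:
        self._buffer += chunk
        frames: List[bytes] = []
        while len(self._buffer) >= self.HEADER_SIZE:
            (length,) = struct.unpack_from("<Q", self._buffer, 0)
            end = self.HEADER_SIZE + length
            if len(self._buffer) < end:
                break  # frame is split across chunks; wait for more data
            frames.append(bytes(self._buffer[self.HEADER_SIZE:end]))
            del self._buffer[:end]
        return frames
```

Because a WebSocket message boundary need not line up with a YMQ frame boundary, the decoder keeps leftover bytes between calls instead of assuming one frame per chunk.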
Cross-verify the pure-Python browser shim against the native _ymq C
extension so that drift in the wire protocol, error taxonomy, or API
surface fails loudly rather than silently breaking browser clients.
Three levels of parity are checked:
1. Wire-protocol constants are parsed from the authoritative C++ header
(src/cpp/scaler/ymq/configuration.h) and compared against the shim's
_MAGIC_STRING and header-format constants.
2. Module surface: every name the shim re-exports must exist on the
native module, and exception subclasses must share the same parent.
3. Value semantics: Bytes round-trip, Address scheme classification, and
ErrorCode enum values must match between the two implementations.
Pure mechanical refactor: move the 'create internal address + spawn ClientAgent thread + create SyncConnector' code path out of Client.__initialize__ into a new ClientAgentBridge abstract base with an IPCAgentBridge implementation that preserves the existing behavior exactly. Client.__initialize__, shutdown, and __destroy now delegate to the bridge. No runtime or protocol change; existing tests in tests/client and tests/io continue to pass. This sets up a follow-up commit to add an InProcessAgentBridge for the browser / Pyodide environment, where threads and IPC are unavailable but the same ClientAgent logic can run on the single available asyncio loop.
Browser clients need to express their scheduler endpoint as ws://host:port since the only transport available to a WebAssembly page is the browser's WebSocket API. Native clients gain the same capability for free (the ymq C++ layer already speaks ws:// via WebSocketStream) though no native code path currently hands a ws:// AddressConfig to it. ws/wss parse and render exactly like tcp (host:port, port required). Paths are not supported at the AddressConfig level; if needed later they should be added as an explicit optional field rather than embedded in host.
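As a rough illustration of the "host:port, port required, no paths" rule, the sketch below parses and renders such addresses. It is not the AddressConfig code from this change: HostPortAddress and parse_address are hypothetical names, and IPv6 literals are not handled.

```python
from dataclasses import dataclass

# Schemes that share the host:port form according to the commit message.
HOST_PORT_SCHEMES = ("tcp", "ws", "wss")


@dataclass(frozen=True)
class HostPortAddress:
    """Hypothetical stand-in for the host:port flavour of AddressConfig."""

    scheme: str
    host: str
    port: int

    def __str__(self) -> str:
        return f"{self.scheme}://{self.host}:{self.port}"


def parse_address(text: str) -> HostPortAddress:
    scheme, separator, rest = text.partition("://")
    if not separator or scheme not in HOST_PORT_SCHEMES:
        raise ValueError(f"unsupported address: {text!r}")
    if "/" in rest:
        # Paths are rejected rather than folded into the host.
        raise ValueError(f"paths are not supported: {text!r}")
    host, separator, port = rest.rpartition(":")
    if not separator or not host or not port.isdigit():
        raise ValueError(f"expected host:port in {text!r}")
    return HostPortAddress(scheme, host, int(port))
```

Parsing and rendering round-trip, so parse_address("ws://127.0.0.1:8516") prints back as the same string.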
Implements ClientAgentBridge without threads or real IPC sockets so the same ClientAgent code can run in the browser. The agent coroutine runs on the user's asyncio loop and exchanges BaseMessage objects with the Client via two asyncio.Queue halves. The sync half of the connector pair blocks synchronously via pyodide.ffi.run_sync (JSPI), so Client's public sync API keeps working unchanged.

- ClientAgent gains an optional internal_connector_factory parameter so the bridge can inject an in-process internal connector while the external connector still goes through the real network backend. Native path is unchanged (the parameter defaults to None).
- Client dispatches to IPCAgentBridge or InProcessAgentBridge via the new create_default_bridge() factory keyed on sys.platform.
- Bridge tests cover: in-process queue handoff (both directions), bind/connect no-ops, routine dispatch, shutdown/destroy sentinel propagation, platform dispatch, and surface parity between the two bridges (a missing method on either fails the parity test).

Known limitation addressed in a follow-up commit: object storage is still reached via SyncObjectStorageConnector which is raw-TCP-only. Browser clients will route object I/O through a WebSocket object-storage gateway added in Layer 3.
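The "two asyncio.Queue halves" idea can be sketched as a pair of connectors that share two queues, one per direction. This is only an illustration: QueueConnector and make_connector_pair are hypothetical names, plain strings stand in for BaseMessage objects, and the synchronous half that blocks via pyodide.ffi.run_sync is not shown.

```python
import asyncio
from typing import Any, Tuple


class QueueConnector:
    """One end of an in-process connector pair (hypothetical name)."""

    def __init__(self, inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
        self._inbox = inbox
        self._outbox = outbox

    async def send(self, message: Any) -> None:
        await self._outbox.put(message)

    async def receive(self) -> Any:
        return await self._inbox.get()


def make_connector_pair() -> Tuple[QueueConnector, QueueConnector]:
    """Create both ends; call from inside the running loop."""
    client_to_agent: asyncio.Queue = asyncio.Queue()
    agent_to_client: asyncio.Queue = asyncio.Queue()
    client_end = QueueConnector(inbox=agent_to_client, outbox=client_to_agent)
    agent_end = QueueConnector(inbox=client_to_agent, outbox=agent_to_client)
    return client_end, agent_end


async def demo() -> Tuple[Any, Any]:
    client_end, agent_end = make_connector_pair()
    await client_end.send("task")           # client -> agent
    seen_by_agent = await agent_end.receive()
    await agent_end.send("result")          # agent -> client
    seen_by_client = await client_end.receive()
    return seen_by_agent, seen_by_client


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each end only ever reads from its own inbox, so messages cannot loop back to their sender.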
ScalerFuture is a concurrent.futures.Future subclass, so wrapping it with asyncio.wrap_future inside __await__ lets notebook code use either the blocking .result() path or the async 'await future' path with the same object. In the browser (Pyodide) the sync path depends on JSPI; await is the natural form and doesn't require any browser capability. Native CPython behavior is unchanged — wrap_future handles the cross-thread case; existing tests/client/test_future.py still passes. Tests cover: pre-set result, mid-await result, exception propagation, cancel propagation (as asyncio.CancelledError), and a regression guard that the sync .result() / exception-raising path keeps working.
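A minimal sketch of the pattern described above, assuming nothing about ScalerFuture beyond it being a concurrent.futures.Future subclass; AwaitableFuture is a hypothetical stand-in class, not the real one.

```python
import asyncio
import concurrent.futures


class AwaitableFuture(concurrent.futures.Future):
    """Hypothetical stand-in for ScalerFuture: blocking and awaitable."""

    def __await__(self):
        # wrap_future bridges this future into the running asyncio loop,
        # including the case where the result is set from another thread.
        return asyncio.wrap_future(self).__await__()


async def demo() -> int:
    future = AwaitableFuture()
    loop = asyncio.get_running_loop()
    # Resolve the future shortly after the await has started.
    loop.call_later(0.01, future.set_result, 42)
    return await future


if __name__ == "__main__":
    print(asyncio.run(demo()))

    # The blocking path is untouched by adding __await__.
    blocking = AwaitableFuture()
    blocking.set_result("done")
    print(blocking.result())
```

The same object therefore serves both notebook styles: `await future` in an async cell and `future.result()` in a synchronous one.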
…g.__str__ Mostly IDE format-on-save output matching the repo's black/isort/flake8 configuration (line-length=120, skip-magic-trailing-comma=true, isort profile=black). Also drops an accidental duplicated (unreachable) 'return repr(self)' line that had snuck into AddressConfig.__str__. No behavior change; all wasm-branch tests still pass (74/74).
Browser Client instantiation fails fast with a clear RuntimeError when JSPI (pyodide.ffi.run_sync) is unavailable, instead of deadlocking at the first sync call. Native platforms are untouched. The check lives alongside the bridge that actually needs JSPI so it can share the same sys.platform gate and import probe. It runs once at the top of Client.__initialize__ so it fires on construction rather than at first task submit.
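The fail-fast check could look roughly like the sketch below. It assumes the probe is an import of pyodide.ffi.run_sync, as the message suggests; the function names and the injectable probe parameter are hypothetical, and a real check may need more than an import probe to detect a browser with JSPI disabled.

```python
import sys
from typing import Callable


def _import_run_sync() -> None:
    """Import probe; raises ImportError when run_sync is unavailable."""
    from pyodide.ffi import run_sync  # noqa: F401


def ensure_jspi_available(
    platform: str = sys.platform,
    probe: Callable[[], None] = _import_run_sync,
) -> None:
    """Raise RuntimeError early on emscripten when JSPI cannot be used.

    Hypothetical helper; parameters exist so the check can be tested
    without a browser.
    """
    if platform != "emscripten":
        return  # native platforms are untouched
    try:
        probe()
    except ImportError as exc:
        raise RuntimeError(
            "The synchronous Client API needs JSPI (pyodide.ffi.run_sync), "
            "which is not available in this environment."
        ) from exc
```

Running the check during construction turns what would otherwise be a hang at the first sync call into an immediate, explainable error.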
CI mypy run flagged the bytes literal 'b"id"' as incompatible with create_default_bridge's identity: ClientID parameter. Switch to ClientID.generate_client_id() to get the right type.
asyncio.Queue() reads the current event loop at construction time on Python 3.8 (the deprecation only landed in 3.10). The previous test constructed the queues outside any running loop, so they bound to the process default loop while the test driver ran a separate loop, causing 'Future attached to a different loop' under 3.8 CI. Set the driver's loop as the current event loop in setUp and create the queues from inside that loop so they bind correctly on every Python version we support.
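The fix described above can be sketched as a unittest fixture. The class and attribute names are hypothetical and the real test in the branch may be organised differently; the point is that the queues are created by a coroutine running on the driver's loop.

```python
import asyncio
import unittest


class QueueLoopBindingTest(unittest.TestCase):
    """Hypothetical fixture showing queues created inside the driver loop."""

    def setUp(self) -> None:
        self.loop = asyncio.new_event_loop()
        # Make the driver's loop the current one so anything that looks up
        # the current loop at construction time finds this loop.
        asyncio.set_event_loop(self.loop)
        self.addCleanup(self.loop.close)
        self.addCleanup(asyncio.set_event_loop, None)

        async def make_queues():
            # Constructed while self.loop is running, so they bind to it.
            return asyncio.Queue(), asyncio.Queue()

        self.to_agent, self.to_client = self.loop.run_until_complete(make_queues())

    def test_round_trip(self) -> None:
        async def scenario():
            await self.to_agent.put("ping")
            return await self.to_agent.get()

        self.assertEqual(self.loop.run_until_complete(scenario()), "ping")


if __name__ == "__main__":
    unittest.main()
```

On newer Python versions the queue binds lazily, so the same fixture works unchanged there.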
Tests the feasibility of WASM support for the client. The changes are applied on top of #739 and should be merged after it.