[RelayMiner] Streaming Support for SSE and NDJSON (used for LLMs) #1570
🧩 Hybrid Flow: Between Sync and Async
This is a very interesting use case that lands somewhere between traditional synchronous and asynchronous workflows.
It introduces flexibility but also surfaces protocol and integration gaps that need to be addressed to make it robust.
⚙️ Protocol Weaknesses & Gateway Limitations
Despite the potential, the current support is not ideal:
- Streaming behavior doesn’t cleanly map to the "synchronous" protocol expectations.
- The protocol needs stronger guarantees to handle streaming deterministically (i.e. it knows beforehand, not after the fact, that the response is streamed).
- On the integration side, the Gateway (PATH) needs to implement this flow.
🔍 Observations from the Current Behavior
From what I understand of the implementation:
- Each streamed chunk is individually wrapped in a signed RelayResponse.
  - However, only the last chunk is mined.
- This behavior likely confuses the Gateway:
  - Does it unwrap and stream each chunk as it arrives?
  - My suspicion: the first RelayResponse gets unwrapped, while the rest are forwarded as-is.
📡 Need for Explicit Flow Awareness in PATH
Just like with WebSocket relays, the Gateway needs to:
- Be explicitly aware it's dealing with a request that will have a streamed response.
- Handle the flow consistently, both in terms of routing and unwrapping.
Without that awareness, the Gateway might misinterpret intermediate chunks or mining status.
🚫 Why Implicit Detection via Content-Type Falls Short
I’m not a big fan of dynamically switching flows based on backend response Content-Type. Here’s why:
- It’s non-deterministic: both the Gateway and the RelayMiner learn the flow type only after receiving a response.
- That introduces brittleness and makes reasoning about the system harder.
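To make the objection concrete, here is a minimal sketch of what Content-Type based detection looks like. The function name is hypothetical, and `application/x-ndjson` is an assumed media type for NDJSON (the thread only names `text/event-stream` explicitly). Note that the input is a response header, so the decision can only be taken after the backend has answered.

```go
package main

import (
	"fmt"
	"mime"
)

// isStreamingContentType reports whether a backend response Content-Type
// names one of the newline-delimited streaming media types.
// Hypothetical helper: the media type list is an assumption of this sketch.
func isStreamingContentType(contentType string) bool {
	// ParseMediaType strips parameters such as "; charset=utf-8"
	// and lowercases the media type.
	mediaType, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		return false // missing or malformed header: treat as a normal response
	}
	switch mediaType {
	case "text/event-stream", "application/x-ndjson":
		return true
	}
	return false
}

func main() {
	// The value is only known once the backend response headers arrive.
	fmt.Println(isStreamingContentType("text/event-stream; charset=utf-8"))
	fmt.Println(isStreamingContentType("application/json"))
}
```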
✅ A Better Path: Deterministic Flow Declaration
This feels like a solid use case for introducing a dedicated RPCType and deterministic request routing [1]
- The Gateway should be able to infer the streaming nature of the call from the request itself (via request path, headers, or query params).
- It then passes this context to the RelayMiner, which selects the appropriate execution flow.
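A rough sketch of request-side flow selection follows. Everything named here is hypothetical and chosen only for illustration: the `FlowType` enum, the `X-Relay-Flow` header, the `stream=true` query parameter and the `/stream` path suffix. The thread proposes a dedicated RPCType but does not define how it would be encoded.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// FlowType is a hypothetical stand-in for the proposed dedicated RPCType.
type FlowType int

const (
	FlowSync FlowType = iota
	FlowStream
)

// Hypothetical markers, invented for this sketch.
const (
	flowHeader   = "X-Relay-Flow"
	streamSuffix = "/stream"
)

// flowFromRequest decides the flow from the request alone (header, query
// parameter, or path), before any backend response exists.
func flowFromRequest(r *http.Request) FlowType {
	switch {
	case strings.EqualFold(r.Header.Get(flowHeader), "stream"):
		return FlowStream
	case r.URL.Query().Get("stream") == "true":
		return FlowStream
	case strings.HasSuffix(r.URL.Path, streamSuffix):
		return FlowStream
	}
	return FlowSync
}

// demoRequest builds a body-less request for the examples in main.
func demoRequest(rawURL, headerValue string) *http.Request {
	req, err := http.NewRequest(http.MethodPost, rawURL, nil)
	if err != nil {
		panic(err)
	}
	if headerValue != "" {
		req.Header.Set(flowHeader, headerValue)
	}
	return req
}

func main() {
	plain := demoRequest("http://gateway.example/v1/chat/completions", "")
	marked := demoRequest("http://gateway.example/v1/chat/completions", "stream")
	fmt.Println(flowFromRequest(plain) == FlowStream)
	fmt.Println(flowFromRequest(marked) == FlowStream)
}
```

The selected flow would then be forwarded to the RelayMiner alongside the relay request; how that context is carried is left open here.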
🔄 Response Handling Pipeline (Streamed)
This new flow would allow the RelayMiner to properly mine the produced Relays.
And the Gateway to:
- Validate each chunk’s RelayResponse signature.
- Unwrap it safely.
- Write the payload to the client’s stream without mistaking the first response as final.
That would enable proper streaming while preserving the protocol’s guarantees.
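The three Gateway steps above (validate, unwrap, write) can be sketched as a loop. This is an illustration under loud assumptions: `signedChunk` is a hypothetical stand-in for a RelayResponse, and Ed25519 is used only because it ships in the Go standard library; the protocol's actual signature scheme and message types are not shown in this thread.

```go
package main

import (
	"bytes"
	"crypto/ed25519"
	"errors"
	"fmt"
	"io"
)

// signedChunk is a hypothetical stand-in for one RelayResponse in a stream.
type signedChunk struct {
	Payload   []byte
	Signature []byte
}

var errBadSignature = errors.New("chunk signature verification failed")

// forwardStream verifies every chunk before writing its payload to the
// client, and stops at the first chunk that fails verification.
func forwardStream(w io.Writer, supplierKey ed25519.PublicKey, chunks []signedChunk) error {
	for i, c := range chunks {
		if !ed25519.Verify(supplierKey, c.Payload, c.Signature) {
			return fmt.Errorf("chunk %d: %w", i, errBadSignature)
		}
		if _, err := w.Write(c.Payload); err != nil {
			return err
		}
	}
	return nil
}

// demoStream signs the given payloads with a freshly generated key.
func demoStream(payloads ...string) (ed25519.PublicKey, []signedChunk) {
	pub, priv, err := ed25519.GenerateKey(nil) // nil selects crypto/rand
	if err != nil {
		panic(err)
	}
	chunks := make([]signedChunk, 0, len(payloads))
	for _, p := range payloads {
		chunks = append(chunks, signedChunk{Payload: []byte(p), Signature: ed25519.Sign(priv, []byte(p))})
	}
	return pub, chunks
}

// collect runs forwardStream into a buffer and returns what the client saw.
func collect(pub ed25519.PublicKey, chunks []signedChunk) (string, error) {
	var buf bytes.Buffer
	err := forwardStream(&buf, pub, chunks)
	return buf.String(), err
}

func main() {
	pub, chunks := demoStream("data: one\n\n", "data: two\n\n")
	out, err := collect(pub, chunks)
	fmt.Printf("%q %v\n", out, err)

	chunks[1].Payload = []byte("data: forged\n\n") // tamper with the second chunk
	out, err = collect(pub, chunks)
	fmt.Printf("%q %v\n", out, err)
}
```

Because verification happens per chunk, the Gateway never has to buffer the full response before forwarding, and a tampered chunk cuts the stream at that point.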
🚨 Final Concern: Mining Abuse
As with WebSockets, this opens the door to relay stuffing and fake chunk mining:
- We need to define clear mining semantics for streaming workflows.
- Prevent suppliers from exploiting chunked delivery to inflate mined relays.
cc @commoddity, @Olshansk, @adshmh
[1] https://www.notion.so/buildwithgrove/Deterministic-request-routing-1a0a36edfff68037ac2bef453d793f33
|
Thanks for the feedback @red-0ne, let me make some comments.
Main thoughts
There is a difference between this implementation and a WebSocket. The use of multiple signatures, one per chunk, is rooted in the need to show the App that the chunk they received was sent by the Supplier and that the connection was not modified. The fact that we use the last chunk to mine is arbitrary, as any piece of payload (be it the whole payload or a chunk) would suffice to make the inclusion. Subsequent inclusions to the tree that originate from the same app request would be the same as replay attacks (a scenario that exists regardless of this functionality).
The problem of non-deterministic responses
There is no way of knowing in advance if a backend will provide a streaming response of this kind. Some services (like OpenAI) are non-deterministic: receiving a streaming response or not depends on a parameter embedded in the request JSON payload. So, the gateway would need to know all service specifications in order to call the service with stream support, adding complexity (and non-deterministic behavior) to the gateway and making service staking more complex (multiple protocol supports for the same backend).
The larger problem
If we go deeper into the non-deterministic problems, the current handling of requests is highly non-deterministic from a resource perspective. Now, I don't expect to make such a wild change at this point to the RM -> App communication (we have other pressing issues), but the stream protocol being implemented here could be the new standard for their communication.
On the integration side
This PR includes example code for the reception of the streamed response, which can be implemented in PATH or any other gateway implementation. I don't see this as an issue; this is just a protocol upgrade that the different gateway clients could support or not.
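To illustrate the OpenAI-style case raised in this comment, where streaming is selected by a field inside the request JSON, here is a minimal sketch of what a gateway would have to do to learn the flow ahead of time. The helper name is hypothetical. The `stream` field matches the payload used in the PR's test command, and the point of the sketch is that this check is service-specific knowledge.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// requestsStream reports whether an OpenAI-style chat completion request
// body asks for a streamed response. Hypothetical helper for illustration;
// other services may signal streaming differently or not at all.
func requestsStream(body []byte) bool {
	var probe struct {
		Stream bool `json:"stream"`
	}
	if err := json.Unmarshal(body, &probe); err != nil {
		return false // not JSON, or "stream" is not a boolean
	}
	return probe.Stream
}

func main() {
	streamed := []byte(`{"model":"pocket_network","max_tokens":200,"stream":true}`)
	plain := []byte(`{"model":"pocket_network","max_tokens":200}`)
	fmt.Println(requestsStream(streamed), requestsStream(plain))
}
```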
|
@RawthiL thanks a lot for your comments, they highlight some very pertinent points that I fully agree with.
1 · Why chunked must be the default
I fully agree that Gateway ↔ RelayMiner traffic should be exclusively chunked. I thought about adding classic streaming to spare the RelayMiner from buffering large responses, but it still forces the Gateway to buffer everything before signature verification. A dedicated chunked protocol removes that bottleneck and lets us scale to more heterogeneous services. Let's push to have that on both
2 · RelayMiner must stay content-agnostic
The RelayMiner should ignore downstream headers and payload details. Relying on
Again, streaming signed chunks solves this problem.
3 · Fixing mining semantics
The current PR mines only the last chunk; this doesn't align with our current assumption that we require the entire response to be verifiable on-chain. While this is needed for
It is also future proof, when we solve the full
|
@RawthiL I synched on this with @red-0ne and here's where we landed:
|
|
I look forward to seeing the long-term solution, which I hope covers the needs of this issue. I agree that the current PR is neither a generalist solution nor compatible with a fully-chunked protocol solution. Given that the current spirit of the protocol is "move fast break things", I only see a net positive in including this, because:
When the long-term solution arrives we can remove all this code, and it won't produce much overhead, as the new approach/solution will most likely mean that the whole
|
@RawthiL No updates. ETA is 3-4 weeks from now. The entire team's focus right now is session rollovers. It's what we need for enterprise-grade QoS, which is a precursor to increasing sales, which is a precursor to getting all suppliers more POKT/earnings. |
|
Rebased this onto the latest main version. While this is not a permanent solution, it is a patch that won't affect PATH, as blockchain RPC or non-SSE/NDJSON streams won't modify the flow. A more robust solution should be done later, but it will be a breaking change for PATH as it will affect any relay regardless of payload. Even if I work on that today, it will require a lot of coordination with the Grove team to implement.
|
|
Ack the review request. I see @Olshansk has this on his plate for review this week, so will hold off until he's taken a pass at it. |
|
lol I'm getting sick of this, waited months, now the whole sync is changing. |
|
I dropped the whole changes for Streaming is working, with my old test code, but the normal relays are not. The current version of main broke the timeouts, probably at this hard-coded headers timeout: poktroll/pkg/network/http/http_client.go Line 106 in 993412b
Because the client says it timed out after
I will probably have to wait even longer to have this merged, because now we are worse off than before. |
|
tl;dr We're prioritizing RelayMiner performance on basic RPCs before we add features. @RawthiL Apologies for the delay. The rationale here is to prioritize RelayMiner performance before bloating it with features. The priority is: My concern is:
Concrete ask: The beauty of OSS is the ability to maintain forks and sync them until the time is right. Concrete ask: @RawthiL What can we do to make ☝️ possible? |
|
tl;dr basic RPC performance on the RelayMiner is degraded in current main, not only for streamed RPCs but for all RPCs. This PR has nothing to do with basic RPCs. Streaming is working. @Olshansk please tell me the root of your concern since this PR:
Concrete ask: The beauty of OSS is the ability to merge contributions. Concrete ask: @Olshansk What can we do to make ☝️ possible? |
Olshansk
left a comment
@RawthiL Thank you for your patience!
This is going to be very important very soon, so I reviewed it, suggested some changes, and will outline next steps.
- See my review in #1805. Feel free to edit/modify it as need be.
- Once it looks good to you, let's squash & merge it into this PR.
- Then, my request is that you add some tooling/configs/make targets, etc so we can test this out in LocalNet.
- I'm hoping that the config changes in #1805 can act as a reference to kick things off.
Happy to provide more details/support but trying to keep this short. Lmk!
|
@RawthiL Bump on this branch. Let's iterate on it if you want to get it into the next release |
|
Updated this to latest branch and recovered the Also included a fix to the signer creation, that appeared with the CGO change. |
Olshansk
left a comment
@RawthiL Let's be iterative on this but I want to push this through.
- Can you PTAL at #1805
- Review, edit, update #1805. Once it looks good to you, squash & merge it into this.
- Please add the new command you suggested.
- Please verify make send_relay_path_REST and make pocketd_relayminer_relay_JSONRPC work for you as well.
- Re-request a review here and I'll review, edit and approve to get it over the finish line.
…1805)
Reviewing `Streaming Support for SSE and NDJSON (used for LLMs)` in #1750.
**Next steps**:
- Outlined in this comment: #1570 (review)
**Reproducing this locally (after running localnet)**:
```bash
make send_relay_path_REST
make pocketd_relayminer_relay_JSONRPC
```
**Changes made**:
- Updated the LocalNet toolchain to make it easier to test REST & LLM services
- Added `make pocketd_relayminer_relay_JSONRPC` and revived `make send_relay_path_REST`
- Reviewing streaming RelayMiner changes in [#1570](#1570) for code health & maintainability
- Moved logic from `sync.go` into `server.handleHttp` to follow a similar pattern as `server.handleHttpStream`
---
The image below shows the result in LocalNet:
<img width="3552" height="1527" alt="Screenshot 2025-09-30 at 8 10 22 PM" src="https://github.com/user-attachments/assets/bcf9b2dc-5560-4ab4-b06f-c1ebdfaaaa93" />
---
Co-authored-by: Ramiro Rodriguez Colmeiro <[email protected]>
cc: @Olshansk |
Olshansk
left a comment
@RawthiL Please re-review your changes.
I pulled. Tried to run it. It's broken.
I haven't had a chance to debug yet.
|
My steps were simply:
Please make sure:
|
|
Rebased to the latest main version.
make localnet_up
make acc_initialize_pubkeys
make pocketd_relayminer_relay_NDJSONRPC
Stream output
➜ make pocketd_relayminer_relay_NDJSONRPC
+---------------------------------------------------------------------------------------+
| |
| 🚨 IMPORTANT: Please run the following make command after the network has started: 🚨 |
| |
| make acc_initialize_pubkeys POCKET_NODE=http://localhost:26657 |
| |
| This is required for the following scenarios: |
| - Running Localnet |
| - Running E2E tests |
| |
| 💡 If you receive the following error response when sending a relay: |
| |
| 'Failed to receive any response from endpoints. This could be due to |
| network issues or high load. Please try again.' |
| |
| You probably forgot to run 'make acc_initialize_pubkeys'. |
+---------------------------------------------------------------------------------------+
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"About to send 1 relay(s) to supplier '' for app 'pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst'"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ gRPC connection initialized: &{0xc00220c140 0x5a5880 localhost:9090 {{dns <nil> /localhost:9090 false false }} localhost:9090 {<nil> <nil> [] [] <nil> <nil> {{1000000000 1.6 0.2 120000000000}} false false 0 <nil> {grpc-go/1.72.0 <nil> false [] {} <nil> {0 0 false} [] 0 0 32768 32768 false <nil> <nil> 0xc000829440} [] <nil> false false false <nil> <nil> <nil> [] 1800000000000 dns 5 false true} 0xc001f7a240 0x96ffc80 0xc0023081e0 0xc00223c240 0xc0025850b0 0xc002517860 {{{{} {0 0}} 0 0 {{} 0} {{} 0}} 0xc000534770} {<nil>} {{{} {0 0}} 0 0 {{} 0} {{} 0}} 0xc0012d4600 0xc001f7e500 <nil> map[] {0 0 false} 0xc0025178c0 {{} {0 0}} <nil>}"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Node status fetcher initialized"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Account client initialized"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Application client initialized"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Application fetched: {pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst 100000068upokt [service_id:\"ollama\" ] [pokt15vzxjqklzjtlz7lahe8z2dfe9nm5vxwwmscne4] map[] 0 <nil>}"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Service identified: 'ollama'"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Application ring created"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Block height retrieved: 52"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Session with id e6fe4b08c3a1aa1da4d4d3657899ebad37fba41a6dec67417d880d044adfc489 at height \t52 fetched for app pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst and service ID ollama with 1 suppliers"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ 1 endpoints fetched"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ No supplier specified, randomly selected endpoint: {{pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst ollama e6fe4b08c3a1aa1da4d4d3657899ebad37fba41a6dec67417d880d044adfc489 51 60} 4 {http://relayminer1:8545 4 []} pokt19a3t4yunp0dlpfjrp7qwnzwlrzd5fzs2gjaaaj}"}
{"level":"warn","time":"2025-10-29T11:46:48-03:00","message":"⚠️ Using override endpoint URL: http://localhost:8085/v1/chat/completions"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Retrieved private key for app pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Endpoint URL parsed: http://localhost:8085/v1/chat/completions"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ JSON-RPC request payload serialized."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Relay request built."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"⏱️ Request building duration: 67.821µs"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Relay request signed."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"⏱️ Request signing duration: 3.615394ms"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Relay request marshaled."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"⏱️ Request marshalling duration: 3.104µs"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"⏱️ Request sending duration: 889.373942ms"}
{"level":"warn","time":"2025-10-29T11:46:48-03:00","message":"⚠️ This is a streaming response, if more than one relay was requested the log data can be inconsistent."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"🌊 Handling streaming response with status:"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"🔍 Detected SSE stream, we will try to unmarshal."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:1 role:assistant] finish_reason:<nil> index:0]] created:1.761749208e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:. role:assistant] finish_reason:<nil> index:0]] created:1.761749208e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 461"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: \n role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:2 role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:. role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 461"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: \n role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:3 role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:. role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 461"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: \n role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:4 role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:. role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 461"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: \n role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:5 role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:. role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 461"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: \n role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:6 role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:. role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 461"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: \n role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:7 role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:. role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 461"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: \n role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:8 role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:. role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 461"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: \n role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:9 role:assistant] finish_reason:<nil> index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 462"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: role:assistant] finish_reason:length index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 248"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ SSE Done"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"} |
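The log above shows the client reading newline-delimited chunks and stopping at "SSE Done". A minimal sketch of that read loop for an already-unwrapped SSE payload follows. It assumes the OpenAI-style convention of `data:` lines terminated by a `data: [DONE]` sentinel; the function names are hypothetical, and signature checking of the wrapping RelayResponse is deliberately left out.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readSSE scans a newline-delimited SSE stream and calls onData for each
// "data:" line until the assumed "[DONE]" sentinel or end of input.
// bufio.Scanner caps a line at 64 KiB by default; larger events would
// need scanner.Buffer.
func readSSE(r io.Reader, onData func(data string)) error {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data:") {
			continue // blank separators, comments and other SSE fields
		}
		data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if data == "[DONE]" {
			return nil
		}
		onData(data)
	}
	return scanner.Err()
}

// collectSSE is a convenience wrapper that gathers all data payloads.
func collectSSE(stream string) ([]string, error) {
	var events []string
	err := readSSE(strings.NewReader(stream), func(data string) {
		events = append(events, data)
	})
	return events, err
}

func main() {
	stream := "data: {\"content\":\"1\"}\n\ndata: {\"content\":\".\"}\n\ndata: [DONE]\n\n"
	events, err := collectSSE(stream)
	fmt.Println(len(events), events, err)
}
```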
Summary
Added streaming support for content types:
These are streams delimited by newlines (\n) that can be supported by Pocket Network without any change to the verification and accounting logic.
The first one, text/event-stream, is the one used by streaming LLM responses (OpenAI format) and is functionality that we need asap.
Primary Changes:
- http_stream.go that implements streaming responses of the mentioned types.
- sync.go to check for the backend response headers and implement either this streaming protocol or respond normally (read full response first and then relay).
Secondary Changes:
- serveSyncRequest() function, to keep code more tidy.
- processStreamRequest to provide an example on how to support streaming responses by an app.
- processStreamRequest to make code more modular and re-use functions between normal and streamed response.
Checks:
I tested this in beta-testnet, against a vLLM endpoint, using this custom RM and the pocketd relayminer relay command:
pocketd relayminer relay \
  --app=pokt1wvn4a8kj4mfnq0cjakadskxwr2zkev35psjxh9 \
  --supplier=pokt19a3t4yunp0dlpfjrp7qwnzwlrzd5fzs2gjaaaj \
  --node=https://shannon-testnet-grove-rpc.beta.poktroll.com \
  --grpc-addr=shannon-testnet-grove-grpc.beta.poktroll.com:443 \
  --grpc-insecure=false \
  --payload='{"messages": [{"role": "user", "content": "Tell me how to properly eat a Choripan."}],"max_tokens":200, "model":"pocket_network", "stream":true}' \
  --supplier-public-endpoint-override=http://localhost:8545/v1/chat/completions \
  --network=beta
response:
{"level":"warn","time":"2025-06-24T16:40:29-03:00","message":"⚠️ Using override endpoint URL: http://localhost:8546/v1/chat/completions"}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ JSON-RPC request payload serialized."}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Relay request built."}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Retrieved private key for app pokt1wvn4a8kj4mfnq0cjakadskxwr2zkev35psjxh9"}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Relay request signed."}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Relay request marshaled."}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Endpoint URL parsed: http://localhost:8546/v1/chat/completions"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"🔍 Content-Type: text/event-stream; charset=utf-8"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"🌊 Handling streaming response with status:"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 449"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: role:assistant] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 432"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:Ch] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 432"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:or] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 432"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:ip] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 432"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:an] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 433"}
{"level":"info","time":"2025-06-24T16:40:32-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: is] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
...
Type of change
Select one or more from the following:
Sanity Checklist
- assignees, reviewers, labels, project, iteration and milestone
- make docusaurus_start
- make go_develop_and_test and make test_e2e
- devnet-test-e2e label to run E2E tests in CI
- make test_e2e_oneshot
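As a rough illustration of the dispatch described under Primary Changes, the Go sketch below shows one way to decide from the backend's Content-Type header whether a response is a newline-delimited stream, and then to read it chunk by chunk. It is not the code from http_stream.go or sync.go: the names isStreamingContentType, forEachChunk and splitChunks are hypothetical, and application/x-ndjson is an assumed media type for NDJSON.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"mime"
	"strings"
)

// streamingMediaTypes lists the media types treated as newline-delimited
// streams. "application/x-ndjson" is an assumption made for this sketch.
var streamingMediaTypes = map[string]bool{
	"text/event-stream":    true,
	"application/x-ndjson": true,
}

// isStreamingContentType reports whether a Content-Type header value names a
// streamed media type, ignoring parameters such as "; charset=utf-8".
func isStreamingContentType(contentType string) bool {
	mediaType, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		return false // missing or malformed header: treat as a normal response
	}
	return streamingMediaTypes[mediaType]
}

// forEachChunk reads r until EOF and calls handle once per newline-terminated
// chunk (the trailing "\n" is kept). A final chunk without a trailing newline
// is still delivered.
func forEachChunk(r io.Reader, handle func(chunk []byte) error) error {
	reader := bufio.NewReader(r)
	for {
		chunk, err := reader.ReadBytes('\n')
		if len(chunk) > 0 {
			if handleErr := handle(chunk); handleErr != nil {
				return handleErr
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// splitChunks is a demo helper that runs forEachChunk over an in-memory body.
func splitChunks(body string) ([]string, error) {
	var chunks []string
	err := forEachChunk(strings.NewReader(body), func(chunk []byte) error {
		chunks = append(chunks, string(chunk))
		return nil
	})
	return chunks, err
}

func main() {
	header := "text/event-stream; charset=utf-8"
	fmt.Println("streaming:", isStreamingContentType(header))

	chunks, err := splitChunks("data: {\"a\":1}\n\ndata: [DONE]\n")
	if err != nil {
		fmt.Println("read error:", err)
		return
	}
	for _, chunk := range chunks {
		fmt.Printf("chunk: %q\n", chunk)
	}
}
```

In a relay handler the handle callback is where each chunk would be processed and forwarded to the client; that step is left out here because it depends on the RelayMiner internals.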