
Conversation


@RawthiL RawthiL commented Jun 25, 2025

Summary

Added streaming support for the following content types: `text/event-stream` (SSE) and NDJSON.

These are streams delimited by newlines (`\n`) that Pocket Network can support without any change to the verification and accounting logic.

The first one, `text/event-stream`, is the format used by streaming LLM responses (OpenAI format) and is functionality we need ASAP.

Primary Changes:

  • Added a new file, http_stream.go, that implements streaming responses of the mentioned types.
  • Added a custom delimiter for Pocket Network streamed responses, necessary to avoid collisions with arbitrary payloads and the supplier signature.
  • Modified sync.go to check the backend response headers and either use this streaming protocol or respond normally (read the full response first, then relay it).
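For reference, the header check described above can be sketched roughly like this (a hypothetical helper; the actual http_stream.go/sync.go code may use different names and logic):

```go
package main

import (
	"fmt"
	"mime"
	"strings"
)

// isStreamingContentType reports whether a backend response should be
// relayed chunk-by-chunk rather than buffered whole. Illustrative only.
func isStreamingContentType(contentType string) bool {
	// Strip parameters such as "; charset=utf-8".
	mediaType, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		mediaType = strings.ToLower(strings.TrimSpace(strings.Split(contentType, ";")[0]))
	}
	switch mediaType {
	case "text/event-stream", "application/x-ndjson":
		return true
	}
	return false
}

func main() {
	fmt.Println(isStreamingContentType("text/event-stream; charset=utf-8")) // true
	fmt.Println(isStreamingContentType("application/json"))                 // false
}
```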

Secondary Changes:

  • Reordered some checks in the serveSyncRequest() function to keep the code tidier.
  • Modified processStreamRequest to provide an example of how an app can consume streaming responses.
  • Modified processStreamRequest to make the code more modular and reuse functions between normal and streamed responses.

Checks:

I tested this on Beta TestNet, against a vLLM endpoint, using this custom RM and the pocketd relayminer relay command:

```bash
pocketd relayminer relay \
    --app=pokt1wvn4a8kj4mfnq0cjakadskxwr2zkev35psjxh9 \
    --supplier=pokt19a3t4yunp0dlpfjrp7qwnzwlrzd5fzs2gjaaaj \
    --node=https://shannon-testnet-grove-rpc.beta.poktroll.com \
    --grpc-addr=shannon-testnet-grove-grpc.beta.poktroll.com:443 \
    --grpc-insecure=false \
    --payload='{"messages": [{"role": "user", "content": "Tell me how to properly eat a Choripan."}],"max_tokens":200, "model":"pocket_network", "stream":true}' \
    --supplier-public-endpoint-override=http://localhost:8545/v1/chat/completions \
    --network=beta
```

response:

{"level":"warn","time":"2025-06-24T16:40:29-03:00","message":"⚠️ Using override endpoint URL: http://localhost:8546/v1/chat/completions"}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ JSON-RPC request payload serialized."}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Relay request built."}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Retrieved private key for app pokt1wvn4a8kj4mfnq0cjakadskxwr2zkev35psjxh9"}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Relay request signed."}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Relay request marshaled."}
{"level":"info","time":"2025-06-24T16:40:29-03:00","message":"✅ Endpoint URL parsed: http://localhost:8546/v1/chat/completions"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"🔍 Content-Type: text/event-stream; charset=utf-8"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"🌊 Handling streaming response with status:"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 449"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: role:assistant] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 432"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:Ch] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:30-03:00","message":"📦 Read chunk of length 432"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:or] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 432"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:ip] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 432"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:an] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 213"}
{"level":"info","time":"2025-06-24T16:40:31-03:00","message":"📦 Read chunk of length 433"}
{"level":"info","time":"2025-06-24T16:40:32-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: is] finish_reason:<nil> index:0 logprobs:<nil>]] created:1.75079403e+09 id:chat-2b20014c1d124d1da32d6a71139ea77e model:pocket_network object:chat.completion.chunk]"}
...

Type of change

Select one or more from the following:

  • New feature, functionality or library
  • Bug fix
  • Code health or cleanup
  • Documentation
  • Other (specify)

Sanity Checklist

  • I have updated the GitHub Issue Metadata: assignees, reviewers, labels, project, iteration and milestone
  • For docs: make docusaurus_start
  • For small changes: make go_develop_and_test and make test_e2e
  • For major changes: devnet-test-e2e label to run E2E tests in CI
  • For migration changes: make test_e2e_oneshot
  • 'TODO's, configurations and other docs

@RawthiL RawthiL requested a review from Olshansk June 25, 2025 13:18
@Olshansk Olshansk requested a review from red-0ne June 25, 2025 17:10
@Olshansk Olshansk assigned Olshansk and RawthiL and unassigned Olshansk Jun 25, 2025
@Olshansk Olshansk added the relayminer Changes related to the Relayminer label Jun 25, 2025
@Olshansk Olshansk changed the title [Feature] Streaming Support for SSE and NDJSON (used for LLMs) [RelayMiner] Streaming Support for SSE and NDJSON (used for LLMs) Jun 25, 2025
@Olshansk Olshansk added this to Shannon Jun 25, 2025
@github-project-automation github-project-automation bot moved this to 📋 Backlog in Shannon Jun 25, 2025
@Olshansk Olshansk moved this from 📋 Backlog to 👀 In review in Shannon Jun 25, 2025
@Olshansk Olshansk added this to the MainNet to 100% Traffic milestone Jun 25, 2025

@red-0ne red-0ne left a comment


🧩 Hybrid Flow: Between Sync and Async

This is a very interesting use case that lands somewhere between traditional synchronous and asynchronous workflows.

It introduces flexibility but also surfaces protocol and integration gaps that need to be addressed to make it robust.


⚙️ Protocol Weaknesses & Gateway Limitations

Despite the potential, the current support is not ideal:

  • Streaming behavior doesn’t cleanly map to the "synchronous" protocol expectations.
  • The protocol needs stronger guarantees to handle streaming deterministically (i.e., it should know beforehand, not after the fact, that a response will be streamed).
  • On the integration side, the Gateway (PATH) needs to implement this flow.

🔍 Observations from the Current Behavior

From what I understand of the implementation:

  1. Each streamed chunk is individually wrapped in a signed RelayResponse.

    • However, only the last chunk is mined.
  2. This behavior likely confuses the Gateway:

    • Does it unwrap and stream each chunk as it arrives?
    • My suspicion: the first RelayResponse gets unwrapped, while the rest are forwarded as-is.
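The per-chunk wrapping described in point 1 can be illustrated with a toy sketch. A SHA-256 digest stands in for the supplier's signature here; the real protocol wraps chunks in signed protobuf RelayResponse messages, so all names below are illustrative:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// signedChunk is a toy stand-in for a per-chunk RelayResponse: the raw
// payload plus a "signature" over it (a plain digest in this sketch).
type signedChunk struct {
	Payload []byte
	Sig     string
}

// wrapChunk is what the RelayMiner would do to every chunk before relaying.
func wrapChunk(payload []byte) signedChunk {
	sum := sha256.Sum256(payload)
	return signedChunk{Payload: payload, Sig: hex.EncodeToString(sum[:])}
}

// verifyChunk is what a gateway would run on every arriving chunk.
func verifyChunk(c signedChunk) bool {
	sum := sha256.Sum256(c.Payload)
	return hex.EncodeToString(sum[:]) == c.Sig
}

func main() {
	chunks := [][]byte{
		[]byte(`data: {"delta":"Ch"}` + "\n"),
		[]byte(`data: {"delta":"or"}` + "\n"),
	}
	for i, p := range chunks {
		c := wrapChunk(p)
		fmt.Printf("chunk %d verified: %v\n", i, verifyChunk(c))
	}
}
```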

📡 Need for Explicit Flow Awareness in PATH

Just like with WebSocket relays, the Gateway needs to:

  • Be explicitly aware it's dealing with a request that will have a streamed response.
  • Handle the flow consistently, both in terms of routing and unwrapping.

Without that awareness, the Gateway might misinterpret intermediate chunks or mining status.


🚫 Why Implicit Detection via Content-Type Falls Short

I’m not a big fan of dynamically switching flows based on backend response Content-Type. Here’s why:

  • It’s non-deterministic: both the Gateway and the RelayMiner learn the flow type only after receiving a response.
  • That introduces brittleness and makes reasoning about the system harder.

✅ A Better Path: Deterministic Flow Declaration

This feels like a solid use case for introducing a dedicated RPCType and deterministic request routing [1].

  • The Gateway should be able to infer the streaming nature of the call from the request itself (via request path, headers, or query params).
  • It then passes this context to the RelayMiner, which selects the appropriate execution flow.

🔄 Response Handling Pipeline (Streamed)

This new flow would allow the RelayMiner to properly mine the produced relays.

And the Gateway to:

  • Validate each chunk’s RelayResponse signature.
  • Unwrap it safely.
  • Write the payload to the client’s stream without mistaking the first response as final.

That would enable proper streaming while preserving the protocol’s guarantees.


🚨 Final Concern: Mining Abuse

As with WebSockets, this opens the door to relay stuffing and fake chunk mining:

  • We need to define clear mining semantics for streaming workflows.
  • Prevent suppliers from exploiting chunked delivery to inflate mined relays.

cc @commoddity, @Olshansk, @adshmh

[1] https://www.notion.so/buildwithgrove/Deterministic-request-routing-1a0a36edfff68037ac2bef453d793f33


RawthiL commented Jun 26, 2025

Thanks for the feedback @red-0ne, let me make some comments.

Main thoughts

There is a difference between this implementation and a WebSocket.
In the case of a WebSocket, one expects a long-lasting connection with an unknown number of events corresponding to uncontrolled generation (like new blocks being minted or some other kind of event).
In this case, there is a single work request by a Pocket App that results in a single response generation. The generation can be relayed as a whole (the current scenario) or in chunks (partially proposed here).
This means that we do not need to change relay mining at all, because the premise of "one request -> one relay" still holds.

The use of multiple signatures, one per chunk, stems from the need to show the App that the chunk it received was sent by the Supplier and the connection was not tampered with. The fact that we use the last packet to mine is arbitrary, as any piece of payload (be it the whole payload or a single chunk) would suffice for the inclusion. Subsequent inclusions in the tree originating from the same app request would be equivalent to replay attacks (a scenario that exists regardless of this functionality).

The problem of non-deterministic responses

There is no way of knowing in advance whether a backend will provide a streaming response of this kind.

Some services (like OpenAI) are non-deterministic in this respect: whether you receive a streaming response depends on a parameter embedded in the request's JSON payload. The gateway would therefore need to know every service's specification in order to call it with stream support, adding complexity (and non-deterministic behavior) to the gateway and making service staking more complex (multiple protocol supports for the same backend).
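For example, for OpenAI-style services the streaming decision is buried in the request body, so a gateway would need service-specific parsing like this for every backend it fronts (a sketch assuming the OpenAI chat-completions schema):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wantsStream only works because we know OpenAI's schema puts a boolean
// "stream" field in the request body; other services encode this decision
// differently (or not at all), which is the complexity argued against here.
func wantsStream(payload []byte) bool {
	var body struct {
		Stream bool `json:"stream"`
	}
	if err := json.Unmarshal(payload, &body); err != nil {
		return false // unknown format: assume non-streaming
	}
	return body.Stream
}

func main() {
	fmt.Println(wantsStream([]byte(`{"model":"pocket_network","stream":true}`))) // true
	fmt.Println(wantsStream([]byte(`{"model":"pocket_network"}`)))               // false
}
```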

The larger problem

If we go deeper into the non-determinism problems, the current handling of requests is highly non-deterministic from a resource perspective.
Pocket can support any service, and services can produce responses of just a few bytes or hundreds of megabytes. This is something we already see today with blockchain nodes. The current approach of reading a whole backend response, signing it, and relaying it makes the per-request memory and CPU consumption highly variable on the RelayMiner side (something that already creates issues).
The correct way to make the RM and Pocket App behavior deterministic is for the communication between the RM and the App to be a stream of chunks of a given size (in bytes). If all RM->App transfers follow a stream pattern, the non-determinism problems raised here disappear, and we also gain per-relay memory and CPU consumption bounds.

Now, I don't expect to make such a wild change to the RM -> App communication at this point (we have other pressing issues), but the stream protocol being implemented here could become the new standard for that communication.

On the integration side

This PR includes example code for receiving the streamed response, which can be implemented in PATH or any other gateway implementation. I don't see this as an issue; it is just a protocol upgrade that the different gateway clients may or may not support.

@red-0ne

red-0ne commented Jun 27, 2025

@RawthiL thanks a lot for your comments, they highlight some very pertinent points that I fully agree with.


1 · Why chunked must be the default

I fully agree that Gateway ↔ RelayMiner traffic should be exclusively chunked. I thought about adding classic streaming to spare the RelayMiner from buffering large responses, but it still forces the Gateway to buffer everything before signature verification. A dedicated chunked protocol removes that bottleneck and lets us scale to more heterogeneous services.

Let's push to have that on both Gateways and RelayMiners, it's a net benefit.


2 · RelayMiner must stay content-agnostic

The RelayMiner should ignore downstream headers and payload details. Relying on Content-Type (or similar) doesn't scale as we introduce new flows. Instead, we need a single framing rule that treats the backend as a black box and only concerns itself with content-agnostic chunking, signatures, and delivery.

Again, streaming signed chunks solves this problem.


3 · Fixing mining semantics

The current PR mines only the last chunk; this doesn't align with our current assumption that the entire response must be verifiable on-chain. While that is needed for Watchers to work, it's a far-future feature that we don't need to burden ourselves with for now. So I suggest an even more radical approach: mine a hash of the complete payload. This will drastically reduce the RelayMiner's memory usage and avoid a lot of on-chain state bloat.

It is also future-proof, for when we solve the full Relay storage problem.

cc @bryanchriswhite

@RawthiL RawthiL force-pushed the streaming-support branch 2 times, most recently from 7360240 to 5842f9b Compare July 7, 2025 20:44
@RawthiL RawthiL force-pushed the streaming-support branch from 5842f9b to d891753 Compare July 14, 2025 19:22
@Olshansk

@RawthiL I synched on this with @red-0ne and here's where we landed:

  1. Long-term scope: @red-0ne is going to put together a detailed ticket w/ a proper long-term solution.
  2. poktroll fork - Because this specific PR doesn't really fit with (1), though it's a great Proof-of-Concept, I recommend forking poktroll and keeping it there in the meantime.
  3. shannon-sdk main - If you want/need this to be on the "official" main, I recommend finding a way to port these changes into the shannon-sdk.
  4. Gateway callout - Please note that even if/when we do (1,2,3), this won't work in production until PATH, a fork of PATH, or an alternative gateway implementation adds support for this. It's definitely on the roadmap but not in the immediate future.

@RawthiL RawthiL force-pushed the streaming-support branch from d891753 to fd5a740 Compare July 17, 2025 18:37
@RawthiL

RawthiL commented Jul 17, 2025

@Olshansk

I look forward to seeing the long-term solution, which I hope covers the needs of this issue. I agree that the current PR is neither a general solution nor compatible with a fully-chunked protocol solution.
However, the current PR is compatible with the non-streaming relays (all non-SSE and non-NDJSON) and is production-tested (we have deployed this image both in testnet and mainnet).

Given that the current spirit of the protocol is "move fast break things" I only see a net positive in including this, because:

  • It will heavily reduce our overhead when the protocol is updated (done quite often).
  • It will enable anyone to deploy LLM services that meet the expectation of the users (i.e. streaming responses).
  • It will promote the creation of gateways with streaming capabilities.

When the long-term solution arrives we can remove all this code, and it won't produce much overhead, as the new approach/solution will most likely mean that the whole relayminer<>gateway interaction changes to a chunked version (otherwise it won't cover the needs of this PR).

@RawthiL

RawthiL commented Jul 31, 2025

@Olshansk @red-0ne

Any updates on the detailed ticket?

@Olshansk

Olshansk commented Jul 31, 2025

@RawthiL No updates. ETA is 3-4 weeks from now.

The entire team's focus right now is session rollovers.

It's what we need for enterprise-grade QoS, which is a precursor to increasing sales, which is a precursor to getting all suppliers more POKT/earnings.

@RawthiL RawthiL force-pushed the streaming-support branch 2 times, most recently from 0eb11bd to cee9aa7 Compare August 15, 2025 20:25
@Olshansk Olshansk marked this pull request as draft August 22, 2025 22:30
@RawthiL RawthiL marked this pull request as ready for review September 12, 2025 19:55
@RawthiL

RawthiL commented Sep 12, 2025

Re-based this to latest main version.

While this is not a permanent solution, it is a patch that won't affect PATH, as blockchain RPC and other non-SSE/NDJSON streams won't modify the flow.

A more robust solution should come later, but it will be a breaking change for PATH, as it will affect every relay regardless of payload. Even if I started working on that today, it would require a lot of coordination with the Grove team to implement.
In my opinion the correct solution should:

  • Stream binary chunks regardless of the kind of request (deterministic behavior, up to a point at least...).
  • Sign every streamed chunk, but mine only one (the App requested one relay; the fact that the Servicer returns N parts is irrelevant to the economic transaction of the relay).
  • Stream the chunks at a fixed size and/or interval (whichever happens first): this fixes both the problem of large payloads (which affect the RM) and of slowly streamed payloads (the SSE/NDJSON case).
  • Let Applications unpack the pocket stream and proceed to do whatever they want/need.

@TheFeloniousMonk

Ack the review request. I see @Olshansk has this on his plate for review this week, so will hold off until he's taken a pass at it.

@RawthiL

RawthiL commented Sep 25, 2025

lol I'm getting sick of this, waited months, now the whole sync is changing.

@RawthiL
Copy link
Contributor Author

RawthiL commented Sep 25, 2025

I dropped all the changes to cmd_relay.go; there were too many changes and I got tired of trying to keep supporting it.

Streaming is working with my old test code, but normal relays are not.

The current version of main broke the timeouts, probably due to this hard-coded header timeout:

ResponseHeaderTimeout: 5 * time.Second, // Header timeout to allow for server processing time

The client says it timed out after 10m0s, but it actually times out at 5s:

ERR HTTP request failed - detailed phase breakdown for timeout debugging error="Post \"http://192.168.1.11:9087/v1/completions\": net/http: timeout awaiting response headers"  ....
....
ERR ⚠️ Failed serving synchronous relay request. This MIGHT be a client error. error="request to service text-to-text timed out after 10m0s: relayer proxy request timed out" remote_addr=127.0.0.1:43888 request_id= server_host=0.0.0.0:8546 server_type=http user_agent=Go-http-client/1.1

I will probably have to wait even longer to have this merged, because now we are worse off than before.

@Olshansk

tl;dr We're prioritizing RelayMiner performance on basic RPCs before we add features.


@RawthiL Apologies for the delay.

The rationale here is to prioritize RelayMiner performance before bloating it with features.

The priority is: RelayMiner performance works and scales.

My concern is:

  1. Add streaming support
  2. Iterate on RelayMiner performance
  3. Go down rabbit holes related to streaming support BEFORE RelayMiner performance is done.

Concrete ask: The beauty of OSS is the ability to maintain forks and sync them until the time is right.

Concrete ask: @RawthiL What can we do to make ☝️ possible?


cc @red-0ne @jorgecuesta @RawthiL @TheFeloniousMonk

@RawthiL

RawthiL commented Sep 26, 2025

tl;dr basic RPC performance on the RelayMiner is degraded in the current main — not just for streamed RPCs, for all RPCs. This PR has nothing to do with basic RPCs. Streaming is working.


@Olshansk please tell me the root of your concern, since this PR:

  • Does not affect the basic RPC path.
  • Its code is separate from the basic RPC code.
  • No testing is being performed w.r.t. slow RPCs.

Concrete ask: The beauty of OSS is the ability to merge contributions.

Concrete ask: @Olshansk What can we do to make ☝️ possible?


@Olshansk Olshansk left a comment


@RawthiL Thank you for your patience!

This is going to be very important very soon, so I reviewed it, suggested some changes, and will outline next steps.

  1. See my review in #1805. Feel free to edit/modify it as need be.
  2. Once it looks good to you, let's squash & merge it into this PR.
  3. Then, my request is that you add some tooling/configs/make targets, etc so we can test this out in LocalNet.
  4. I'm hoping that the config changes in #1805 can act as a reference to kick things off.

Happy to provide more details/support but trying to keep this short. Lmk!

@Olshansk

Olshansk commented Oct 8, 2025

@RawthiL Bump on this branch.

Let's iterate on it if you want to get it into the next release.

@RawthiL RawthiL force-pushed the streaming-support branch 2 times, most recently from f61ad03 to e98915f Compare October 13, 2025 19:52
@RawthiL
Copy link
Contributor Author

RawthiL commented Oct 13, 2025

Updated this to the latest branch and recovered the relayminer relay functionality with stream support so it can be tested.

Also included a fix to the signer creation that broke with the CGO change.


@Olshansk Olshansk left a comment


@RawthiL Let's be iterative on this but I want to push this through.

  1. Can you PTAL at #1805
  2. Review, edit, update #1805. Once it looks good to you, squash & merge it into this.
  3. Please add the new command you suggested.
  4. Please verify make send_relay_path_REST and make pocketd_relayminer_relay_JSONRPC work for you as well.
  5. Re-request a review here and I'll review, edit and approve to get it over the finish line.

Olshansk added a commit that referenced this pull request Oct 16, 2025
…1805)

Reviewing `Streaming Support for SSE and NDJSON (used for LLMs)` in #1750. 

**Next steps**:
- Outlined in this comment: #1570 (review)

**Reproducing this locally (after running localnet)**:

```bash
make send_relay_path_REST
make pocketd_relayminer_relay_JSONRPC
```


**Changes made**:
- Updated the LocalNet toolchain to make it easier to test REST & LLM services
- Added `make pocketd_relayminer_relay_JSONRPC` and revived `make send_relay_path_REST`
- Reviewing streaming RelayMiner changes in [#1570](#1570) for code health & maintainability
- Moved logic from `sync.go` into `server.handleHttp` to follow a similar pattern as `server.handleHttpStream`

---

The image below shows the result in LocalNet:

<img width="3552" height="1527" alt="Screenshot 2025-09-30 at 8 10 22 PM" src="https://github.com/user-attachments/assets/bcf9b2dc-5560-4ab4-b06f-c1ebdfaaaa93" />

---

Co-authored-by: Ramiro Rodriguez Colmeiro <[email protected]>
@RawthiL

RawthiL commented Oct 16, 2025

  1. ✅ Added make pocketd_relayminer_relay_NDJSONRPC; it should work with the provided Ollama service via make localnet_up.
  2. ⚠️ Partial success:
    • ✅ make pocketd_relayminer_relay_JSONRPC works.
    • ⚠️ I was not able to execute make send_relay_path_REST; I get a config error from the PATH gateway (I've not used PATH in a long time):

{"level":"fatal","error":"open /app/config/.config.yaml: no such file or directory","message":"failed to load config"}

  3. 🔍 Please re-review!

cc: @Olshansk

RawthiL added a commit that referenced this pull request Oct 16, 2025
@Olshansk Olshansk self-requested a review October 17, 2025 20:26

@Olshansk Olshansk left a comment


@RawthiL Please re-review your changes.

I pulled. Tried to run it. It's broken.

I haven't had a chance to debug yet.


@RawthiL

RawthiL commented Oct 20, 2025

@Olshansk

My steps were simply:

  • make localnet_up
  • make acc_initialize_pubkeys
  • make pocketd_relayminer_relay_NDJSONRPC

Please make sure:

  • ollama is enabled in your localnet_config.yaml.
  • ollama reached the "llama runner started" point in the log.

@RawthiL
Copy link
Contributor Author

RawthiL commented Oct 29, 2025

Rebased to the latest main version.
Retested with:

make localnet_up
make acc_initialize_pubkeys
make pocketd_relayminer_relay_NDJSONRPC
Stream output
➜ make pocketd_relayminer_relay_NDJSONRPC
+---------------------------------------------------------------------------------------+
|                                                                                       |
| 🚨 IMPORTANT: Please run the following make command after the network has started: 🚨 |
|                                                                                       |
|     make acc_initialize_pubkeys POCKET_NODE=http://localhost:26657                    |
|                                                                                       |
|     This is required for the following scenarios:                                     |
|       - Running Localnet                                                              |
|       - Running E2E tests                                                             |
|                                                                                       |
|     💡 If you receive the following error response when sending a relay:              |
|                                                                                       |
|     'Failed to receive any response from endpoints. This could be due to              |
|     network issues or high load. Please try again.'                                   |
|                                                                                       |
|     You probably forgot to run 'make acc_initialize_pubkeys'.                         |
+---------------------------------------------------------------------------------------+
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"About to send 1 relay(s) to supplier '' for app 'pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst'"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ gRPC connection initialized: &{0xc00220c140 0x5a5880 localhost:9090 {{dns  <nil>  /localhost:9090  false false   }} localhost:9090 {<nil> <nil> [] [] <nil> <nil> {{1000000000 1.6 0.2 120000000000}} false false 0  <nil> {grpc-go/1.72.0 <nil> false [] {} <nil> {0 0 false} [] 0 0 32768 32768 false <nil> <nil> 0xc000829440} [] <nil> false false false <nil> <nil> <nil> [] 1800000000000 dns 5 false true} 0xc001f7a240 0x96ffc80 0xc0023081e0 0xc00223c240 0xc0025850b0 0xc002517860 {{{{} {0 0}} 0 0 {{} 0} {{} 0}} 0xc000534770} {<nil>} {{{} {0 0}} 0 0 {{} 0} {{} 0}} 0xc0012d4600 0xc001f7e500 <nil> map[] {0 0 false} 0xc0025178c0 {{} {0 0}} <nil>}"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Node status fetcher initialized"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Account client initialized"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Application client initialized"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Application fetched: {pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst 100000068upokt [service_id:\"ollama\" ] [pokt15vzxjqklzjtlz7lahe8z2dfe9nm5vxwwmscne4] map[] 0 <nil>}"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Service identified: 'ollama'"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Application ring created"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Block height retrieved: 52"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Session with id e6fe4b08c3a1aa1da4d4d3657899ebad37fba41a6dec67417d880d044adfc489 at height \t52 fetched for app pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst and service ID ollama with 1 suppliers"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ 1 endpoints fetched"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ No supplier specified, randomly selected endpoint: {{pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst ollama e6fe4b08c3a1aa1da4d4d3657899ebad37fba41a6dec67417d880d044adfc489 51 60} 4 {http://relayminer1:8545 4 []} pokt19a3t4yunp0dlpfjrp7qwnzwlrzd5fzs2gjaaaj}"}
{"level":"warn","time":"2025-10-29T11:46:48-03:00","message":"⚠️ Using override endpoint URL: http://localhost:8085/v1/chat/completions"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Retrieved private key for app pokt1pn64d94e6u5g8cllsnhgrl6t96ysnjw59j5gst"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Endpoint URL parsed: http://localhost:8085/v1/chat/completions"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ JSON-RPC request payload serialized."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Relay request built."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"⏱️ Request building duration: 67.821µs"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Relay request signed."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"⏱️ Request signing duration: 3.615394ms"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Relay request marshaled."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"⏱️ Request marshalling duration: 3.104µs"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"⏱️ Request sending duration: 889.373942ms"}
{"level":"warn","time":"2025-10-29T11:46:48-03:00","message":"⚠️ This is a streaming response, if more than one relay was requested the log data can be inconsistent."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"🌊 Handling streaming response with status:"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"🔍 Detected SSE stream, we will try to unmarshal."}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:1 role:assistant] finish_reason:<nil> index:0]] created:1.761749208e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"📦 Read chunk of length 459"}
{"level":"info","time":"2025-10-29T11:46:48-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content:. role:assistant] finish_reason:<nil> index:0]] created:1.761749208e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
... (repeated "📦 Read chunk of length ..." / "✅ Deserialized response body as JSON map: ..." log pairs for the remaining streamed tokens omitted; each chunk carries one OpenAI-format chat.completion.chunk delta) ...
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 462"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ Deserialized response body as JSON map: map[choices:[map[delta:map[content: role:assistant] finish_reason:length index:0]] created:1.761749209e+09 id:chatcmpl-62 model:qwen:0.5b object:chat.completion.chunk system_fingerprint:fp_ollama]"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 248"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"✅ SSE Done"}
{"level":"info","time":"2025-10-29T11:46:49-03:00","message":"📦 Read chunk of length 236"}
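For context, the client-side behavior shown in these logs (detect an SSE stream, read newline-delimited `data:` events, unmarshal each JSON chunk, stop on the `[DONE]` sentinel) can be sketched as below. This is a hypothetical illustration, not the actual `http_stream.go` code; `parseSSEData` and the variable names are made up for this example.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// parseSSEData extracts the payload of a single SSE "data:" line.
// It returns the trimmed payload (raw JSON, or the "[DONE]" sentinel)
// and whether the line carried a data field at all.
// Hypothetical helper; names do not match the PR's http_stream.go.
func parseSSEData(line string) (string, bool) {
	if !strings.HasPrefix(line, "data:") {
		return "", false
	}
	return strings.TrimSpace(strings.TrimPrefix(line, "data:")), true
}

func main() {
	// A minimal SSE stream in the OpenAI chat.completion.chunk format,
	// newline-delimited and terminated by "[DONE]", as in the log above.
	stream := "data: {\"choices\":[{\"delta\":{\"content\":\"1\"}}]}\n\n" +
		"data: [DONE]\n\n"

	scanner := bufio.NewScanner(strings.NewReader(stream))
	for scanner.Scan() {
		payload, ok := parseSSEData(scanner.Text())
		if !ok || payload == "" {
			continue // skip blank separator lines between events
		}
		if payload == "[DONE]" {
			fmt.Println("SSE Done")
			break
		}
		var body map[string]any
		if err := json.Unmarshal([]byte(payload), &body); err != nil {
			// Non-JSON payloads are passed through untouched.
			fmt.Println("pass-through:", payload)
			continue
		}
		fmt.Println("deserialized chunk:", body)
	}
}
```

In the real relayminer, each event would additionally be wrapped with the supplier signature and the custom Pocket Network delimiter before being forwarded, which this sketch omits.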