-
Notifications
You must be signed in to change notification settings - Fork 25
RM: mining supervisor, per-service eager validation, xsync fast paths #1819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
📈 Benchmark: Defer validation/mining work until after writing the response
This microbenchmark simulates three server behaviors:
handler_flush_test.go
package demo
import (
"bufio"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
)
// Shared fixtures used by the handler variants and benchmarks in this file.
var (
// sleepDur is how long the NoFlush/Flush handlers sleep between writing
// firstChunk and lastChunk (the simulated post-response work).
sleepDur = 50 * time.Millisecond
// firstChunk is the first payload every handler writes; the ReadFirstChunk
// benchmarks read exactly len(firstChunk) bytes.
firstChunk = []byte("FIRST\n")
// lastChunk is the second payload every handler writes.
lastChunk = []byte("LAST\n")
)
// startServer spins up an httptest server backed by h and hands back both the
// server and an HTTP client obtained from it. Callers in this file release the
// server with a deferred Close.
func startServer(h http.HandlerFunc) (*httptest.Server, *http.Client) {
	srv := httptest.NewServer(h)
	client := srv.Client()
	return srv, client
}
// measureReadN times how long it takes to issue a GET against url and read the
// first n bytes of the response body (time-to-first-chunk). It runs b.N
// iterations and fails the benchmark on any request or read error.
//
// The timer is reset after setup so that server startup (done by the caller)
// and the scratch buffer allocation are not billed to the measurement.
func measureReadN(b *testing.B, c *http.Client, url string, n int) {
	b.Helper()
	buf := make([]byte, n)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		resp, err := c.Get(url)
		if err != nil {
			b.Fatalf("GET failed: %v", err)
		}

		_, err = io.ReadFull(bufio.NewReader(resp.Body), buf)
		// Close without draining on purpose: the point is to return as soon as
		// the first chunk is visible, not to wait for the rest of the body.
		// Closing before the error check also avoids leaking the body on failure.
		_ = resp.Body.Close()
		if err != nil {
			b.Fatalf("ReadFull: %v", err)
		}
	}
}
// measureFull times a GET against url followed by reading the entire response
// body (until EOF or Content-Length is satisfied). It runs b.N iterations.
//
// The timer is reset on entry so caller-side setup is excluded, and a failure
// while draining the body fails the benchmark instead of being silently timed
// as a successful iteration.
func measureFull(b *testing.B, c *http.Client, url string) {
	b.Helper()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		resp, err := c.Get(url)
		if err != nil {
			b.Fatalf("GET failed: %v", err)
		}

		_, err = io.Copy(io.Discard, resp.Body)
		// Close first so the body is released even when the copy failed.
		_ = resp.Body.Close()
		if err != nil {
			b.Fatalf("read body: %v", err)
		}
	}
}
// Handler variants
// handlerNoFlush writes the first chunk, performs the simulated work, and only
// then writes the last chunk — without ever flushing in between. In practice
// the client often cannot observe the first bytes until the handler returns.
func handlerNoFlush(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write(firstChunk)

	// Stand-in for work that happens after the response has been produced.
	time.Sleep(sleepDur)

	_, _ = w.Write(lastChunk)
}
// handlerFlush writes the first chunk and flushes it right away so the client
// can read it before the simulated work starts; the last chunk follows after
// the work completes.
func handlerFlush(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write(firstChunk)

	// Push FIRST to the client now. If the writer cannot flush, fall through
	// and behave like the non-flushing variant.
	// (Go 1.20+ alternative: http.NewResponseController(w).Flush().)
	flusher, canFlush := w.(http.Flusher)
	if canFlush {
		flusher.Flush()
	}

	// Stand-in for work that happens after the response has been produced.
	time.Sleep(sleepDur)

	_, _ = w.Write(lastChunk)
}
// handlerBaseline is the control case: it writes both chunks back to back with
// no simulated work and no explicit flush.
func handlerBaseline(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	for _, chunk := range [][]byte{firstChunk, lastChunk} {
		_, _ = w.Write(chunk)
	}
}
// Benchmarks
// Benchmark_NoFlush_ReadFirstChunk measures time-to-first-chunk against the
// handler that never flushes.
func Benchmark_NoFlush_ReadFirstChunk(b *testing.B) {
	srv, client := startServer(handlerNoFlush)
	defer srv.Close()

	measureReadN(b, client, srv.URL, len(firstChunk))
}

// Benchmark_Flush_ReadFirstChunk measures time-to-first-chunk against the
// handler that flushes right after the first chunk.
func Benchmark_Flush_ReadFirstChunk(b *testing.B) {
	srv, client := startServer(handlerFlush)
	defer srv.Close()

	measureReadN(b, client, srv.URL, len(firstChunk))
}

// Benchmark_Baseline_ReadFirstChunk measures time-to-first-chunk against the
// handler that does no simulated work.
func Benchmark_Baseline_ReadFirstChunk(b *testing.B) {
	srv, client := startServer(handlerBaseline)
	defer srv.Close()

	measureReadN(b, client, srv.URL, len(firstChunk))
}

// Benchmark_NoFlush_ReadFull measures a full-body read against the handler
// that never flushes.
func Benchmark_NoFlush_ReadFull(b *testing.B) {
	srv, client := startServer(handlerNoFlush)
	defer srv.Close()

	measureFull(b, client, srv.URL)
}

// Benchmark_Flush_ReadFull measures a full-body read against the handler that
// flushes right after the first chunk.
func Benchmark_Flush_ReadFull(b *testing.B) {
	srv, client := startServer(handlerFlush)
	defer srv.Close()

	measureFull(b, client, srv.URL)
}
func Benchmark_Baseline_ReadFull(b *testing.B) {
ts, c := startServer(handlerBaseline)
defer ts.Close()
measureFull(b, c, ts.URL)
}
Observed on AMD Ryzen 9 5950X (Linux, Go bench -32):
Benchmark_NoFlush_ReadFirstChunk-32 ~50,272,682 ns/op
Benchmark_Flush_ReadFirstChunk-32 ~129,078 ns/op
Benchmark_Baseline_ReadFirstChunk-32 ~44,888 ns/op
Benchmark_NoFlush_ReadFull-32 ~50,277,057 ns/op
Benchmark_Flush_ReadFull-32 ~50,275,506 ns/op
Benchmark_Baseline_ReadFull-32 ~45,569 ns/op
What this tells us
|
⚡ Benchmark: xsync.Map vs RWMutex for map[string]int64 under read-heavy load (with/without pruning)
This benchmark contrasts a classic map+RWMutex against xsync.Map[string,int64] for read-heavy and mixed loads, including a periodic “prune/touch all keys” pass to simulate a maintenance task.
sessions_bench_test.go
// sessions_bench_test.go
package demo
import (
"context"
"fmt"
"math/rand"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/puzpuzpuz/xsync/v4"
)
// --- configuration ---
// Tunables shared by every benchmark in this file.
const (
// keyCount is the size of the key set each benchmark builds with makeKeys.
keyCount = 10_000 // number of session keys preloaded
// pruneEvery is the ticker period between PruneTouchAll passes when a
// benchmark runs with pruning enabled.
pruneEvery = 250 * time.Microsecond
// rngSalt is multiplied by the per-worker id and XORed into that worker's
// RNG seed so workers do not share a seed.
// NOTE(review): looks like the 64-bit golden-ratio constant — confirm.
rngSalt = 0x9e3779b97f4a7c15
)
// goroutineSeq is incremented atomically by each RunParallel worker to obtain
// an id that feeds its RNG seed. It is never reset between benchmarks.
var goroutineSeq uint64 // unique id per RunParallel worker
// ------------------------------------
// makeKeys returns n session keys of the form "sess-" followed by the key's
// index, zero-padded to eight digits.
func makeKeys(n int) []string {
	out := make([]string, n)
	for idx := range out {
		out[idx] = fmt.Sprintf("sess-%08d", idx)
	}
	return out
}
// ===== Implementation A: map + RWMutex =====
// rwMap is the classic baseline: a built-in map guarded by a sync.RWMutex.
// It must be used through a pointer because it embeds a mutex.
type rwMap struct {
	mu sync.RWMutex     // guards m
	m  map[string]int64 // key -> value
}

// newRWMap returns an rwMap preloaded with keys, each mapped to its index in
// the slice.
func newRWMap(keys []string) *rwMap {
	m := make(map[string]int64, len(keys))
	for i, k := range keys {
		m[k] = int64(i)
	}
	return &rwMap{m: m}
}

// Load returns the value stored for k and whether it was present, holding the
// read lock for the duration of the lookup.
func (s *rwMap) Load(k string) (int64, bool) {
	// Explicit unlock (no defer) keeps the measured hot path as lean as possible.
	s.mu.RLock()
	v, ok := s.m[k]
	s.mu.RUnlock()
	return v, ok
}

// Store sets the value for k under the write lock.
func (s *rwMap) Store(k string, v int64) {
	s.mu.Lock()
	s.m[k] = v
	s.mu.Unlock()
}

// PruneTouchAll simulates a pruning pass by incrementing every value while
// holding the write lock. The write lock is intentional: it demonstrates how
// readers stall behind a maintenance pass under contention.
func (s *rwMap) PruneTouchAll() {
	s.mu.Lock()
	// Only existing keys are updated, which is safe while ranging over the map.
	for k := range s.m {
		s.m[k]++
	}
	s.mu.Unlock()
}
// ===== Implementation B: xsync.Map =====
// xMap is the xsync-backed counterpart of the mutex-guarded map: a thin
// wrapper that forwards every operation to an xsync.Map[string, int64].
type xMap struct {
	m *xsync.Map[string, int64]
}

// newXMap returns an xMap preloaded with keys, each mapped to its index in the
// slice.
func newXMap(keys []string) *xMap {
	store := &xMap{m: xsync.NewMap[string, int64]()}
	for idx, key := range keys {
		store.m.Store(key, int64(idx))
	}
	return store
}

// Load returns the value stored for k and whether it was present.
func (s *xMap) Load(k string) (int64, bool) {
	value, found := s.m.Load(k)
	return value, found
}

// Store sets the value for k.
func (s *xMap) Store(k string, v int64) {
	s.m.Store(k, v)
}

// PruneTouchAll simulates a pruning pass by ranging over the map and storing
// each value incremented by one. No caller-side lock is taken, so readers are
// not blocked by this wrapper.
func (s *xMap) PruneTouchAll() {
	touch := func(key string, current int64) bool {
		s.m.Store(key, current+1)
		return true // keep iterating over the remaining entries
	}
	s.m.Range(touch)
}
// ===== shared harness =====
// kvStore is the minimal surface the benchmark harness needs from a map
// implementation under test.
type kvStore interface {
	// Load returns the value stored for key and whether it was present.
	Load(key string) (value int64, ok bool)
	// Store sets the value for key.
	Store(key string, value int64)
	// PruneTouchAll performs one simulated maintenance pass over all entries.
	PruneTouchAll()
}
// runMixed drives store with a parallel read/write workload and, optionally, a
// background pruning goroutine.
//
//   - keys is the key set workers pick from uniformly at random; it must not
//     be empty.
//   - readPct is the percentage (0-100) of operations that are Loads; the rest
//     are Stores of a random value.
//   - withPrune starts a goroutine that calls store.PruneTouchAll every
//     pruneEvery until the workload finishes.
//
// Only the parallel workload is timed: the timer is reset just before
// RunParallel and stopped right after it, so shutting down the pruner (which
// may be in the middle of a full pass) is not billed to the benchmark.
func runMixed(b *testing.B, store kvStore, keys []string, readPct int, withPrune bool) {
	if len(keys) == 0 {
		// rand.Intn(0) would panic inside a worker goroutine; fail clearly instead.
		b.Fatal("runMixed: keys must not be empty")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // safety net if the benchmark body exits early

	var wg sync.WaitGroup
	if withPrune {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t := time.NewTicker(pruneEvery)
			defer t.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-t.C:
					store.PruneTouchAll()
				}
			}
		}()
	}

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		// Give each worker its own RNG with a distinct seed so workers neither
		// contend on a shared source nor replay the same key sequence.
		id := atomic.AddUint64(&goroutineSeq, 1)
		seed := int64(uint64(time.Now().UnixNano()) ^
			(rngSalt * id) ^
			uint64(runtime.GOMAXPROCS(0)))
		r := rand.New(rand.NewSource(seed))
		n := len(keys)
		for pb.Next() {
			k := keys[r.Intn(n)]
			if r.Intn(100) < readPct {
				_, _ = store.Load(k)
			} else {
				store.Store(k, int64(r.Int31()))
			}
		}
	})
	b.StopTimer()

	// Tear down the pruner outside the timed region.
	cancel()
	wg.Wait()
}
// ===== Bench suites =====
// Benchmark_RWMutex_Read100 runs a 100% read workload against the
// mutex-guarded map.
func Benchmark_RWMutex_Read100(b *testing.B) {
	sessionKeys := makeKeys(keyCount)
	store := newRWMap(sessionKeys)
	runMixed(b, store, sessionKeys, 100, false)
}

// Benchmark_XSync_Read100 runs a 100% read workload against the xsync map.
func Benchmark_XSync_Read100(b *testing.B) {
	sessionKeys := makeKeys(keyCount)
	store := newXMap(sessionKeys)
	runMixed(b, store, sessionKeys, 100, false)
}

// Benchmark_RWMutex_Read99Write1 runs 99% reads / 1% writes against the
// mutex-guarded map.
func Benchmark_RWMutex_Read99Write1(b *testing.B) {
	sessionKeys := makeKeys(keyCount)
	store := newRWMap(sessionKeys)
	runMixed(b, store, sessionKeys, 99, false)
}

// Benchmark_XSync_Read99Write1 runs 99% reads / 1% writes against the xsync
// map.
func Benchmark_XSync_Read99Write1(b *testing.B) {
	sessionKeys := makeKeys(keyCount)
	store := newXMap(sessionKeys)
	runMixed(b, store, sessionKeys, 99, false)
}

// Benchmark_RWMutex_Read75Write25 runs 75% reads / 25% writes against the
// mutex-guarded map.
func Benchmark_RWMutex_Read75Write25(b *testing.B) {
	sessionKeys := makeKeys(keyCount)
	store := newRWMap(sessionKeys)
	runMixed(b, store, sessionKeys, 75, false)
}

// Benchmark_XSync_Read75Write25 runs 75% reads / 25% writes against the xsync
// map.
func Benchmark_XSync_Read75Write25(b *testing.B) {
	sessionKeys := makeKeys(keyCount)
	store := newXMap(sessionKeys)
	runMixed(b, store, sessionKeys, 75, false)
}

// --- With a periodic "prune" (touch-all) pass running in the background ---

// Benchmark_RWMutex_Read99Write1_Prune runs 99% reads / 1% writes against the
// mutex-guarded map while the pruning goroutine is active.
func Benchmark_RWMutex_Read99Write1_Prune(b *testing.B) {
	sessionKeys := makeKeys(keyCount)
	store := newRWMap(sessionKeys)
	runMixed(b, store, sessionKeys, 99, true)
}
func Benchmark_XSync_Read99Write1_Prune(b *testing.B) {
keys := makeKeys(keyCount)
runMixed(b, newXMap(keys), keys, 99, true)
}
Observed on AMD Ryzen 9 5950X (Linux, Go bench -32):
Read-only: RWMutex ~28.98 ns/op vs XSync ~1.946 ns/op → ~14.9× faster
99% reads / 1% writes:
RWMutex ~61.91 ns/op vs XSync ~2.265 ns/op → ~27.3× faster
75% reads / 25% writes:
RWMutex ~53.06 ns/op vs XSync ~7.491 ns/op → ~7.1× faster
99/1 with prune: RWMutex ~103.1 ns/op vs XSync ~2.442 ns/op → ~42.2× faster
What this tells us
|
Instruction timing microbenchmarks (perf tracker)
This comment documents the microbenchmarks for the new relayer.PerfTracker used to record per-instruction timings (with "instruction" + "service_id" labels).
What these benches measure
Observed on AMD Ryzen 9 5950X (Linux, Go bench -32):
BenchmarkPerf_Instance_Buffered-32 ~180 ns/op ~96 B/op 2 allocs/op
BenchmarkPerf_Span_Buffered-32 ~206 ns/op ~128 B/op 3 allocs/op
BenchmarkPerf_Pkg_Buffered-32 ~189 ns/op ~96 B/op 2 allocs/op
BenchmarkPerf_Instance_Immediate-32 ~152 ns/op ~96 B/op 2 allocs/op
BenchmarkPerf_Span_Immediate-32 ~175 ns/op ~128 B/op 3 allocs/op
BenchmarkPerf_Pkg_Immediate-32 ~158 ns/op ~96 B/op 2 allocs/op
BenchmarkPerf_SetServiceID-32 ~21 ns/op ~16 B/op 1 alloc/op
Takeaways
Why this matters
Caveats
|
Olshansk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jorgecuesta This is a preliminary review/comment before we dive in.
- Can you shorten the GitHub description (or maybe add a diagram) so it's clear what the change is (and why)?
- If you see opportunity to split this into smaller PRs, I'd appreciate it!
- Can you update this with the base when you can?
If I left comments that are not relevant (i.e. on changes not made by you), I apologize in advance! Feel free to simply resolve them without a comment.
I'll also send a message on discord to make sure we're all aligned.
… minimum latency. - Redo Instruction Timer - Update KeyValues cache - Introduce xsync to avoid using raw Lock Mutex - Introduce eager validation per service config - Introduce full optimistic delayed workflow using Mining Supervisor - Updated localnet files to use query cache - Updated localnet files to been able to set eager validations on/off - Cleanup instruction timer names.
c8ea932 to
095ca65
Compare
…essions_manager.go` and renamed to performance_tracker.go as suggested by Olshansky.
The PR description is short enough; I added much more context in the comments below to avoid making the description too verbose. There are also diagrams already.
Splitting is not possible right now given my current time constraints. Also, all of these changes work together to achieve the 0.2ms of extra latency with EAGER validation, so I'd rather not split them: that would create a lot more work rebasing each piece every time a PR is merged into main.
Done! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jorgecuesta Note that I'm still not done.
Overall, it's looking good but I'll really get into the "meat of things" tomorrow.
Review info:
- First review was "preliminary".
- Second review is me building context and providing NITs.
- I anticipate 1 or 2 more reviews given the size, context and importance of this.
In terms of next steps, going to outline it to move fast.
- Can you PTAL at #1837 and make any changes you need/want to it.
- Once #1837 looks good, please squash & merge it into this PR.
- After (2), please look at the comments I have in this PR and either tend, close or comment.
- In parallel to (1,2,3), I'll start looking at pkg/relayer/performance_tracker.go and pkg/relayer/proxy/mining_supervisor.go in more detail to understand them.
- Once all of the above are done, I'll tap in @red-0ne to make sure his changes are not overridden by the changes in this PR.
--
I would appreciate it if you could prioritize this ASAP to make sure it's part of the beta release, which has a cutoff this Friday.
| @@ -1,9 +1,9 @@ | |||
| package query | |||
|
|
|||
| import ( | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes in this file seem to be unsafe & incomplete.
Can you:
- Delete unnecessary commented out things
- Either add back the mutexes or use xsync
- Explain (in the comments in the code, not the github response) why we don't need locks here?
| blockHeightCurrent = "block_height_current" | ||
| instructionTimeSeconds = "instruction_time_seconds" | ||
|
|
||
| // TODO: implement metrics for the following: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need this TODO?
| Help: "Total relays enqueued for mining, labeled by service_id.", | ||
| }, []string{"service_id"}) | ||
|
|
||
| MiningQueueDroppedTotal = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#PUC (Please Update Comment) how/why/when things get dropped from the queue.
| ) | ||
|
|
||
| // YAMLRelayMinerConfig is the structure used to unmarshal the RelayMiner config file | ||
| type YAMLRelayMinerConfig struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the docs if you're updating configs: https://dev.poktroll.com/operate/configs/relayminer_config
| panic("RelayMiningSupervisor: downstream channel must not be nil") | ||
| } | ||
| if cfg.QueueSize <= 0 { | ||
| cfg.QueueSize = 10_000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we need "magic numbers" or "magic constants" (e.g. defaults), let's move it into a local constant.
| cfg.DropPolicy = config.DefaultMSDropPolicy | ||
| } | ||
| if cfg.GaugeSampleInterval <= 0 { | ||
| cfg.GaugeSampleInterval = 200 * time.Millisecond |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like we're duplicating logic that the hydrator is already doing.
Can you consolidate the logic so defaults are only configured in one place?
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
|
|
||
| s := &RelayMiningSupervisor{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not store the entire relay mining supervisor config in this struct?
Feels like it'll simplify things.
| Dur("drop_log_interval", cfg.DropLogInterval). | ||
| Msg("relay mining supervisor started") | ||
|
|
||
| // workers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you #PUC on not just "what" this is but "why" this is.
| go s.worker(i) | ||
| } | ||
|
|
||
| // gauge sampler (keeps hot path clean) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
red-0ne
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Posting a partial review, will get back to the rest of it soon
pkg/cache/memory/kvcache.go
Outdated
|
|
||
| cachedValue, exists := c.values[key] | ||
| if !exists { | ||
| v, ok := c.values.Load(key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| v, ok := c.values.Load(key) | |
| cachedValue, exists := c.values.Load(key) |
| return zero, false | ||
| } | ||
|
|
||
| isCacheValueExpired := time.Since(cachedValue.cachedAt) > c.config.ttl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason we're removing intermediate variables? I believe it's a net positive for readability
| c.values[key] = cacheValue[T]{ | ||
| value: value, | ||
| cachedAt: time.Now(), | ||
| if c.config.maxKeys > 0 && int64(c.values.Size()) > c.config.maxKeys { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic seems to already exist in evictKey.
Any reason it's duplicated here?
| isMaxKeysConfigured := c.config.maxKeys > 0 | ||
| cacheMaxKeysReached := int64(len(c.values)) > c.config.maxKeys | ||
| if !isMaxKeysConfigured || !cacheMaxKeysReached { | ||
| if c.config.maxKeys <= 0 || int64(c.values.Size()) <= c.config.maxKeys { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer keeping the intermediate variables. I believe it's a net positive for readability.
pkg/cache/memory/kvcache.go
Outdated
|
|
||
| // 1) Prefer to evict any TTL-expired entry (cheap scan, remove one). | ||
| var expiredKey string | ||
| c.values.Range(func(k string, v cacheValue[T]) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think going TTL first might have a negative impact on eviction performance.
If no expired entry is found we would have scanned the entire cache for nothing.
This is especially true for update intensive scenarios.
| return | ||
| } | ||
|
|
||
| // !!! CRITICAL MUTEX !!! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double checked.
The usage of xsync allows for the removal of the locks.
Although it does not guarantee a consistent snapshot of the state of the map, it does a good enough job of removing outdated relayMeters.
| rmtr.sessionToRelayMeterMap[sessionId] = relayMeter | ||
| rmtr.relayMeterMu.Unlock() | ||
| // Try to publish; if someone beat us, reuse theirs. | ||
| if existing, loaded := rmtr.sessionToRelayMeterMap.LoadOrStore(sessionId, relayMeter); loaded { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LoadOrStore returns the newly created value if an entry does not exist.
Why not do the following?
| if existing, loaded := rmtr.sessionToRelayMeterMap.LoadOrStore(sessionId, relayMeter); loaded { | |
| relayMeter = rmtr.sessionToRelayMeterMap.LoadOrStore(sessionId, relayMeter) |
| // Build a new entry (same logic you already have) | ||
| appAddress := reqMeta.GetSessionHeader().GetApplicationAddress() | ||
|
|
||
| // In order to prevent over-servicing, the protocol must split the application's stake |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Olshansk , all of the relayMeter logic is moved below and xsync is substituting to the locks.
pkg/relayer/proxy/server_builder.go
Outdated
| rp.blockClient, | ||
| rp.sharedQuerier, | ||
| rp.sessionQuerier, | ||
| miningSup, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| miningSup, | |
| miningSupervisor, |
| query_node_rpc_url: tcp://localhost:26657 | ||
| query_node_grpc_url: tcp://localhost:9090 | ||
| tx_node_rpc_url: tcp://localhost:26657 | ||
| mining_supervisor: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are those values different from the defaults?
|
This branch is built on top of #1785, so it should be reviewed/merged only after that one is merged.
Summary
Introduce an in-process mining supervisor to run delayed validations after writing the client response, adopt per-service eager validation (with a global default that services can override), and switch hot maps/queues to xsync for lower latency and contention.
Primary Changes
Mining Supervisor (new)
xsync.Map[string]*SessionCache) tracking{EndHeight, Rewardable}.Per-service eager validation
enable_eager_relay_request_validationis true for that service: do rate-limit snapshot + signature/session verification before the backend call.Hot path concurrency via xsync
xsync.Map, and fan-in buffering withxsync.MPMCQueuewhere appropriate.Secondary Changes
Start/Finish/Span/Flush, nested spans, buffered emission, and aservice_idlabel for instruction timings.Issue
Type of change
Sanity Checklist
assignees,reviewers,labels,project,iterationandmilestonemake docusaurus_startmake go_develop_and_testandmake test_e2edevnet-test-e2elabel to run E2E tests in CImake test_e2e_oneshotConfiguration (YAML)
Performance snapshot
In this snapshot you can clearly see the latency spike slightly at 300 rps; the added latency is small (~5ms) considering that this is doing eager validation (see the eager-on screenshot). That spike will be resolved by @red-0ne's PR #1810, which I have already applied on top of this one; with it, the latency added during the spike drops to less than ~1ms.
Risk / Compatibility
Implementation Notes
Enqueue & backpressure behavior
drop-oldestis enabled, evict one and retry once.enqueue_timeout_ms > 0, wait briefly before deciding to evict or drop.mining_queue_enqueued_total{service_id}mining_queue_dropped_total{service_id,reason}with reasons likefull,evicted,timeout,timeout_full_after_evict,stopped_or_closed,contextmining_queue_lenSession cache semantics (fast path in handler)
xsync.Map[string]*SessionCachewith:EndHeight int64(updated up, never down)Rewardable atomic.Bool(starts true, can only be set to false)GetSessionEntryMarkSessionAsKnown(sessionID, endHeight)MarkSessionAsNonRewardable(sessionID)(monotonic downgrade)PruneOutdatedKnownSessionson new block events (endHeight+1 < head ⇒ delete)Performance tracker (instruction timings)
context.Contextwith buffered spans; minimal overhead.instruction,service_id(defaults tounknown, later upgradable).ctx, tracker, flush := relayer.EnsurePerfBuffered(ctx, serviceId)(after service is known)defer flush()defer relayer.Span(ctx, relayer.InstructionHTTPClientDo)()around hot segmentsPublish.Handler workflow
flowchart TD
    A[HTTP request received] --> B[Parse and basic validate]
    B -->|error| E1[Return error]
    B --> C[Resolve service config]
    C --> D{Eager validation enabled for service}
    D -->|No| F[Check session cache and mark known if needed]
    D -->|Yes| E[Eager: rate limit snapshot and signature verify]
    E -->|fail| E1
    E -->|pass| F
    F --> G[Setup context deadline and write deadline]
    G --> H[Build backend request]
    H --> I[Call backend]
    I -->|error| E1
    I --> J[Serialize backend response]
    J --> K[Send response to client]
    K --> L{Eager enabled}
    L -->|No| M[Publish to mining supervisor]
    L -->|Yes| N[Final eligibility snapshot]
    N -->|eligible| O[Forward to miner for rewards]
    N -->|ineligible| P[Do not reward]
    M --> R[Return 200 OK]
    O --> R
    P --> R
Mining Supervisor
flowchart TD
    S1[Publish relay] --> S2{Queue has space}
    S2 -->|Yes| S3[Enqueue and record metric]
    S2 -->|No| S4{Drop policy is drop-oldest}
    S4 -->|Yes| S5[Evict one item]
    S5 --> S6{Retry enqueue}
    S6 -->|Success| S3
    S6 -->|Fail| S7[Drop and record metric]
    S4 -->|No| S7
    subgraph Workers
        W1[Dequeue relay] --> W2[Over-service snapshot]
        W2 --> W3[Signature and session verify]
        W3 -->|invalid| W8[Mark non-rewardable and revert optimistic accounting]
        W3 -->|valid| W4[Update session cache]
        W4 --> W5{Rewardable}
        W5 -->|Yes| W6{Downstream channel available}
        W6 -->|Yes| W7[Send downstream]
        W6 -->|No| W9[Drop and mark downstream closed]
        W5 -->|No| W8
    end