
Conversation


@jorgecuesta jorgecuesta commented Oct 4, 2025

This branch is built on top of #1785, so it should be reviewed/merged only after that one lands.

Summary

Introduce an in-process mining supervisor to run delayed validations after writing the client response, adopt per-service eager validation (with a global default that services can override), and switch hot maps/queues to xsync for lower latency and contention.

Primary Changes

  • Mining Supervisor (new)

    • Bounded queue + worker pool; non-blocking enqueue (or short timeout) with optional drop-oldest.
    • Non-blocking downstream send; rate-limited warnings when dropping under backpressure.
    • Session fast-path cache (xsync.Map[string]*SessionCache) tracking {EndHeight, Rewardable}.
    • Instruction timings for enqueue/evict/timeout wait, processing, session upsert, downstream send.
  • Per-service eager validation

    • Handler reads the service config first and decides:
      • If enable_eager_relay_request_validation is true for that service: do rate-limit snapshot + signature/session verification before the backend call.
      • If false: write the response fast, then publish to the mining supervisor for delayed validation + accounting.
    • A global default exists and services may override it.
  • Hot path concurrency via xsync

    • Replace RWMutex maps with xsync.Map, and fan-in buffering with xsync.MPMCQueue where appropriate.

Secondary Changes

  • Lightweight performance tracker
    • Per-request tracker with Start/Finish/Span/Flush, nested spans, buffered emission, and a service_id label for instruction timings.

Issue

  • The latency added before and after the backend call was spiking the RelayMiner's response times.

Type of change

  • New feature, functionality or library
  • Bug fix
  • Code health or cleanup
  • Documentation
  • Other (specify)

Sanity Checklist

  • I have updated the GitHub Issue Metadata: assignees, reviewers, labels, project, iteration and milestone
  • For docs: make docusaurus_start
  • For small changes: make go_develop_and_test and make test_e2e
  • For major changes: add devnet-test-e2e label to run E2E tests in CI
  • For migration changes: make test_e2e_oneshot
  • 'TODO's, configurations and other docs

Configuration (YAML)

# Global default; services can override per entry
default_enable_eager_relay_request_validation: true

mining_supervisor:
  queue_size: 2000
  workers: 8
  enqueue_timeout_ms: 50
  drop_policy: "drop-oldest"          # or "drop-new"
  gauge_sample_interval_ms: 100
  drop_log_interval_ms: 5

suppliers:
  - service_id: static
    listen_url: http://0.0.0.0:8545
    enable_eager_relay_request_validation: false   # per-service override
    service_config:
      backend_url: http://localhost:8548/
      forward_pocket_headers: false

  - service_id: anvil
    listen_url: http://0.0.0.0:8545
    # inherits global default (true in this example)
    service_config:
      backend_url: http://localhost:8547
      forward_pocket_headers: false
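
A minimal sketch of how the per-service flag could resolve against the global default (the *bool override field and the helper name are assumptions about the config shape, not the PR's actual code):

package config

// resolveEagerValidation is a hypothetical helper: a nil per-service
// override means "inherit the global default", mirroring the YAML above.
func resolveEagerValidation(globalDefault bool, serviceOverride *bool) bool {
    if serviceOverride != nil {
        return *serviceOverride
    }
    return globalDefault
}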

Performance snapshot

In this snapshot you can clearly observe how the latency spikes up a bit at 300 rps. The added latency is not that much (~5 ms) considering that this is doing eager validation (see the eager-on screenshot). That spike will be resolved by @red-0ne's PR #1810, which I have already stacked on top of this one, bringing the spike latency down to less than ~1 ms.

eager-on screenshot

Risk / Compatibility

  • Mining Supervisor is a new, internal component behind configuration; no on-chain or external API changes.
  • Per-service eager validation is config-driven (global default with service-level override).
  • Metrics only add new series/labels; existing names remain.

Implementation Notes

Enqueue & backpressure behavior

  • Default: non-blocking enqueue; if full and drop-oldest is enabled, evict one and retry once (see the sketch after this list).
  • Timeout path: with enqueue_timeout_ms > 0, wait briefly before deciding to evict or drop.
  • Metrics:
    • mining_queue_enqueued_total{service_id}
    • mining_queue_dropped_total{service_id,reason} with reasons like full, evicted, timeout, timeout_full_after_evict, stopped_or_closed, context
    • mining_queue_len
  • Logs: rate-limited warning when dropping due to backpressure (reason, queue len/cap).
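
A self-contained sketch of the enqueue policy described above, using a buffered channel as a stand-in for the real xsync.MPMCQueue (the types and the inlined metric names in comments are illustrative only):

package supervisor

// relayTask stands in for the queued work item (hypothetical type).
type relayTask struct{ sessionID string }

// miningQueue is a stand-in for the real bounded queue.
type miningQueue struct {
    ch         chan *relayTask
    dropOldest bool
}

// tryEnqueue mirrors the described policy: non-blocking enqueue; if full
// and drop-oldest is enabled, evict one item and retry exactly once.
func (q *miningQueue) tryEnqueue(t *relayTask) bool {
    select {
    case q.ch <- t:
        return true // mining_queue_enqueued_total{service_id}++
    default:
    }
    if q.dropOldest {
        select {
        case <-q.ch: // evict oldest: mining_queue_dropped_total{reason="evicted"}++
        default:
        }
        select {
        case q.ch <- t: // retry once after eviction
            return true
        default:
        }
    }
    // mining_queue_dropped_total{reason="full"}++ plus a rate-limited warning
    return false
}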

Session cache semantics (fast path in handler)

  • xsync.Map[string]*SessionCache with:
    • EndHeight int64 (updated up, never down)
    • Rewardable atomic.Bool (starts true, can only be set to false)
  • API (sketched below):
    • GetSessionEntry
    • MarkSessionAsKnown(sessionID, endHeight)
    • MarkSessionAsNonRewardable(sessionID) (monotonic downgrade)
    • PruneOutdatedKnownSessions on new block events (endHeight+1 < head ⇒ delete)
  • Effect: subsequent requests in the same session can skip unnecessary checks or be fast-rejected if already non-rewardable.
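
Under these semantics, a minimal sketch (field and method names follow the list above; the atomic EndHeight and the eager allocation before LoadOrStore are simplifications, not the PR's actual implementation):

package sessioncache

import (
    "sync/atomic"

    "github.com/puzpuzpuz/xsync/v4"
)

// SessionCache mirrors the entry described above.
type SessionCache struct {
    EndHeight  atomic.Int64 // only ever raised
    Rewardable atomic.Bool  // starts true; can only drop to false
}

type Cache struct{ m *xsync.Map[string, *SessionCache] }

func New() *Cache { return &Cache{m: xsync.NewMap[string, *SessionCache]()} }

func (c *Cache) GetSessionEntry(id string) (*SessionCache, bool) { return c.m.Load(id) }

// MarkSessionAsKnown upserts the entry and raises EndHeight monotonically.
func (c *Cache) MarkSessionAsKnown(id string, endHeight int64) {
    fresh := &SessionCache{}
    fresh.Rewardable.Store(true)
    entry, _ := c.m.LoadOrStore(id, fresh)
    for {
        cur := entry.EndHeight.Load()
        if endHeight <= cur || entry.EndHeight.CompareAndSwap(cur, endHeight) {
            return // updated up, never down
        }
    }
}

// MarkSessionAsNonRewardable is the monotonic downgrade: true -> false only.
func (c *Cache) MarkSessionAsNonRewardable(id string) {
    if entry, ok := c.m.Load(id); ok {
        entry.Rewardable.Store(false)
    }
}

// PruneOutdatedKnownSessions runs on new block events: endHeight+1 < head => delete.
func (c *Cache) PruneOutdatedKnownSessions(head int64) {
    c.m.Range(func(id string, entry *SessionCache) bool {
        if entry.EndHeight.Load()+1 < head {
            c.m.Delete(id)
        }
        return true
    })
}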

Performance tracker (instruction timings)

  • Per-request tracker on context.Context with buffered spans; minimal overhead.
  • Labels: instruction, service_id (defaults to unknown, later upgradable).
  • Typical handler usage (composed in the sketch below):
    • ctx, tracker, flush := relayer.EnsurePerfBuffered(ctx, serviceId) (after service is known)
    • defer flush()
    • defer relayer.Span(ctx, relayer.InstructionHTTPClientDo)() around hot segments
  • Supervisor uses the same tracker when the handler passes context into Publish.
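
Put together, a hypothetical wiring of those calls inside a handler (the import path, serviceId resolution, and backend call are assumed; this is a sketch, not the PR's handler):

package proxy

import (
    "net/http"

    "github.com/pokt-network/poktroll/pkg/relayer"
)

// serveRelay shows how the pieces above compose once the service is known.
func serveRelay(w http.ResponseWriter, r *http.Request, serviceId string) {
    ctx, _, flush := relayer.EnsurePerfBuffered(r.Context(), serviceId)
    defer flush() // emit all buffered spans once, near the end

    func() {
        // The returned closure observes the elapsed time when deferred.
        defer relayer.Span(ctx, relayer.InstructionHTTPClientDo)()
        // ... hot segment: backend round trip ...
    }()
}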

Handler workflow

flowchart TD
    A[HTTP request received] --> B[Parse and basic validate]
    B -->|error| E1[Return error]
    B --> C[Resolve service config]
    C --> D{Eager validation enabled for service}
    D -->|No| F[Check session cache and mark known if needed]
    D -->|Yes| E[Eager: rate limit snapshot and signature verify]
    E -->|fail| E1
    E -->|pass| F
    F --> G[Setup context deadline and write deadline]
    G --> H[Build backend request]
    H --> I[Call backend]
    I -->|error| E1
    I --> J[Serialize backend response]
    J --> K[Send response to client]
    K --> L{Eager enabled}
    L -->|No| M[Publish to mining supervisor]
    L -->|Yes| N[Final eligibility snapshot]
    N -->|eligible| O[Forward to miner for rewards]
    N -->|ineligible| P[Do not reward]
    M --> R[Return 200 OK]
    O --> R
    P --> R

Mining Supervisor

flowchart TD
    S1[Publish relay] --> S2{Queue has space}
    S2 -->|Yes| S3[Enqueue and record metric]
    S2 -->|No| S4{Drop policy is drop-oldest}
    S4 -->|Yes| S5[Evict one item]
    S5 --> S6{Retry enqueue}
    S6 -->|Success| S3
    S6 -->|Fail| S7[Drop and record metric]
    S4 -->|No| S7

    subgraph Workers
        W1[Dequeue relay] --> W2[Over-service snapshot]
        W2 --> W3[Signature and session verify]
        W3 -->|invalid| W8[Mark non-rewardable and revert optimistic accounting]
        W3 -->|valid| W4[Update session cache]
        W4 --> W5{Rewardable}
        W5 -->|Yes| W6{Downstream channel available}
        W6 -->|Yes| W7[Send downstream]
        W6 -->|No| W9[Drop and mark downstream closed]
        W5 -->|No| W8
    end
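
A self-contained sketch of the worker loop in this diagram; all types and the verify/revert steps are stand-ins, not the PR's API:

package supervisor

// minedRelay stands in for the dequeued work item (hypothetical type).
type minedRelay struct {
    sessionID string
    endHeight int64
}

type miningWorker struct {
    queue      chan *minedRelay
    downstream chan *minedRelay
    verify     func(*minedRelay) error // signature + session verification
}

func (w *miningWorker) run() {
    for rel := range w.queue { // dequeue relay
        if err := w.verify(rel); err != nil {
            // Mark the session non-rewardable and revert optimistic accounting.
            continue
        }
        // Update the session cache: MarkSessionAsKnown(rel.sessionID, rel.endHeight).
        select {
        case w.downstream <- rel: // non-blocking downstream send
        default:
            // Drop, record the metric, and emit a rate-limited warning.
        }
    }
}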

@jorgecuesta jorgecuesta requested a review from red-0ne October 4, 2025 07:10
@jorgecuesta jorgecuesta changed the title Experiment/eager optimistic metrics RM: mining supervisor, per-service eager validation, xsync fast paths Oct 4, 2025
@jorgecuesta jorgecuesta self-assigned this Oct 4, 2025
@jorgecuesta jorgecuesta added the relayminer, code health and performance labels Oct 4, 2025
@github-project-automation github-project-automation bot moved this to 📋 Backlog in Shannon Oct 4, 2025
@jorgecuesta jorgecuesta added this to the RelayMiner Stability milestone Oct 4, 2025
@jorgecuesta jorgecuesta moved this from 📋 Backlog to 👀 In review in Shannon Oct 4, 2025
@jorgecuesta (Contributor, Author)

📈 Benchmark: Defer validation/mining work until after writing the response

This microbench simulates three server behaviors:

  • NoFlush – write the first chunk without flushing, then do the heavy (validation/mining-ish) work; the client can't observe the first bytes until the handler returns (bad for TTFB).
  • Flush – write and flush the first chunk immediately, then do the heavy work (good TTFB; total time unchanged).
  • Baseline – minimal overhead (control).
handler_flush_test.go
package demo

import (
    "bufio"
    "io"
    "net/http"
    "net/http/httptest"
    "testing"
    "time"
)

var (
    sleepDur   = 50 * time.Millisecond
    firstChunk = []byte("FIRST\n")
    lastChunk  = []byte("LAST\n")
)

// helper: start server
func startServer(h http.HandlerFunc) (*httptest.Server, *http.Client) {
    ts := httptest.NewServer(h)
    return ts, ts.Client()
}

// measure only reads the first N bytes (time-to-first-chunk)
func measureReadN(b *testing.B, c *http.Client, url string, n int) {
    b.Helper()
    buf := make([]byte, n)
    for i := 0; i < b.N; i++ {
        resp, err := c.Get(url)
        if err != nil {
            b.Fatalf("GET failed: %v", err)
        }
        r := bufio.NewReader(resp.Body)
        if _, err := io.ReadFull(r, buf); err != nil {
            b.Fatalf("ReadFull: %v", err)
        }
        // Don’t wait for the rest of the body to demonstrate early return.
        _ = resp.Body.Close()
    }
}

// measureFull reads the full body (EOF or Content-Length)
func measureFull(b *testing.B, c *http.Client, url string) {
    b.Helper()
    for i := 0; i < b.N; i++ {
        resp, err := c.Get(url)
        if err != nil {
            b.Fatalf("GET failed: %v", err)
        }
        _, _ = io.Copy(io.Discard, resp.Body)
        _ = resp.Body.Close()
    }
}

// Handler variants

// No flush: write first chunk then keep working; in practice the first bytes
// often won't be observable by the client until return.
func handlerNoFlush(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
    _, _ = w.Write(firstChunk)
    time.Sleep(sleepDur) // simulate post-response work
    _, _ = w.Write(lastChunk)
}

// Flush after first chunk: lets client observe the response early.
func handlerFlush(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
    _, _ = w.Write(firstChunk)

    // Flush explicitly so the client can read FIRST immediately.
    if f, ok := w.(http.Flusher); ok {
        f.Flush()
        // Alternatively (Go 1.20+): http.NewResponseController(w).Flush()
    }

    time.Sleep(sleepDur) // simulate post-response work
    _, _ = w.Write(lastChunk)
}

// Baseline: no extra work.
func handlerBaseline(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
    _, _ = w.Write(firstChunk)
    _, _ = w.Write(lastChunk)
}

// Benchmarks

func Benchmark_NoFlush_ReadFirstChunk(b *testing.B) {
    ts, c := startServer(handlerNoFlush)
    defer ts.Close()
    measureReadN(b, c, ts.URL, len(firstChunk))
}

func Benchmark_Flush_ReadFirstChunk(b *testing.B) {
    ts, c := startServer(handlerFlush)
    defer ts.Close()
    measureReadN(b, c, ts.URL, len(firstChunk))
}

func Benchmark_Baseline_ReadFirstChunk(b *testing.B) {
    ts, c := startServer(handlerBaseline)
    defer ts.Close()
    measureReadN(b, c, ts.URL, len(firstChunk))
}

func Benchmark_NoFlush_ReadFull(b *testing.B) {
    ts, c := startServer(handlerNoFlush)
    defer ts.Close()
    measureFull(b, c, ts.URL)
}

func Benchmark_Flush_ReadFull(b *testing.B) {
    ts, c := startServer(handlerFlush)
    defer ts.Close()
    measureFull(b, c, ts.URL)
}

func Benchmark_Baseline_ReadFull(b *testing.B) {
    ts, c := startServer(handlerBaseline)
    defer ts.Close()
    measureFull(b, c, ts.URL)
}

Observed on AMD Ryzen 9 5950X (Linux, Go bench -32):

Benchmark_NoFlush_ReadFirstChunk-32    ~50,272,682 ns/op
Benchmark_Flush_ReadFirstChunk-32         ~129,078 ns/op
Benchmark_Baseline_ReadFirstChunk-32       ~44,888 ns/op

Benchmark_NoFlush_ReadFull-32           ~50,277,057 ns/op
Benchmark_Flush_ReadFull-32             ~50,275,506 ns/op
Benchmark_Baseline_ReadFull-32              ~45,569 ns/op
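
(For reproduction: these use only standard go test flags; -cpu=32 pins GOMAXPROCS to match the -32 suffix in the names.)

go test -bench=. -benchmem -cpu=32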

What this tells us

  • Writing the first bytes before doing heavy work improves TTFB by ~389× (≈50ms → ≈0.13ms) and gets within ~0.08–0.09ms of the minimal baseline.
  • Reading the full body is unchanged (as expected): total work still needs to happen; we’re just not blocking the client’s first byte.
  • This is the motivation for moving validation/mining into the Mining Supervisor and keeping the handler hot path focused on getting the response out.

@jorgecuesta (Contributor, Author)

⚡ Benchmark: xsync.Map vs RWMutex for map[string]int64 under read-heavy load (with/without pruning)

This benchmark contrasts a classic map+RWMutex against xsync.Map[string,int64] for read-heavy and mixed loads, including a periodic “prune/touch all keys” pass to simulate a maintenance task.

sessions_bench_test.go
// sessions_bench_test.go
package demo

import (
    "context"
    "fmt"
    "math/rand"
    "runtime"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/puzpuzpuz/xsync/v4"
)

// --- configuration ---
const (
    keyCount   = 10_000 // number of session keys preloaded
    pruneEvery = 250 * time.Microsecond
    rngSalt    = 0x9e3779b97f4a7c15
)

var goroutineSeq uint64 // unique id per RunParallel worker

// ------------------------------------

func makeKeys(n int) []string {
    keys := make([]string, n)
    for i := 0; i < n; i++ {
        keys[i] = fmt.Sprintf("sess-%08d", i)
    }
    return keys
}

// ===== Implementation A: map + RWMutex =====

type rwMap struct {
    mu sync.RWMutex
    m  map[string]int64
}

func newRWMap(keys []string) *rwMap {
    m := make(map[string]int64, len(keys))
    for i, k := range keys {
        m[k] = int64(i)
    }
    return &rwMap{m: m}
}

func (s *rwMap) Load(k string) (int64, bool) {
    s.mu.RLock()
    v, ok := s.m[k]
    s.mu.RUnlock()
    return v, ok
}

func (s *rwMap) Store(k string, v int64) {
    s.mu.Lock()
    s.m[k] = v
    s.mu.Unlock()
}

// PruneTouchAll simulates a pruning pass by touching every key under write lock.
// This is intentionally write-locked to demonstrate reader blocking under contention.
func (s *rwMap) PruneTouchAll() {
    s.mu.Lock()
    for k, v := range s.m {
        s.m[k] = v + 1
    }
    s.mu.Unlock()
}

// ===== Implementation B: xsync.Map =====

type xMap struct {
    m *xsync.Map[string, int64]
}

func newXMap(keys []string) *xMap {
    m := xsync.NewMap[string, int64]()
    for i, k := range keys {
        m.Store(k, int64(i))
    }
    return &xMap{m: m}
}

func (s *xMap) Load(k string) (int64, bool) {
    return s.m.Load(k)
}

func (s *xMap) Store(k string, v int64) {
    s.m.Store(k, v)
}

// PruneTouchAll simulates a pruning pass by ranging without blocking readers.
func (s *xMap) PruneTouchAll() {
    s.m.Range(func(k string, v int64) bool {
        s.m.Store(k, v+1)
        return true
    })
}

// ===== shared harness =====

type kvStore interface {
    Load(string) (int64, bool)
    Store(string, int64)
    PruneTouchAll()
}

func runMixed(b *testing.B, store kvStore, keys []string, readPct int, withPrune bool) {
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup
    if withPrune {
        wg.Add(1)
        go func() {
            defer wg.Done()
            t := time.NewTicker(pruneEvery)
            defer t.Stop()
            for {
                select {
                case <-ctx.Done():
                    return
                case <-t.C:
                    store.PruneTouchAll()
                }
            }
        }()
    }

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        // unique per-goroutine seed
        id := atomic.AddUint64(&goroutineSeq, 1)
        seed := int64(uint64(time.Now().UnixNano()) ^
                (rngSalt * id) ^
                uint64(runtime.GOMAXPROCS(0)))
        r := rand.New(rand.NewSource(seed))

        n := len(keys)
        for pb.Next() {
            k := keys[r.Intn(n)]
            if r.Intn(100) < readPct {
                _, _ = store.Load(k)
            } else {
                store.Store(k, int64(r.Int31()))
            }
        }
    })

    cancel()
    wg.Wait()
    b.StopTimer()
}

// ===== Bench suites =====

func Benchmark_RWMutex_Read100(b *testing.B) {
    keys := makeKeys(keyCount)
    runMixed(b, newRWMap(keys), keys, 100, false)
}

func Benchmark_XSync_Read100(b *testing.B) {
    keys := makeKeys(keyCount)
    runMixed(b, newXMap(keys), keys, 100, false)
}

func Benchmark_RWMutex_Read99Write1(b *testing.B) {
    keys := makeKeys(keyCount)
    runMixed(b, newRWMap(keys), keys, 99, false)
}

func Benchmark_XSync_Read99Write1(b *testing.B) {
    keys := makeKeys(keyCount)
    runMixed(b, newXMap(keys), keys, 99, false)
}

func Benchmark_RWMutex_Read75Write25(b *testing.B) {
    keys := makeKeys(keyCount)
    runMixed(b, newRWMap(keys), keys, 75, false)
}

func Benchmark_XSync_Read75Write25(b *testing.B) {
    keys := makeKeys(keyCount)
    runMixed(b, newXMap(keys), keys, 75, false)
}

// --- With periodic "prune" (touch-all) to show reader stalls vs. xsync's range ---

func Benchmark_RWMutex_Read99Write1_Prune(b *testing.B) {
    keys := makeKeys(keyCount)
    runMixed(b, newRWMap(keys), keys, 99, true)
}

func Benchmark_XSync_Read99Write1_Prune(b *testing.B) {
    keys := makeKeys(keyCount)
    runMixed(b, newXMap(keys), keys, 99, true)
}

Observed on AMD Ryzen 9 5950X (Linux, Go bench -32):

Read-only:         RWMutex ~28.98 ns/op   vs   XSync ~1.946 ns/op   → ~14.9× faster
99% reads / 1% writes:
                   RWMutex ~61.91 ns/op   vs   XSync ~2.265 ns/op   → ~27.3× faster
75% reads / 25% writes:
                   RWMutex ~53.06 ns/op   vs   XSync ~7.491 ns/op   → ~7.1× faster
99/1 with prune:   RWMutex ~103.1 ns/op   vs   XSync ~2.442 ns/op   → ~42.2× faster

What this tells us

  • xsync.Map removes global lock contention in read-heavy paths and during “touch-all” maintenance, which is exactly our session-cache and “known sessions” use case.
  • The speedups hold across mixes; the biggest wins are when a periodic write/maintenance (prune) would otherwise block readers.

@jorgecuesta (Contributor, Author)

Instruction timing micro benchmarks (perf tracker)

This comment documents the microbenchmarks for the new relayer.PerfTracker used to record per-instruction timings (with "instruction" + "service_id" labels).

What these benches measure

  • Immediate mode: Finish() observes directly (intended for low-volume paths).
  • Buffered mode: Finish() enqueues to a lock-free MPMC queue; a single Flush() emits all spans (intended for hot paths—handler & mining supervisor).
  • Three call styles are compared:
    • Instance: call tr.Start/Finish.
    • Span helper: defer relayer.Span(ctx, "name")() (shape sketched below).
    • Pkg helpers: relayer.Start/Finish(ctx, "name").
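
For context, the assumed shape of the defer-able Span helper (a sketch, not the actual implementation; trackerKey and observe are hypothetical): it captures the start time and returns a closure that records the elapsed duration, which is why call sites read defer relayer.Span(ctx, "name")().

package relayer

import (
    "context"
    "time"
)

type trackerKey struct{}

// tracker is a stand-in for the real PerfTracker carried on the context.
type tracker struct{}

func (t *tracker) observe(instruction string, d time.Duration) { /* enqueue span */ }

// Span records the start time now and returns a func that observes the
// elapsed duration when invoked, so `defer Span(ctx, "x")()` times the scope.
func Span(ctx context.Context, instruction string) func() {
    start := time.Now()
    return func() {
        if tr, ok := ctx.Value(trackerKey{}).(*tracker); ok {
            tr.observe(instruction, time.Since(start))
        }
    }
}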

Observed on AMD Ryzen 9 5950X (Linux, Go bench -32):

BenchmarkPerf_Instance_Buffered-32   ~180 ns/op   ~96  B/op   2 allocs/op
BenchmarkPerf_Span_Buffered-32       ~206 ns/op   ~128 B/op   3 allocs/op
BenchmarkPerf_Pkg_Buffered-32        ~189 ns/op   ~96  B/op   2 allocs/op

BenchmarkPerf_Instance_Immediate-32  ~152 ns/op   ~96  B/op   2 allocs/op
BenchmarkPerf_Span_Immediate-32      ~175 ns/op   ~128 B/op   3 allocs/op
BenchmarkPerf_Pkg_Immediate-32       ~158 ns/op   ~96  B/op   2 allocs/op

BenchmarkPerf_SetServiceID-32         ~21 ns/op   ~16  B/op   1 alloc/op

Takeaways

  • Overhead per timing event is sub-0.25 µs in both modes.
    Buffered adds a small enqueue cost; immediate adds the direct observe cost. Both are negligible compared to request latencies.

  • Span(ctx,"x") (the defer helper) is only ~25 ns slower than calling the tracker directly—use it freely where it improves clarity.

  • Service ID upgrades are cheap (~21 ns per call) and used when the service is discovered later in the handler.

  • We default to buffered mode in hot paths (handler + mining supervisor) and flush once near the end, minimizing metric write amplification.

Why this matters

  • We can instrument many fine-grained steps without impacting tail latency.
  • Timers are nesting-safe (stacks per name) and service-aware (label includes service_id).
  • Labels are pre-bound on the tracker to avoid repeated map allocations in the hot path.

Caveats

  • Benchmarks use a no-op observer in immediate mode; real Prometheus export will be slightly higher but still far below millisecond scale.
  • If you enable extremely high-cardinality instruction names, you’ll pay for it in the Prom backend rather than here.

@Olshansk Olshansk self-requested a review October 6, 2025 17:16
@Olshansk Olshansk (Collaborator) left a comment

@jorgecuesta This is a preliminary review/comment before we dive in.

  1. Can you shorten the GitHub description (or maybe add a diagram) so it's clear what the change is (and why)?
  2. If you see opportunity to split this into smaller PRs, I'd appreciate it!
  3. Can you update this with the base when you can?

If I left comments that are not relevant (i.e. not about changes you made), I apologize in advance! Feel free to simply resolve them without a comment.

I'll also send a message on discord to make sure we're all aligned.

@red-0ne red-0ne changed the base branch from experiment/test-rm-rps to main October 9, 2025 11:27
… minimum latency.

- Redo Instruction Timer
- Update KeyValues cache
- Introduce xsync to avoid using raw Lock Mutex
- Introduce eager validation per service config
- Introduce full optimistic delayed workflow using Mining Supervisor
- Updated localnet files to use query cache
- Updated localnet files to be able to set eager validations on/off
- Cleanup instruction timer names.
@jorgecuesta jorgecuesta force-pushed the experiment/eager-optimistic-metrics branch from c8ea932 to 095ca65 Compare October 10, 2025 21:55
…essions_manager.go` and renamed to performance_tracker.go as suggested by Olshansky.
@jorgecuesta jorgecuesta requested a review from Olshansk October 10, 2025 22:09
@jorgecuesta (Contributor, Author)

1. Can you shorten the GitHub description (or maybe add a diagram) so it's clear what the change is (and why)?

The PR description is short enough; I added way more context in comments below to avoid making the PR description too verbose. Also, there are diagrams already.

2. If you see opportunity to split this into smaller PRs, I'd appreciate it!

This will not be possible right now given my current time constraints. Also, all of these changes work together to achieve the extra ~0.2 ms on eager validation, so I don't like splitting them, since that would create a lot more rebasing work after each PR merges into main.

3. Can you update this with the base when you can?

Done!

@Olshansk Olshansk (Collaborator) left a comment

@jorgecuesta Note that I'm still not done.


Overall, it's looking good but I'll really get into the "meat of things" tomorrow.


Review info:

  1. First review was "preliminary".
  2. Second review is me building context and providing NITs.
  3. I anticipate 1 or 2 more reviews given the size, context and importance of this.

In terms of next steps, I'm going to outline them so we can move fast.

  1. Can you PTAL at #1837 and make any changes you need/want to it.
  2. Once #1837 looks good, please squash & merge it into this PR.
  3. After (2), please look at the comments I have in this PR and either tend to, close, or comment on them.
  4. In parallel to (1,2,3), I'll start looking at pkg/relayer/performance_tracker.go and pkg/relayer/proxy/mining_supervisor.go in more detail to understand it.
  5. Once all of the above are done, I'll tap in @red-0ne to make sure his changes are not overridden by the changes in this PR.

--

Would appreciate it if you could prioritize this ASAP to make sure it's part of the beta release, for which we have a cutoff this Friday.

@@ -1,9 +1,9 @@
package query

import (
Collaborator

The changes in this file seem to be unsafe & incomplete.

Can you:

  1. Delete unnecessary commented out things
  2. Either add back the mutexes or use xsync
  3. Explain (in the comments in the code, not the github response) why we don't need locks here?

blockHeightCurrent = "block_height_current"
instructionTimeSeconds = "instruction_time_seconds"

// TODO: implement metrics for the following:
Collaborator

Do we still need this TODO?

Help: "Total relays enqueued for mining, labeled by service_id.",
}, []string{"service_id"})

MiningQueueDroppedTotal = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Collaborator

#PUC (Please Update Comment) how/why/when things get dropped from the queue.

)

// YAMLRelayMinerConfig is the structure used to unmarshal the RelayMiner config file
type YAMLRelayMinerConfig struct {
Collaborator

Please update the docs if you're updating configs: https://dev.poktroll.com/operate/configs/relayminer_config

panic("RelayMiningSupervisor: downstream channel must not be nil")
}
if cfg.QueueSize <= 0 {
cfg.QueueSize = 10_000
Collaborator

If we need "magic numbers" or "magic constants" (e.g. defaults), let's move it into a local constant.

cfg.DropPolicy = config.DefaultMSDropPolicy
}
if cfg.GaugeSampleInterval <= 0 {
cfg.GaugeSampleInterval = 200 * time.Millisecond
Collaborator

I feel like we're duplicating logic that the hydrator is already doing.

Can you consolidate the logic so defaults are only configured in one place?


ctx, cancel := context.WithCancel(context.Background())

s := &RelayMiningSupervisor{
Collaborator

Why not store the entire relay mining supervisor config in this struct?

Feels like it'll simplify things.

Dur("drop_log_interval", cfg.DropLogInterval).
Msg("relay mining supervisor started")

// workers
Collaborator

Can you #PUC on not just "what" this is but "why" this is.

go s.worker(i)
}

// gauge sampler (keeps hot path clean)
Collaborator

Ditto.

@red-0ne red-0ne (Contributor) left a comment

Posting a partial review, will get back to the rest of it soon


cachedValue, exists := c.values[key]
if !exists {
v, ok := c.values.Load(key)
Contributor

Suggested change
v, ok := c.values.Load(key)
cachedValue, exists := c.values.Load(key)

return zero, false
}

isCacheValueExpired := time.Since(cachedValue.cachedAt) > c.config.ttl
Contributor

Any reason we're removing intermediate variables? I believe it's a net positive for readability

c.values[key] = cacheValue[T]{
value: value,
cachedAt: time.Now(),
if c.config.maxKeys > 0 && int64(c.values.Size()) > c.config.maxKeys {
Contributor

This logic seems to already exist in evictKey.

Any reason it's duplicated here?

isMaxKeysConfigured := c.config.maxKeys > 0
cacheMaxKeysReached := int64(len(c.values)) > c.config.maxKeys
if !isMaxKeysConfigured || !cacheMaxKeysReached {
if c.config.maxKeys <= 0 || int64(c.values.Size()) <= c.config.maxKeys {
Contributor

I prefer keeping the intermediate variables. I believe it's a net positive for readability.


// 1) Prefer to evict any TTL-expired entry (cheap scan, remove one).
var expiredKey string
c.values.Range(func(k string, v cacheValue[T]) bool {
Contributor

I think going TTL first might have a negative impact on eviction performance.

If no expired entry is found we would have scanned the entire cache for nothing.

This is especially true for update intensive scenarios.

return
}

// !!! CRITICAL MUTEX !!!
Contributor

Double checked.

The usage of xsync allows for the removal of the locks.

Although it does not guarantee a consistent snapshot of the state of the map, it does a good enough job of removing outdated relayMeters.

rmtr.sessionToRelayMeterMap[sessionId] = relayMeter
rmtr.relayMeterMu.Unlock()
// Try to publish; if someone beat us, reuse theirs.
if existing, loaded := rmtr.sessionToRelayMeterMap.LoadOrStore(sessionId, relayMeter); loaded {
Contributor

LoadOrStore returns the newly created value if an entry does not exist.

Why not do the following?

Suggested change
if existing, loaded := rmtr.sessionToRelayMeterMap.LoadOrStore(sessionId, relayMeter); loaded {
relayMeter, _ = rmtr.sessionToRelayMeterMap.LoadOrStore(sessionId, relayMeter)

// Build a new entry (same logic you already have)
appAddress := reqMeta.GetSessionHeader().GetApplicationAddress()

// In order to prevent over-servicing, the protocol must split the application's stake
Contributor

@Olshansk, all of the relayMeter logic is moved below and xsync substitutes for the locks.

rp.blockClient,
rp.sharedQuerier,
rp.sessionQuerier,
miningSup,
Contributor

Suggested change
miningSup,
miningSupervisor,

query_node_rpc_url: tcp://localhost:26657
query_node_grpc_url: tcp://localhost:9090
tx_node_rpc_url: tcp://localhost:26657
mining_supervisor:
Contributor

Why are those values different from the defaults?

@jorgecuesta (Contributor, Author)

  1. Can you PTAL at [Review] Review of #1819 #1837 and make any changes you need/want to it.

#1837

Changes to accompany review #2 of #1819