
Commit 993412b

Authored by adshmh, red-0ne, and Olshansk
[RelayMiner] Use a customized HTTP client in RelayMiner (#1764)
## Summary

Use a customized HTTP client in RelayMiner.

## Issue

The default HTTP client used by RelayMiner is not optimized for high throughput.

- Issue_or_PR: #{ISSUE_OR_PR_NUMBER}

## Type of change

Select one or more from the following:

- [x] New feature, functionality or library
- [ ] Bug fix
- [ ] Code health or cleanup
- [ ] Documentation
- [ ] Other (specify)

## Sanity Checklist

- [x] I have updated the GitHub Issue Metadata: `assignees`, `reviewers`, `labels`, `project`, `iteration` and `milestone`
- [ ] For docs: `make docusaurus_start`
- [ ] For small changes: `make go_develop_and_test` and `make test_e2e`
- [ ] For major changes: `devnet-test-e2e` label to run E2E tests in CI
- [ ] For migration changes: `make test_e2e_oneshot`
- [ ] 'TODO's, configurations and other docs

---

Co-authored-by: Redouane Lakrache <[email protected]>
Co-authored-by: Daniel Olshansky <[email protected]>
1 parent 8595e93 commit 993412b
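
The customized client itself lives in one of the changed files not reproduced below. As a minimal sketch of what tuning Go's `net/http` client for high throughput usually involves (the package name and every field value here are illustrative assumptions, not taken from this diff): the stock `http.Transport` keeps at most 2 idle connections per host (`DefaultMaxIdleConnsPerHost`), which forces constant re-dialing under concurrent load.

```go
package relayclient

import (
	"net/http"
	"time"
)

// newHighThroughputClient builds an http.Client with a tuned Transport.
// All limits below are illustrative assumptions, not the PR's settings.
func newHighThroughputClient() *http.Client {
	transport := &http.Transport{
		// Keep many idle (keep-alive) connections around for reuse
		// instead of re-dialing on every request.
		MaxIdleConns:        10_000,
		MaxIdleConnsPerHost: 1_000, // the stock default is only 2
		IdleConnTimeout:     90 * time.Second,
	}
	return &http.Client{
		Transport: transport,
		Timeout:   30 * time.Second, // overall per-request deadline
	}
}
```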

File tree

5 files changed: +540 −21 lines

New file (buffer pool, `package concurrency`): 90 additions, 0 deletions

```go
// Buffer Pool for High-Concurrency HTTP Processing
// ================================================
//
// This buffer pool manages reusable byte buffers to optimize memory allocation
// for high-throughput HTTP response processing. When handling thousands of
// concurrent HTTP requests with large response bodies (blockchain data often
// exceeds 1MB), naive allocation patterns create significant performance issues.
//
// Memory Allocation Patterns:
//   - Without pooling: Each request allocates new []byte buffers
//   - With pooling: Buffers are reused across requests via sync.Pool
//
// Benefits:
//   - Reduces garbage collection pressure
//   - Provides predictable memory usage under load
//   - Maintains consistent performance during traffic spikes
//   - Size limits prevent memory bloat
//
// The pool automatically grows buffer capacity as needed while preventing
// oversized buffers from being returned to avoid memory waste.
package concurrency

import (
	"bytes"
	"io"
	"sync"
)

const (
	// DefaultInitialBufferSize is the initial size of the buffer pool.
	// Start with 256KB buffers - they can grow as needed.
	DefaultInitialBufferSize = 256 * 1024

	// TODO_IMPROVE: Make this configurable via YAML settings.
	// DefaultMaxBufferSize is the maximum size of the buffer pool.
	// Set the max buffer size to 4MB to avoid memory bloat.
	DefaultMaxBufferSize = 4 * 1024 * 1024
)

// BufferPool manages reusable byte buffers to reduce GC pressure.
// It uses sync.Pool for efficient buffer recycling with size limits.
type BufferPool struct {
	pool          sync.Pool
	maxReaderSize int64
}

// NewBufferPool creates a pool whose reads are capped at maxReaderSize bytes.
func NewBufferPool(maxReaderSize int64) *BufferPool {
	return &BufferPool{
		pool: sync.Pool{
			New: func() interface{} {
				return bytes.NewBuffer(make([]byte, 0, DefaultInitialBufferSize))
			},
		},
		maxReaderSize: maxReaderSize,
	}
}

// getBuffer retrieves a buffer from the pool.
func (bp *BufferPool) getBuffer() *bytes.Buffer {
	buf := bp.pool.Get().(*bytes.Buffer)
	buf.Reset() // Always reset to ensure a clean state.
	return buf
}

// putBuffer returns a buffer to the pool.
// Buffers larger than DefaultMaxBufferSize are not returned to avoid memory bloat.
func (bp *BufferPool) putBuffer(buf *bytes.Buffer) {
	// Skip pooling oversized buffers to prevent memory bloat.
	if buf.Cap() > DefaultMaxBufferSize {
		return
	}
	bp.pool.Put(buf)
}

// ReadWithBuffer reads from an io.Reader using a pooled buffer.
func (bp *BufferPool) ReadWithBuffer(r io.Reader) ([]byte, error) {
	buf := bp.getBuffer()
	defer bp.putBuffer(buf)

	// Cap the read at maxReaderSize; longer inputs are silently truncated.
	limitedReader := io.LimitReader(r, bp.maxReaderSize)
	_, err := buf.ReadFrom(limitedReader)
	if err != nil {
		return nil, err
	}

	// Return an independent copy to avoid data races once the buffer is reused.
	result := make([]byte, buf.Len())
	copy(result, buf.Bytes())
	return result, nil
}
```
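
A minimal usage sketch for the pool above, reading an HTTP response body. The import path and URL are placeholders, since the repository's module path is not visible in this excerpt; wiring `DefaultMaxBufferSize` in as the read cap is likewise an assumption about how callers use it.

```go
package main

import (
	"fmt"
	"log"
	"net/http"

	// Placeholder import path; the actual module path is not shown in this diff.
	"example.com/relayminer/pkg/concurrency"
)

func main() {
	// Cap reads at the pool's maximum buffer size.
	pool := concurrency.NewBufferPool(concurrency.DefaultMaxBufferSize)

	resp, err := http.Get("https://example.com/relay")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// ReadWithBuffer borrows a pooled buffer, copies the bytes into a fresh
	// slice, and returns the buffer to the pool before returning.
	body, err := pool.ReadWithBuffer(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d bytes\n", len(body))
}
```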
New file (concurrency limiter, `package concurrency`): 83 additions, 0 deletions

```go
// Concurrency Limiter for Resource Management
// ===========================================
//
// This concurrency limiter implements a semaphore pattern to bound the number
// of concurrent HTTP operations, preventing resource exhaustion under high load.
//
// When processing thousands of simultaneous HTTP requests, unlimited concurrency
// can overwhelm system resources (memory, file descriptors, network connections).
//
// Resource Protection Mechanisms:
//   - Semaphore-based admission control using buffered channels
//   - Context-aware blocking with cancellation support
//   - Real-time tracking of active request counts
//   - Graceful degradation when limits are exceeded
//
// Operational Characteristics:
//   - Blocks new requests when the limit is reached
//   - Respects context cancellation for timeout handling
//   - Integrates with metrics for observability
//   - Thread-safe for concurrent access
//
// The limiter prevents cascading failures by ensuring system resources remain
// available even during traffic spikes or slow downstream services.
package concurrency

import (
	"context"
	"sync"
)

// TODO_IMPROVE: Make this configurable via settings.
const (
	defaultMaxConcurrentRequests = 1_000_000
)

// ConcurrencyLimiter bounds concurrent operations via a semaphore pattern.
// It prevents resource exhaustion and tracks active request counts.
type ConcurrencyLimiter struct {
	semaphore      chan struct{}
	maxConcurrent  int
	activeRequests int64
	mu             sync.RWMutex
}

// NewConcurrencyLimiter creates a limiter that bounds concurrent operations.
func NewConcurrencyLimiter(maxConcurrent int) *ConcurrencyLimiter {
	if maxConcurrent <= 0 {
		maxConcurrent = defaultMaxConcurrentRequests // Default reasonable limit.
	}

	return &ConcurrencyLimiter{
		semaphore:     make(chan struct{}, maxConcurrent),
		maxConcurrent: maxConcurrent,
	}
}

// TODO_TECHDEBT(@adshmh): Track active relays for observability.
//
// Acquire blocks until a slot is available or the context is canceled.
// It returns true if a slot was acquired, false if the context was canceled.
func (cl *ConcurrencyLimiter) Acquire(ctx context.Context) bool {
	select {
	case cl.semaphore <- struct{}{}:
		cl.mu.Lock()
		cl.activeRequests++
		cl.mu.Unlock()
		return true
	case <-ctx.Done():
		return false
	}
}

// Release returns a slot to the pool.
func (cl *ConcurrencyLimiter) Release() {
	select {
	case <-cl.semaphore:
		cl.mu.Lock()
		cl.activeRequests--
		cl.mu.Unlock()
	default:
		// TODO_TECHDEBT: Log acquire/release mismatch for debugging.
	}
}
```
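
And a sketch of how a caller might wrap outbound work with the limiter: acquire before the request, release when done. The limit value, import path, and URL are illustrative assumptions.

```go
package main

import (
	"context"
	"log"
	"net/http"
	"time"

	// Placeholder import path; the actual module path is not shown in this diff.
	"example.com/relayminer/pkg/concurrency"
)

func main() {
	limiter := concurrency.NewConcurrencyLimiter(512) // illustrative limit

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Block until a slot frees up, or give up when the context expires.
	if !limiter.Acquire(ctx) {
		log.Println("timed out waiting for a request slot")
		return
	}
	defer limiter.Release()

	resp, err := http.Get("https://example.com/relay")
	if err != nil {
		log.Println(err)
		return
	}
	resp.Body.Close()
	log.Printf("status: %s", resp.Status)
}
```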
