Skip to content

Commit 92ce3fd

Browse files
add pacer based on wall clock instead of relative differences in duration (#12)
* add pacer based on wall clock instead of relative differences in duration The reason for the change is that errors in pacing add up if we pace relative to the previous request, but do not add up if we always do our calculations relative to the start of the phase. Like measuring 1 meter by measuring 2 millimeters on top of each other 500 times will not be as accurate as measuring the 1 meter in one go. The source of errors is simply execution time. * fix last request's wall time * dummyweb with latency, worker tuning * add idle/max connections * print stats from pacer * spelling --------- Co-authored-by: George Malamidis <[email protected]>
1 parent 2c8eb69 commit 92ce3fd

File tree

6 files changed

+101
-19
lines changed

6 files changed

+101
-19
lines changed

etc/dummyweb.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package main
22

33
import (
4+
"fmt"
45
"log"
6+
"math/rand"
57
"net/http"
68
"time"
79
)
@@ -10,14 +12,15 @@ var count int
1012

1113
func handler(w http.ResponseWriter, r *http.Request) {
1214
count++
15+
time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
1316
w.Write([]byte("hi\n"))
1417
}
1518

1619
func main() {
1720
// Crude RPS reporting
1821
go func() {
1922
for {
20-
println(count)
23+
fmt.Printf("rps: %d\n", count)
2124
count = 0
2225
time.Sleep(time.Second)
2326
}

main.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,38 @@ func main() {
3535
silent := flag.Bool("silent", false, "Suppress output")
3636
dryRun := flag.Bool("dry-run", false, "Consume input but do not send HTTP requests to targets")
3737
timeout := flag.Int("timeout", 10, "HTTP client timeout in seconds")
38+
connections := flag.Int("connections", 10000, "Max open idle connections per target host")
39+
maxConnections := flag.Int("max-connections", 0, "Max connections per target host (default unlimited)")
3840
strict := flag.Bool("strict", false, "Panic on bad input")
3941
memprofile := flag.String("memprofile", "", "Write memory profile to `file` before exit")
4042
cpuprofile := flag.String("cpuprofile", "", "Write cpu profile to `file` before exit")
41-
numWorkers := flag.Int("workers", 1000, "Number of client workers to use")
43+
numWorkers := flag.Int("workers", runtime.NumCPU()*2, "Number of client workers to use")
44+
printStatsInterval := flag.Duration("print-stats", 0, `Statistics report interval, e.g., "1m"
45+
46+
Each report line is printed to stderr with the following fields in logfmt format:
47+
48+
report_time
49+
The calculated wall time for when this line should be printed in RFC3339 format.
50+
51+
skew_seconds
52+
Difference between "report_time" and current time in seconds. When the absolute
53+
value of this is higher than about 100ms, it shows that ripley cannot generate
54+
enough load. Consider increasing workers, max connections, and/or CPU and IO requests.
55+
56+
last_request_time
57+
Original request time of the last request in RFC3339 format.
58+
59+
rate
60+
Current rate of playback as specified in "pace" flag.
61+
62+
expected_rps
63+
Expected requests per second since the last report. This will differ from the
64+
actual requests per second if the system is unable to drive that many requests.
65+
If that is the case, consider increasing workers, max connections, and/or
66+
CPU and IO requests.
67+
68+
When 0 (default) or negative, reporting is switched off.
69+
`)
4270

4371
flag.Parse()
4472

@@ -58,7 +86,7 @@ func main() {
5886
defer pprof.StopCPUProfile()
5987
}
6088

61-
exitCode = ripley.Replay(*paceStr, *silent, *dryRun, *timeout, *strict, *numWorkers)
89+
exitCode = ripley.Replay(*paceStr, *silent, *dryRun, *timeout, *strict, *numWorkers, *connections, *maxConnections, *printStatsInterval)
6290

6391
if *memprofile != "" {
6492
f, err := os.Create(*memprofile)

pkg/client.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,19 @@ type Result struct {
3131
ErrorMsg string `json:"error"`
3232
}
3333

34-
func startClientWorkers(numWorkers int, requests <-chan *request, results chan<- *Result, dryRun bool, timeout int) {
34+
func startClientWorkers(numWorkers int, requests <-chan *request, results chan<- *Result, dryRun bool, timeout, connections, maxConnections int) {
3535
client := &http.Client{
3636
Timeout: time.Duration(timeout) * time.Second,
3737
CheckRedirect: func(req *http.Request, via []*http.Request) error {
3838
return http.ErrUseLastResponse
3939
},
40+
Transport: &http.Transport{
41+
MaxIdleConnsPerHost: connections,
42+
MaxConnsPerHost: maxConnections,
43+
},
4044
}
4145

42-
for i := 0; i <= numWorkers; i++ {
46+
for i := 0; i < numWorkers; i++ {
4347
go doHttpRequest(client, requests, results, dryRun)
4448
}
4549
}

pkg/pace.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,23 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
1919
package ripley
2020

2121
import (
22+
"fmt"
23+
"os"
2224
"strconv"
2325
"strings"
2426
"time"
2527
)
2628

2729
type pacer struct {
28-
phases []*phase
29-
lastRequestTime time.Time
30-
done bool
30+
ReportInterval time.Duration
31+
phases []*phase
32+
lastRequestTime time.Time // last request that we already replayed in "log time"
33+
lastRequestWallTime time.Time // last request that we already replayed in "wall time"
34+
phaseStartRequestTime time.Time
35+
phaseStartWallTime time.Time
36+
done bool
37+
requestCounter int
38+
nextReport time.Time
3139
}
3240

3341
type phase struct {
@@ -48,11 +56,16 @@ func newPacer(phasesStr string) (*pacer, error) {
4856
func (p *pacer) start() {
4957
// Run a timer for the first phase's duration
5058
time.AfterFunc(p.phases[0].duration, p.onPhaseElapsed)
59+
if p.ReportInterval > 0 {
60+
p.nextReport = time.Now().Add(p.ReportInterval)
61+
}
5162
}
5263

5364
func (p *pacer) onPhaseElapsed() {
5465
// Pop phase
5566
p.phases = p.phases[1:]
67+
p.phaseStartRequestTime = p.lastRequestTime
68+
p.phaseStartWallTime = p.lastRequestWallTime
5669

5770
if len(p.phases) == 0 {
5871
p.done = true
@@ -63,14 +76,42 @@ func (p *pacer) onPhaseElapsed() {
6376
}
6477

6578
func (p *pacer) waitDuration(t time.Time) time.Duration {
66-
// If there are no more phases left, continue with the last phase's rate
79+
now := time.Now()
80+
6781
if p.lastRequestTime.IsZero() {
6882
p.lastRequestTime = t
83+
p.lastRequestWallTime = now
84+
p.phaseStartRequestTime = p.lastRequestTime
85+
p.phaseStartWallTime = p.lastRequestWallTime
6986
}
7087

71-
duration := t.Sub(p.lastRequestTime)
88+
originalDurationFromPhaseStart := t.Sub(p.phaseStartRequestTime)
89+
expectedDurationFromPhaseStart := time.Duration(float64(originalDurationFromPhaseStart) / p.phases[0].rate)
90+
expectedWallTime := p.phaseStartWallTime.Add(expectedDurationFromPhaseStart)
91+
92+
p.reportStats(now, expectedWallTime)
93+
94+
duration := expectedWallTime.Sub(now)
7295
p.lastRequestTime = t
73-
return time.Duration(float64(duration) / p.phases[0].rate)
96+
p.lastRequestWallTime = expectedWallTime
97+
return duration
98+
}
99+
100+
func (p *pacer) reportStats(now, expectedWallTime time.Time) {
101+
if p.ReportInterval <= 0 {
102+
return
103+
}
104+
for p.nextReport.Before(expectedWallTime) {
105+
fmt.Fprintf(os.Stderr, "report_time=%s skew_seconds=%f last_request_time=%s rate=%f expected_rps=%d\n",
106+
p.nextReport.Format(time.RFC3339),
107+
now.Sub(p.nextReport).Seconds(),
108+
p.lastRequestTime.Format(time.RFC3339),
109+
p.phases[0].rate, // note that this is correct... the phase change itself is incorrect, but its error is minimal with enough requests, and it is simpler
110+
p.requestCounter/int(p.ReportInterval.Seconds()))
111+
p.nextReport = p.nextReport.Add(p.ReportInterval)
112+
p.requestCounter = 0
113+
}
114+
p.requestCounter++
74115
}
75116

76117
// Format is [duration]@[rate] [duration]@[rate]..."

pkg/pace_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
1919
package ripley
2020

2121
import (
22+
"math"
2223
"testing"
2324
"time"
2425
)
@@ -83,15 +84,15 @@ func TestWaitDuration(t *testing.T) {
8384
now := time.Now()
8485
duration := pacer.waitDuration(now)
8586

86-
if duration != 0 {
87-
t.Errorf("duration = %v; want 0", duration)
87+
if duration > 0 {
88+
t.Errorf("duration = %v; want 0 or negative", duration)
8889
}
8990

9091
now = now.Add(2 * time.Second)
9192
duration = pacer.waitDuration(now)
9293
expected := 2 * time.Second
9394

94-
if duration != expected {
95+
if !equalsWithinThreshold(duration, expected, 10*time.Microsecond) {
9596
t.Errorf("duration = %v; want %v", duration, expected)
9697
}
9798
}
@@ -106,15 +107,15 @@ func TestWaitDuration10X(t *testing.T) {
106107
now := time.Now()
107108
duration := pacer.waitDuration(now)
108109

109-
if duration != 0 {
110-
t.Errorf("duration = %v; want 0", duration)
110+
if duration > 0 {
111+
t.Errorf("duration = %v; want 0 or negative", duration)
111112
}
112113

113114
now = now.Add(1 * time.Second)
114115
duration = pacer.waitDuration(now)
115116
expected := time.Second / 10
116117

117-
if duration != expected {
118+
if !equalsWithinThreshold(duration, expected, 10*time.Microsecond) {
118119
t.Errorf("duration = %v; want %v", duration, expected)
119120
}
120121
}
@@ -136,3 +137,7 @@ func TestPacerDoneOnLastPhaseElapsed(t *testing.T) {
136137
t.Errorf("pacer.done = %v; want true", pacer.done)
137138
}
138139
}
140+
141+
func equalsWithinThreshold(d1, d2, threshold time.Duration) bool {
142+
return math.Abs(float64(d1-d2)) <= float64(threshold)
143+
}

pkg/replay.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"time"
2828
)
2929

30-
func Replay(phasesStr string, silent, dryRun bool, timeout int, strict bool, numWorkers int) int {
30+
func Replay(phasesStr string, silent, dryRun bool, timeout int, strict bool, numWorkers, connections, maxConnections int, printStatsInterval time.Duration) int {
3131
// Default exit code
3232
var exitCode int = 0
3333
// Ensures we have handled all HTTP request results before exiting
@@ -43,6 +43,7 @@ func Replay(phasesStr string, silent, dryRun bool, timeout int, strict bool, num
4343

4444
// The pacer controls the rate of replay
4545
pacer, err := newPacer(phasesStr)
46+
pacer.ReportInterval = printStatsInterval
4647

4748
if err != nil {
4849
panic(err)
@@ -52,7 +53,7 @@ func Replay(phasesStr string, silent, dryRun bool, timeout int, strict bool, num
5253
scanner := bufio.NewScanner(bufio.NewReaderSize(os.Stdin, 32*1024*1024))
5354

5455
// Start HTTP client goroutine pool
55-
startClientWorkers(numWorkers, requests, results, dryRun, timeout)
56+
startClientWorkers(numWorkers, requests, results, dryRun, timeout, connections, maxConnections)
5657
pacer.start()
5758

5859
// Goroutine to handle the HTTP client result

0 commit comments

Comments
 (0)