Skip to content

Commit

Permalink
Add Tracing Request Sink to E2E (prysmaticlabs#9341)
Browse files Browse the repository at this point in the history
* add tracing request sink

* fix struct order

* add in base64 encode and gzip

* add encoding and gzip

* tracing sink and replay tool

* post

* replay

* include latest sink and replay tool

* capture the gzout file instead

* rem time sleep

* handle err

* better handling

* add documentation

* changes

* working sync

* working

* added more logging
  • Loading branch information
rauljordan authored Aug 11, 2021
1 parent 0dd7a8b commit c128086
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 8 deletions.
1 change: 1 addition & 0 deletions endtoend/components/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"eth1.go",
"log.go",
"slasher.go",
"tracing_sink.go",
"validator.go",
],
importpath = "github.com/prysmaticlabs/prysm/endtoend/components",
Expand Down
106 changes: 106 additions & 0 deletions endtoend/components/tracing_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package components

import (
"bytes"
"context"
"encoding/base64"
"io"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/prysmaticlabs/prysm/endtoend/helpers"
e2e "github.com/prysmaticlabs/prysm/endtoend/params"
)

// TracingSink to capture HTTP requests from opentracing pushes. This is meant
// to capture all opentracing spans from Prysm during an end-to-end test. Spans
// are normally sent to a jaeger (https://www.jaegertracing.io/docs/1.25/getting-started/)
// endpoint, but here we instead replace that with our own http request sink.
// The request sink receives any requests, raw marshals them and base64-encodes them,
// then writes them newline-delimited into a file.
//
// The output file from this component can then be used by tools/replay-http in
// the Prysm repository to replay requests to a jaeger collector endpoint. This
// can then be used to visualize the spans themselves in the jaeger UI.
type TracingSink struct {
started chan struct{}
endpoint string
server *http.Server
}

// NewTracingSink initializes the tracing sink component.
func NewTracingSink(endpoint string) *TracingSink {
return &TracingSink{
started: make(chan struct{}, 1),
endpoint: endpoint,
}
}

// Start the tracing sink.
func (ts *TracingSink) Start(ctx context.Context) error {
go ts.initializeSink()
close(ts.started)
return nil
}

// Started checks whether a tracing sink is started and ready to be queried.
func (ts *TracingSink) Started() <-chan struct{} {
return ts.started
}

// Initialize an http handler that writes all requests to a file.
func (ts *TracingSink) initializeSink() {
ts.server = &http.Server{Addr: ts.endpoint}
defer func() {
if err := ts.server.Close(); err != nil {
log.WithError(err).Error("Failed to close http server")
return
}
}()
stdOutFile, err := helpers.DeleteAndCreateFile(e2e.TestParams.LogPath, e2e.TracingRequestSinkFileName)
if err != nil {
log.WithError(err).Error("Failed to create stdout file")
return
}
cleanup := func() {
if err := stdOutFile.Close(); err != nil {
log.WithError(err).Error("Could not close stdout file")
}
}

http.HandleFunc("/", func(_ http.ResponseWriter, r *http.Request) {
if err := captureRequest(stdOutFile, r); err != nil {
log.WithError(err).Error("Failed to capture http request")
return
}
})
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
cleanup()
os.Exit(0)
}()
if err := ts.server.ListenAndServe(); err != http.ErrServerClosed {
log.WithError(err).Error("Failed to serve http")
}
}

// Captures raw requests in base64 encoded form in a line-delimited file.
func captureRequest(f io.Writer, r *http.Request) error {
buf := bytes.NewBuffer(nil)
err := r.Write(buf)
if err != nil {
return err
}
encoded := make([]byte, base64.StdEncoding.EncodedLen(len(buf.Bytes())))
base64.StdEncoding.Encode(encoded, buf.Bytes())
encoded = append(encoded, []byte("\n")...)
_, err = f.Write(encoded)
if err != nil {
return err
}
return nil
}
7 changes: 6 additions & 1 deletion endtoend/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func (r *testRunner) run() {
ctx, done := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

tracingSink := components.NewTracingSink(config.TracingSinkEndpoint)
g.Go(func() error {
return tracingSink.Start(ctx)
})

// ETH1 node.
eth1Node := components.NewEth1Node()
g.Go(func() error {
Expand Down Expand Up @@ -129,7 +134,7 @@ func (r *testRunner) run() {

// Wait for all required nodes to start.
requiredComponents := []e2etypes.ComponentRunner{
eth1Node, bootNode, beaconNodes, validatorNodes,
tracingSink, eth1Node, bootNode, beaconNodes, validatorNodes,
}
if config.TestSlasher && slasherNodes != nil {
requiredComponents = append(requiredComponents, slasherNodes)
Expand Down
6 changes: 5 additions & 1 deletion endtoend/minimal_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ func e2eMinimal(t *testing.T, usePrysmSh bool) {
epochsToRun, err = strconv.Atoi(epochStr)
require.NoError(t, err)
}

const tracingEndpoint = "127.0.0.1:9411"
testConfig := &types.E2EConfig{
BeaconFlags: []string{
fmt.Sprintf("--slots-per-archive-point=%d", params.BeaconConfig().SlotsPerEpoch*16),
fmt.Sprintf("--tracing-endpoint=http://%s", tracingEndpoint),
"--enable-tracing",
"--trace-sample-fraction=1.0",
},
ValidatorFlags: []string{},
EpochsToRun: uint64(epochsToRun),
Expand All @@ -46,6 +49,7 @@ func e2eMinimal(t *testing.T, usePrysmSh bool) {
TestSlasher: true,
UsePrysmShValidator: usePrysmSh,
UsePprof: !longRunning,
TracingSinkEndpoint: tracingEndpoint,
Evaluators: []types.Evaluator{
ev.PeersConnect,
ev.HealthzCheck,
Expand Down
3 changes: 3 additions & 0 deletions endtoend/params/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ var TestParams *params
// BootNodeLogFileName is the file name used for the beacon chain node logs.
var BootNodeLogFileName = "bootnode.log"

// TracingRequestSinkFileName is the file name for writing raw trace requests.
var TracingRequestSinkFileName = "tracing-http-requests.log.gz"

// BeaconNodeLogFileName is the file name used for the beacon chain node logs.
var BeaconNodeLogFileName = "beacon-%d.log"

Expand Down
13 changes: 7 additions & 6 deletions endtoend/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import (

// E2EConfig defines the struct for all configurations needed for E2E testing.
type E2EConfig struct {
BeaconFlags []string
ValidatorFlags []string
EpochsToRun uint64
TestSync bool
TestSlasher bool
TestDeposits bool
UsePprof bool
UsePrysmShValidator bool
UsePprof bool
TestDeposits bool
TestSlasher bool
EpochsToRun uint64
TracingSinkEndpoint string
Evaluators []Evaluator
BeaconFlags []string
ValidatorFlags []string
}

// Evaluator defines the structure of the evaluators used to
Expand Down
16 changes: 16 additions & 0 deletions tools/replay-http/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary")
load("@prysm//tools/go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["main.go"],
importpath = "github.com/prysmaticlabs/prysm/tools/replay-http",
visibility = ["//visibility:private"],
deps = ["@com_github_sirupsen_logrus//:go_default_library"],
)

go_binary(
name = "replay-http",
embed = [":go_default_library"],
visibility = ["//visibility:public"],
)
78 changes: 78 additions & 0 deletions tools/replay-http/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
Tool for replaying http requests from a file of base64 encoded, line-delimited
Go http raw requests. Credits to https://gist.github.com/kasey/c9e663eae5baebbf8fbe548c2b1d961b.
*/
package main

import (
"bufio"
"bytes"
"encoding/base64"
"flag"
"io"
"net/http"
"net/url"
"os"
"path"

log "github.com/sirupsen/logrus"
)

var (
filePath = flag.String("file", "", "file of line-delimited, base64-encoded Go http requests")
endpoint = flag.String("endpoint", "http://localhost:14268/api/traces", "host:port endpoint to make HTTP requests to")
)

func main() {
flag.Parse()
if *filePath == "" {
log.Fatal("Must provide --file")
}

f, err := os.Open(path.Clean(*filePath))
if err != nil {
log.Fatal(err)
}
defer func() {
if err := f.Close(); err != nil {
log.WithError(err).Error("Could not close stdout file")
}
}()
lr := bufio.NewReader(f)
for {
line, err := lr.ReadBytes([]byte("\n")[0])
if err == io.EOF {
os.Exit(0)
}
if err != nil {
log.Fatal(err)
}
line = line[0 : len(line)-1]
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(line)))
_, err = base64.StdEncoding.Decode(decoded, line)
if err != nil {
log.Fatal(err)
}
dbuf := bytes.NewBuffer(decoded)
req, err := http.ReadRequest(bufio.NewReader(dbuf))
if err != nil {
log.Fatal(err)
}
parsed, err := url.Parse(*endpoint)
if err != nil {
log.Fatal(err)
}
req.URL = parsed
req.RequestURI = ""
log.Println(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
respBuf := bytes.NewBuffer(nil)
if err := resp.Write(respBuf); err != nil {
log.Fatal(err)
}
log.Println(respBuf.String())
}
}

0 comments on commit c128086

Please sign in to comment.