From 4265b99cfb42516296d95c63da9b7e02990264af Mon Sep 17 00:00:00 2001 From: Braden Bassingthwaite Date: Tue, 30 Jan 2024 08:38:19 -0600 Subject: [PATCH] Go HTTP Filter: Improve performance (#31987) The current HTTP Go filter utilizes a sync.Map to manage the lifecycle of the requests memory between Go and C++. Envoy instances that have a larger number of workers (>=32), the sync.Map causes contention on the underlying lock and reduces performance. Before this fix, a direct reply benchmark would get about 210,000 req/sec which is considerably lower than 400,000 req/sec (Envoy that uses a direct reply within a route with no Go HTTP filter). The same benchmark with this fix included gets 350,000 req/sec, a 67% increase in performance. The sync.Map is replaced with a []map[*C.httpRequest]*httpRequest which allows each worker to get its own map. This slice is initialized envoyGoFilterNewHttpPluginConfig which now passes along the concurrency value that Envoy was started with which controls the number of workers. The httpRequest that is shared between Envoy and Go Plugin has been updated to pass along the worker_id that is responsible for the request. Since each worker is single threaded, we no longer need a mutex to control access to the map. Fixes #31916 Commit Message: Additional Description: Risk Level: Testing: Docs Changes: Release Notes: Platform Specific Features: Fixes: #31916 Signed-off-by: Braden Bassingthwaite --- contrib/golang/common/go/api/api.h | 2 ++ contrib/golang/filters/http/source/config.cc | 11 ++++++- .../filters/http/source/go/pkg/http/config.go | 2 ++ .../filters/http/source/go/pkg/http/shim.go | 30 ++++++++++++------- .../filters/http/source/golang_filter.cc | 3 ++ .../filters/http/source/golang_filter.h | 12 ++++++-- .../golang/filters/http/test/config_test.cc | 8 +++-- .../http/test/golang_filter_fuzz_test.cc | 2 +- .../filters/http/test/golang_filter_test.cc | 2 +- 9 files changed, 53 insertions(+), 19 deletions(-) diff --git a/contrib/golang/common/go/api/api.h b/contrib/golang/common/go/api/api.h index 504d412d3e14..66d9f5f1cc27 100644 --- a/contrib/golang/common/go/api/api.h +++ b/contrib/golang/common/go/api/api.h @@ -17,6 +17,7 @@ typedef struct { // NOLINT(modernize-use-using) Cstring plugin_name; uint64_t configId; int phase; + uint32_t worker_id; } httpRequest; typedef struct { // NOLINT(modernize-use-using) @@ -25,6 +26,7 @@ typedef struct { // NOLINT(modernize-use-using) uint64_t config_ptr; uint64_t config_len; int is_route_config; + uint32_t concurrency; } httpConfig; typedef enum { // NOLINT(modernize-use-using) diff --git a/contrib/golang/filters/http/source/config.cc b/contrib/golang/filters/http/source/config.cc index 6bd9a5bf77c1..91d7eddc32c7 100644 --- a/contrib/golang/filters/http/source/config.cc +++ b/contrib/golang/filters/http/source/config.cc @@ -1,5 +1,7 @@ #include "contrib/golang/filters/http/source/config.h" +#include + #include "envoy/registry/registry.h" #include "source/common/common/fmt.h" @@ -33,7 +35,14 @@ Http::FilterFactoryCb GolangFilterConfig::createFilterFactoryFromProtoTyped( proto_config, dso_lib, fmt::format("{}golang.", stats_prefix), context); config->newGoPluginConfig(); return [config, dso_lib](Http::FilterChainFactoryCallbacks& callbacks) { - auto filter = std::make_shared(config, dso_lib); + const std::string& worker_name = callbacks.dispatcher().name(); + auto pos = worker_name.find_first_of('_'); + ENVOY_BUG(pos != std::string::npos, "worker name is not in expected format worker_{id}"); + uint32_t worker_id; + if (!absl::SimpleAtoi(worker_name.substr(pos + 1), &worker_id)) { + IS_ENVOY_BUG("failed to parse worker id from name"); + } + auto filter = std::make_shared(config, dso_lib, worker_id); callbacks.addStreamFilter(filter); callbacks.addAccessLogHandler(filter); }; diff --git a/contrib/golang/filters/http/source/go/pkg/http/config.go b/contrib/golang/filters/http/source/go/pkg/http/config.go index 99e231e98520..87ad5b7e510f 100644 --- a/contrib/golang/filters/http/source/go/pkg/http/config.go +++ b/contrib/golang/filters/http/source/go/pkg/http/config.go @@ -73,6 +73,8 @@ func envoyGoFilterNewHttpPluginConfig(c *C.httpConfig) uint64 { var any anypb.Any proto.Unmarshal(buf, &any) + Requests.initialize(uint32(c.concurrency)) + configNum := atomic.AddUint64(&configNumGenerator, 1) name := utils.BytesToString(uint64(c.plugin_name_ptr), uint64(c.plugin_name_len)) diff --git a/contrib/golang/filters/http/source/go/pkg/http/shim.go b/contrib/golang/filters/http/source/go/pkg/http/shim.go index f77db4770b1f..44767a4c1a79 100644 --- a/contrib/golang/filters/http/source/go/pkg/http/shim.go +++ b/contrib/golang/filters/http/source/go/pkg/http/shim.go @@ -46,32 +46,40 @@ var ErrDupRequestKey = errors.New("dup request key") var Requests = &requestMap{} type requestMap struct { - m sync.Map // *C.httpRequest -> *httpRequest + initOnce sync.Once + requests []map[*C.httpRequest]*httpRequest +} + +func (f *requestMap) initialize(concurrency uint32) { + f.initOnce.Do(func() { + f.requests = make([]map[*C.httpRequest]*httpRequest, concurrency) + for i := uint32(0); i < concurrency; i++ { + f.requests[i] = map[*C.httpRequest]*httpRequest{} + } + }) } func (f *requestMap) StoreReq(key *C.httpRequest, req *httpRequest) error { - if _, loaded := f.m.LoadOrStore(key, req); loaded { + m := f.requests[key.worker_id] + if _, ok := m[key]; ok { return ErrDupRequestKey } + m[key] = req return nil } func (f *requestMap) GetReq(key *C.httpRequest) *httpRequest { - if v, ok := f.m.Load(key); ok { - return v.(*httpRequest) - } - return nil + return f.requests[key.worker_id][key] } func (f *requestMap) DeleteReq(key *C.httpRequest) { - f.m.Delete(key) + delete(f.requests[key.worker_id], key) } func (f *requestMap) Clear() { - f.m.Range(func(key, _ interface{}) bool { - f.m.Delete(key) - return true - }) + for idx := range f.requests { + f.requests[idx] = map[*C.httpRequest]*httpRequest{} + } } func requestFinalize(r *httpRequest) { diff --git a/contrib/golang/filters/http/source/golang_filter.cc b/contrib/golang/filters/http/source/golang_filter.cc index 306336fc6000..edfab63a4e9c 100644 --- a/contrib/golang/filters/http/source/golang_filter.cc +++ b/contrib/golang/filters/http/source/golang_filter.cc @@ -1458,6 +1458,7 @@ void Filter::initRequest(ProcessorState& state) { req_->configId = getMergedConfigId(state); req_->plugin_name.data = config_->pluginName().data(); req_->plugin_name.len = config_->pluginName().length(); + req_->worker_id = worker_id_; } /* ConfigId */ @@ -1491,6 +1492,7 @@ FilterConfig::FilterConfig( Server::Configuration::FactoryContext& context) : plugin_name_(proto_config.plugin_name()), so_id_(proto_config.library_id()), so_path_(proto_config.library_path()), plugin_config_(proto_config.plugin_config()), + concurrency_(context.serverFactoryContext().options().concurrency()), stats_(GolangFilterStats::generateStats(stats_prefix, context.scope())), dso_lib_(dso_lib), metric_store_(std::make_shared(context.scope().createScope(""))){}; @@ -1508,6 +1510,7 @@ void FilterConfig::newGoPluginConfig() { config_->config_ptr = buf_ptr; config_->config_len = buf.length(); config_->is_route_config = 0; + config_->concurrency = concurrency_; config_id_ = dso_lib_->envoyGoFilterNewHttpPluginConfig(config_); diff --git a/contrib/golang/filters/http/source/golang_filter.h b/contrib/golang/filters/http/source/golang_filter.h index cb342c437327..cd90766e1336 100644 --- a/contrib/golang/filters/http/source/golang_filter.h +++ b/contrib/golang/filters/http/source/golang_filter.h @@ -85,6 +85,7 @@ class FilterConfig : public std::enable_shared_from_this, const std::string so_id_; const std::string so_path_; const ProtobufWkt::Any plugin_config_; + uint32_t concurrency_; GolangFilterStats stats_; @@ -170,9 +171,10 @@ class Filter : public Http::StreamFilter, Logger::Loggable, public AccessLog::Instance { public: - explicit Filter(FilterConfigSharedPtr config, Dso::HttpFilterDsoPtr dynamic_lib) - : config_(config), dynamic_lib_(dynamic_lib), decoding_state_(*this), encoding_state_(*this) { - } + explicit Filter(FilterConfigSharedPtr config, Dso::HttpFilterDsoPtr dynamic_lib, + uint32_t worker_id) + : config_(config), dynamic_lib_(dynamic_lib), decoding_state_(*this), encoding_state_(*this), + worker_id_(worker_id) {} // Http::StreamFilterBase void onDestroy() ABSL_LOCKS_EXCLUDED(mutex_) override; @@ -314,6 +316,10 @@ class Filter : public Http::StreamFilter, // the filter enter encoding phase bool enter_encoding_{false}; + + // The ID of the worker that is processing this request, this enables the go filter to dedicate + // memory to each worker and not require locks + uint32_t worker_id_ = 0; }; // Go code only touch the fields in httpRequest diff --git a/contrib/golang/filters/http/test/config_test.cc b/contrib/golang/filters/http/test/config_test.cc index 6bef239ec22d..315f815c8c42 100644 --- a/contrib/golang/filters/http/test/config_test.cc +++ b/contrib/golang/filters/http/test/config_test.cc @@ -59,7 +59,9 @@ TEST(GolangFilterConfigTest, GolangFilterWithValidConfig) { GolangFilterConfig factory; Http::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, "stats", context).value(); - Http::MockFilterChainFactoryCallbacks filter_callback; + NiceMock filter_callback; + NiceMock dispatcher{"worker_0"}; + ON_CALL(filter_callback, dispatcher()).WillByDefault(ReturnRef(dispatcher)); EXPECT_CALL(filter_callback, addStreamFilter(_)); EXPECT_CALL(filter_callback, addAccessLogHandler(_)); auto plugin_config = proto_config.plugin_config(); @@ -83,7 +85,9 @@ TEST(GolangFilterConfigTest, GolangFilterWithNilPluginConfig) { GolangFilterConfig factory; Http::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, "stats", context).value(); - Http::MockFilterChainFactoryCallbacks filter_callback; + NiceMock filter_callback; + NiceMock dispatcher{"worker_0"}; + ON_CALL(filter_callback, dispatcher()).WillByDefault(ReturnRef(dispatcher)); EXPECT_CALL(filter_callback, addStreamFilter(_)); EXPECT_CALL(filter_callback, addAccessLogHandler(_)); auto plugin_config = proto_config.plugin_config(); diff --git a/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc b/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc index e16ca408fa24..6773ec59a174 100644 --- a/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc +++ b/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc @@ -79,7 +79,7 @@ DEFINE_PROTO_FUZZER(const envoy::extensions::filters::http::golang::GolangFilter // Prepare filter. NiceMock context; FilterConfigSharedPtr config = std::make_shared(proto_config, dso_lib, "", context); - std::unique_ptr filter = std::make_unique(config, dso_lib); + std::unique_ptr filter = std::make_unique(config, dso_lib, 0); filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); filter->setEncoderFilterCallbacks(mocks.encoder_callbacks_); diff --git a/contrib/golang/filters/http/test/golang_filter_test.cc b/contrib/golang/filters/http/test/golang_filter_test.cc index f8bbbb05ee6e..a5e785edd40c 100644 --- a/contrib/golang/filters/http/test/golang_filter_test.cc +++ b/contrib/golang/filters/http/test/golang_filter_test.cc @@ -131,7 +131,7 @@ class GolangHttpFilterTest : public testing::Test { test_time.setSystemTime(std::chrono::microseconds(1583879145572237)); filter_ = std::make_unique( - config_, Dso::DsoManager::getDsoByPluginName(plugin_name)); + config_, Dso::DsoManager::getDsoByPluginName(plugin_name), 0); filter_->setDecoderFilterCallbacks(decoder_callbacks_); filter_->setEncoderFilterCallbacks(encoder_callbacks_); }