diff --git a/contrib/golang/common/go/api/api.h b/contrib/golang/common/go/api/api.h index 504d412d3e14..66d9f5f1cc27 100644 --- a/contrib/golang/common/go/api/api.h +++ b/contrib/golang/common/go/api/api.h @@ -17,6 +17,7 @@ typedef struct { // NOLINT(modernize-use-using) Cstring plugin_name; uint64_t configId; int phase; + uint32_t worker_id; } httpRequest; typedef struct { // NOLINT(modernize-use-using) @@ -25,6 +26,7 @@ typedef struct { // NOLINT(modernize-use-using) uint64_t config_ptr; uint64_t config_len; int is_route_config; + uint32_t concurrency; } httpConfig; typedef enum { // NOLINT(modernize-use-using) diff --git a/contrib/golang/filters/http/source/config.cc b/contrib/golang/filters/http/source/config.cc index 6bd9a5bf77c1..91d7eddc32c7 100644 --- a/contrib/golang/filters/http/source/config.cc +++ b/contrib/golang/filters/http/source/config.cc @@ -1,5 +1,7 @@ #include "contrib/golang/filters/http/source/config.h" +#include + #include "envoy/registry/registry.h" #include "source/common/common/fmt.h" @@ -33,7 +35,14 @@ Http::FilterFactoryCb GolangFilterConfig::createFilterFactoryFromProtoTyped( proto_config, dso_lib, fmt::format("{}golang.", stats_prefix), context); config->newGoPluginConfig(); return [config, dso_lib](Http::FilterChainFactoryCallbacks& callbacks) { - auto filter = std::make_shared(config, dso_lib); + const std::string& worker_name = callbacks.dispatcher().name(); + auto pos = worker_name.find_first_of('_'); + ENVOY_BUG(pos != std::string::npos, "worker name is not in expected format worker_{id}"); + uint32_t worker_id; + if (!absl::SimpleAtoi(worker_name.substr(pos + 1), &worker_id)) { + IS_ENVOY_BUG("failed to parse worker id from name"); + } + auto filter = std::make_shared(config, dso_lib, worker_id); callbacks.addStreamFilter(filter); callbacks.addAccessLogHandler(filter); }; diff --git a/contrib/golang/filters/http/source/go/pkg/http/config.go b/contrib/golang/filters/http/source/go/pkg/http/config.go index 99e231e98520..87ad5b7e510f 100644 --- a/contrib/golang/filters/http/source/go/pkg/http/config.go +++ b/contrib/golang/filters/http/source/go/pkg/http/config.go @@ -73,6 +73,8 @@ func envoyGoFilterNewHttpPluginConfig(c *C.httpConfig) uint64 { var any anypb.Any proto.Unmarshal(buf, &any) + Requests.initialize(uint32(c.concurrency)) + configNum := atomic.AddUint64(&configNumGenerator, 1) name := utils.BytesToString(uint64(c.plugin_name_ptr), uint64(c.plugin_name_len)) diff --git a/contrib/golang/filters/http/source/go/pkg/http/shim.go b/contrib/golang/filters/http/source/go/pkg/http/shim.go index f77db4770b1f..44767a4c1a79 100644 --- a/contrib/golang/filters/http/source/go/pkg/http/shim.go +++ b/contrib/golang/filters/http/source/go/pkg/http/shim.go @@ -46,32 +46,40 @@ var ErrDupRequestKey = errors.New("dup request key") var Requests = &requestMap{} type requestMap struct { - m sync.Map // *C.httpRequest -> *httpRequest + initOnce sync.Once + requests []map[*C.httpRequest]*httpRequest +} + +func (f *requestMap) initialize(concurrency uint32) { + f.initOnce.Do(func() { + f.requests = make([]map[*C.httpRequest]*httpRequest, concurrency) + for i := uint32(0); i < concurrency; i++ { + f.requests[i] = map[*C.httpRequest]*httpRequest{} + } + }) } func (f *requestMap) StoreReq(key *C.httpRequest, req *httpRequest) error { - if _, loaded := f.m.LoadOrStore(key, req); loaded { + m := f.requests[key.worker_id] + if _, ok := m[key]; ok { return ErrDupRequestKey } + m[key] = req return nil } func (f *requestMap) GetReq(key *C.httpRequest) *httpRequest { - if v, ok := f.m.Load(key); ok { - return v.(*httpRequest) - } - return nil + return f.requests[key.worker_id][key] } func (f *requestMap) DeleteReq(key *C.httpRequest) { - f.m.Delete(key) + delete(f.requests[key.worker_id], key) } func (f *requestMap) Clear() { - f.m.Range(func(key, _ interface{}) bool { - f.m.Delete(key) - return true - }) + for idx := range f.requests { + f.requests[idx] = map[*C.httpRequest]*httpRequest{} + } } func requestFinalize(r *httpRequest) { diff --git a/contrib/golang/filters/http/source/golang_filter.cc b/contrib/golang/filters/http/source/golang_filter.cc index 306336fc6000..edfab63a4e9c 100644 --- a/contrib/golang/filters/http/source/golang_filter.cc +++ b/contrib/golang/filters/http/source/golang_filter.cc @@ -1458,6 +1458,7 @@ void Filter::initRequest(ProcessorState& state) { req_->configId = getMergedConfigId(state); req_->plugin_name.data = config_->pluginName().data(); req_->plugin_name.len = config_->pluginName().length(); + req_->worker_id = worker_id_; } /* ConfigId */ @@ -1491,6 +1492,7 @@ FilterConfig::FilterConfig( Server::Configuration::FactoryContext& context) : plugin_name_(proto_config.plugin_name()), so_id_(proto_config.library_id()), so_path_(proto_config.library_path()), plugin_config_(proto_config.plugin_config()), + concurrency_(context.serverFactoryContext().options().concurrency()), stats_(GolangFilterStats::generateStats(stats_prefix, context.scope())), dso_lib_(dso_lib), metric_store_(std::make_shared(context.scope().createScope(""))){}; @@ -1508,6 +1510,7 @@ void FilterConfig::newGoPluginConfig() { config_->config_ptr = buf_ptr; config_->config_len = buf.length(); config_->is_route_config = 0; + config_->concurrency = concurrency_; config_id_ = dso_lib_->envoyGoFilterNewHttpPluginConfig(config_); diff --git a/contrib/golang/filters/http/source/golang_filter.h b/contrib/golang/filters/http/source/golang_filter.h index cb342c437327..cd90766e1336 100644 --- a/contrib/golang/filters/http/source/golang_filter.h +++ b/contrib/golang/filters/http/source/golang_filter.h @@ -85,6 +85,7 @@ class FilterConfig : public std::enable_shared_from_this, const std::string so_id_; const std::string so_path_; const ProtobufWkt::Any plugin_config_; + uint32_t concurrency_; GolangFilterStats stats_; @@ -170,9 +171,10 @@ class Filter : public Http::StreamFilter, Logger::Loggable, public AccessLog::Instance { public: - explicit Filter(FilterConfigSharedPtr config, Dso::HttpFilterDsoPtr dynamic_lib) - : config_(config), dynamic_lib_(dynamic_lib), decoding_state_(*this), encoding_state_(*this) { - } + explicit Filter(FilterConfigSharedPtr config, Dso::HttpFilterDsoPtr dynamic_lib, + uint32_t worker_id) + : config_(config), dynamic_lib_(dynamic_lib), decoding_state_(*this), encoding_state_(*this), + worker_id_(worker_id) {} // Http::StreamFilterBase void onDestroy() ABSL_LOCKS_EXCLUDED(mutex_) override; @@ -314,6 +316,10 @@ class Filter : public Http::StreamFilter, // the filter enter encoding phase bool enter_encoding_{false}; + + // The ID of the worker that is processing this request, this enables the go filter to dedicate + // memory to each worker and not require locks + uint32_t worker_id_ = 0; }; // Go code only touch the fields in httpRequest diff --git a/contrib/golang/filters/http/test/config_test.cc b/contrib/golang/filters/http/test/config_test.cc index 6bef239ec22d..315f815c8c42 100644 --- a/contrib/golang/filters/http/test/config_test.cc +++ b/contrib/golang/filters/http/test/config_test.cc @@ -59,7 +59,9 @@ TEST(GolangFilterConfigTest, GolangFilterWithValidConfig) { GolangFilterConfig factory; Http::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, "stats", context).value(); - Http::MockFilterChainFactoryCallbacks filter_callback; + NiceMock filter_callback; + NiceMock dispatcher{"worker_0"}; + ON_CALL(filter_callback, dispatcher()).WillByDefault(ReturnRef(dispatcher)); EXPECT_CALL(filter_callback, addStreamFilter(_)); EXPECT_CALL(filter_callback, addAccessLogHandler(_)); auto plugin_config = proto_config.plugin_config(); @@ -83,7 +85,9 @@ TEST(GolangFilterConfigTest, GolangFilterWithNilPluginConfig) { GolangFilterConfig factory; Http::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, "stats", context).value(); - Http::MockFilterChainFactoryCallbacks filter_callback; + NiceMock filter_callback; + NiceMock dispatcher{"worker_0"}; + ON_CALL(filter_callback, dispatcher()).WillByDefault(ReturnRef(dispatcher)); EXPECT_CALL(filter_callback, addStreamFilter(_)); EXPECT_CALL(filter_callback, addAccessLogHandler(_)); auto plugin_config = proto_config.plugin_config(); diff --git a/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc b/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc index e16ca408fa24..6773ec59a174 100644 --- a/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc +++ b/contrib/golang/filters/http/test/golang_filter_fuzz_test.cc @@ -79,7 +79,7 @@ DEFINE_PROTO_FUZZER(const envoy::extensions::filters::http::golang::GolangFilter // Prepare filter. NiceMock context; FilterConfigSharedPtr config = std::make_shared(proto_config, dso_lib, "", context); - std::unique_ptr filter = std::make_unique(config, dso_lib); + std::unique_ptr filter = std::make_unique(config, dso_lib, 0); filter->setDecoderFilterCallbacks(mocks.decoder_callbacks_); filter->setEncoderFilterCallbacks(mocks.encoder_callbacks_); diff --git a/contrib/golang/filters/http/test/golang_filter_test.cc b/contrib/golang/filters/http/test/golang_filter_test.cc index f8bbbb05ee6e..a5e785edd40c 100644 --- a/contrib/golang/filters/http/test/golang_filter_test.cc +++ b/contrib/golang/filters/http/test/golang_filter_test.cc @@ -131,7 +131,7 @@ class GolangHttpFilterTest : public testing::Test { test_time.setSystemTime(std::chrono::microseconds(1583879145572237)); filter_ = std::make_unique( - config_, Dso::DsoManager::getDsoByPluginName(plugin_name)); + config_, Dso::DsoManager::getDsoByPluginName(plugin_name), 0); filter_->setDecoderFilterCallbacks(decoder_callbacks_); filter_->setEncoderFilterCallbacks(encoder_callbacks_); }