Skip to content

Commit 9b8f095

Browse files
committed
feat grpc: grpc client subscribe on config update
commit_hash:67bc5c92974883cf587f2c449bbf4727e19560b1
1 parent b77bb86 commit 9b8f095

File tree

17 files changed

+247
-128
lines changed

17 files changed

+247
-128
lines changed

.mapping.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1927,6 +1927,8 @@
19271927
"grpc/include/userver/ugrpc/client/impl/codegen_declarations.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/codegen_declarations.hpp",
19281928
"grpc/include/userver/ugrpc/client/impl/codegen_definitions.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/codegen_definitions.hpp",
19291929
"grpc/include/userver/ugrpc/client/impl/completion_queue_pool.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/completion_queue_pool.hpp",
1930+
"grpc/include/userver/ugrpc/client/impl/stub_any.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/stub_any.hpp",
1931+
"grpc/include/userver/ugrpc/client/impl/stub_pool.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/impl/stub_pool.hpp",
19301932
"grpc/include/userver/ugrpc/client/middlewares/baggage/component.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/middlewares/baggage/component.hpp",
19311933
"grpc/include/userver/ugrpc/client/middlewares/base.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/middlewares/base.hpp",
19321934
"grpc/include/userver/ugrpc/client/middlewares/deadline_propagation/component.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/middlewares/deadline_propagation/component.hpp",
@@ -2023,6 +2025,7 @@
20232025
"grpc/src/ugrpc/client/impl/client_factory_config.hpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/client_factory_config.hpp",
20242026
"grpc/src/ugrpc/client/impl/client_qos.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/client_qos.cpp",
20252027
"grpc/src/ugrpc/client/impl/completion_queue_pool.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/completion_queue_pool.cpp",
2028+
"grpc/src/ugrpc/client/impl/stub_pool.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/impl/stub_pool.cpp",
20262029
"grpc/src/ugrpc/client/middlewares/baggage/component.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/middlewares/baggage/component.cpp",
20272030
"grpc/src/ugrpc/client/middlewares/baggage/middleware.cpp":"taxi/uservices/userver/grpc/src/ugrpc/client/middlewares/baggage/middleware.cpp",
20282031
"grpc/src/ugrpc/client/middlewares/baggage/middleware.hpp":"taxi/uservices/userver/grpc/src/ugrpc/client/middlewares/baggage/middleware.hpp",

grpc/functional_tests/basic_chaos/client.hpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@ class GreeterClient final : public components::ComponentBase {
3131
// Tests dedicated-channel-count from SimpleClientComponent
3232
auto& client =
3333
context.FindComponent<ugrpc::client::SimpleClientComponent<Client>>("greeter-client-component").GetClient();
34-
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(0) == 3);
35-
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(1) == 0);
36-
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(2) == 2);
37-
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(3) == 0);
38-
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(4) == 0);
34+
const auto& data = ugrpc::client::impl::GetClientData(client);
35+
const auto stub_state = data.GetStubState();
36+
UASSERT(stub_state->dedicated_stubs[0].Size() == 3);
37+
UASSERT(stub_state->dedicated_stubs[1].Size() == 0);
38+
UASSERT(stub_state->dedicated_stubs[2].Size() == 2);
39+
UASSERT(stub_state->dedicated_stubs[3].Size() == 0);
40+
UASSERT(stub_state->dedicated_stubs[4].Size() == 0);
3941
}
4042

4143
inline std::string SayHello(std::string name, bool is_small_timeout);

grpc/include/userver/ugrpc/client/impl/async_methods.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ class RpcData final {
7474
RpcData& operator=(RpcData&&) noexcept = delete;
7575
~RpcData();
7676

77+
ClientData::StubHandle& GetStub() noexcept;
78+
7779
const grpc::ClientContext& GetContext() const noexcept;
7880

7981
grpc::ClientContext& GetContext() noexcept;
@@ -154,6 +156,7 @@ class RpcData final {
154156
};
155157

156158
private:
159+
ClientData::StubHandle stub_;
157160
std::unique_ptr<grpc::ClientContext> context_;
158161
std::string client_name_;
159162
ugrpc::impl::MaybeOwnedString call_name_;

grpc/include/userver/ugrpc/client/impl/call_params.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ struct CallParams {
2323
grpc::CompletionQueue& queue;
2424
dynamic_config::Snapshot config;
2525
ugrpc::impl::MaybeOwnedString call_name;
26+
ClientData::StubHandle stub;
2627
std::unique_ptr<grpc::ClientContext> context;
2728
ugrpc::impl::MethodStatistics& statistics;
2829
const Middlewares& mws;

grpc/include/userver/ugrpc/client/impl/client_data.hpp

Lines changed: 81 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@
99

1010
#include <userver/dynamic_config/source.hpp>
1111
#include <userver/engine/task/task_processor_fwd.hpp>
12+
#include <userver/rcu/rcu.hpp>
1213
#include <userver/testsuite/grpc_control.hpp>
1314
#include <userver/utils/fixed_array.hpp>
1415

1516
#include <userver/ugrpc/client/client_factory_settings.hpp>
1617
#include <userver/ugrpc/client/client_settings.hpp>
1718
#include <userver/ugrpc/client/fwd.hpp>
1819
#include <userver/ugrpc/client/impl/channel_factory.hpp>
20+
#include <userver/ugrpc/client/impl/stub_any.hpp>
21+
#include <userver/ugrpc/client/impl/stub_pool.hpp>
1922
#include <userver/ugrpc/client/middlewares/fwd.hpp>
2023
#include <userver/ugrpc/impl/static_service_metadata.hpp>
2124
#include <userver/ugrpc/impl/statistics.hpp>
@@ -36,7 +39,7 @@ struct ClientDependencies final {
3639
Middlewares mws;
3740
ugrpc::impl::CompletionQueuePoolBase& completion_queues;
3841
ugrpc::impl::StatisticsStorage& statistics_storage;
39-
const dynamic_config::Source config_source;
42+
dynamic_config::Source config_source;
4043
testsuite::GrpcControl& testsuite_grpc;
4144
const dynamic_config::Key<ClientQos>* qos{nullptr};
4245
const ClientFactorySettings& client_factory_settings;
@@ -51,8 +54,31 @@ struct GenericClientTag final {
5154
/// The internal state of generated gRPC clients
5255
class ClientData final {
5356
public:
54-
template <typename Service>
55-
using Stub = typename Service::Stub;
57+
struct StubState {
58+
StubPool stubs;
59+
// method_id -> stub_pool
60+
utils::FixedArray<StubPool> dedicated_stubs;
61+
};
62+
63+
class StubHandle {
64+
public:
65+
StubHandle(rcu::ReadablePtr<StubState>&& state, StubAny& stub) : state_{std::move(state)}, stub_{stub} {}
66+
67+
StubHandle(StubHandle&&) noexcept = default;
68+
StubHandle& operator=(StubHandle&&) = delete;
69+
70+
StubHandle(const StubHandle&) = delete;
71+
StubHandle& operator=(const StubHandle&) = delete;
72+
73+
template <typename Stub>
74+
Stub& Get() {
75+
return StubCast<Stub>(stub_);
76+
}
77+
78+
private:
79+
rcu::ReadablePtr<StubState> state_;
80+
StubAny& stub_;
81+
};
5682

5783
ClientData() = delete;
5884

@@ -62,36 +88,42 @@ class ClientData final {
6288
metadata_(metadata),
6389
service_statistics_(&GetServiceStatistics()),
6490
channel_factory_(CreateChannelFactory(dependencies_)),
65-
channels_(CreateChannels(channel_factory_, dependencies_.client_factory_settings.channel_count)),
66-
stubs_(MakeStubs<Service>(channels_)),
67-
dedicated_stubs_(
68-
MakeDedicatedStubs<Service>(channel_factory_, *metadata_, dependencies_.dedicated_methods_config)
69-
) {}
91+
stub_state_(std::make_unique<rcu::Variable<StubState>>()) {
92+
if (dependencies_.qos) {
93+
SubscribeOnConfigUpdate<Service>(*dependencies_.qos);
94+
} else {
95+
ConstructStubState<typename Service::Stub>();
96+
}
97+
}
7098

7199
template <typename Service>
72100
ClientData(ClientDependencies&& dependencies, GenericClientTag, std::in_place_type_t<Service>)
73101
: dependencies_(std::move(dependencies)),
74102
channel_factory_(CreateChannelFactory(dependencies_)),
75-
channels_(CreateChannels(channel_factory_, dependencies_.client_factory_settings.channel_count)),
76-
stubs_(MakeStubs<Service>(channels_)) {}
103+
stub_state_(std::make_unique<rcu::Variable<StubState>>()) {
104+
ConstructStubState<typename Service::Stub>();
105+
}
106+
107+
~ClientData();
77108

78109
ClientData(ClientData&&) noexcept = default;
79110
ClientData& operator=(ClientData&&) = delete;
80111

81112
ClientData(const ClientData&) = delete;
82113
ClientData& operator=(const ClientData&) = delete;
83114

84-
template <typename Service>
85-
Stub<Service>& NextStubFromMethodId(std::size_t method_id) const {
86-
if (!dedicated_stubs_[method_id].empty()) {
87-
return *static_cast<Stub<Service>*>(NextStubPtr(dedicated_stubs_[method_id]).get());
88-
}
89-
return NextGenericStub<Service>();
115+
StubHandle NextStubFromMethodId(std::size_t method_id) const {
116+
auto stub_state = stub_state_->Read();
117+
auto& dedicated_stubs = stub_state->dedicated_stubs[method_id];
118+
auto& stubs = dedicated_stubs.Size() ? dedicated_stubs : stub_state->stubs;
119+
auto& stub = stubs.NextStub();
120+
return StubHandle{std::move(stub_state), stub};
90121
}
91122

92-
template <typename Service>
93-
Stub<Service>& NextGenericStub() const {
94-
return *static_cast<Stub<Service>*>(NextStubPtr(stubs_).get());
123+
StubHandle NextStub() const {
124+
auto stub_state = stub_state_->Read();
125+
auto& stub = stub_state->stubs.NextStub();
126+
return StubHandle{std::move(stub_state), stub};
95127
}
96128

97129
grpc::CompletionQueue& NextQueue() const;
@@ -102,8 +134,6 @@ class ClientData final {
102134

103135
ugrpc::impl::MethodStatistics& GetGenericStatistics(std::string_view call_name) const;
104136

105-
const utils::FixedArray<std::shared_ptr<grpc::Channel>>& GetChannels() { return channels_; }
106-
107137
std::string_view GetClientName() const { return dependencies_.client_name; }
108138

109139
const Middlewares& GetMiddlewares() const { return dependencies_.mws; }
@@ -114,37 +144,12 @@ class ClientData final {
114144

115145
const dynamic_config::Key<ClientQos>* GetClientQos() const;
116146

117-
std::size_t GetDedicatedChannelCount(std::size_t method_id) const;
147+
rcu::ReadablePtr<StubState> GetStubState() const { return stub_state_->Read(); }
118148

119149
private:
120150
static ChannelFactory CreateChannelFactory(const ClientDependencies& dependencies);
121151

122-
static utils::FixedArray<std::shared_ptr<grpc::Channel>>
123-
CreateChannels(const ChannelFactory& channel_factory, std::size_t channel_count);
124-
125-
using StubDeleterType = void (*)(void*);
126-
using StubPtr = std::unique_ptr<void, StubDeleterType>;
127-
128-
using StubPool = utils::FixedArray<StubPtr>;
129-
130-
template <typename Service>
131-
static void StubDeleter(void* ptr) noexcept {
132-
delete static_cast<Stub<Service>*>(ptr);
133-
}
134-
135-
template <typename Service>
136-
static StubPtr MakeStub(const std::shared_ptr<grpc::Channel>& channel) {
137-
return StubPtr(Service::NewStub(channel).release(), &StubDeleter<Service>);
138-
}
139-
140-
template <typename Service>
141-
static StubPool MakeStubs(const utils::FixedArray<std::shared_ptr<grpc::Channel>>& channels) {
142-
return utils::GenerateFixedArray(channels.size(), [&](std::size_t index) {
143-
return MakeStub<Service>(channels[index]);
144-
});
145-
}
146-
147-
template <typename Service>
152+
template <typename Stub>
148153
static utils::FixedArray<StubPool> MakeDedicatedStubs(
149154
const ChannelFactory& channel_factory,
150155
const ugrpc::impl::StaticServiceMetadata& metadata,
@@ -153,28 +158,45 @@ class ClientData final {
153158
return utils::GenerateFixedArray(GetMethodsCount(metadata), [&](std::size_t method_id) {
154159
const auto method_channel_count =
155160
GetMethodChannelCount(dedicated_methods_config, GetMethodName(metadata, method_id));
156-
return utils::GenerateFixedArray(method_channel_count, [&](std::size_t) {
157-
const auto channel = channel_factory.CreateChannel();
158-
return MakeStub<Service>(channel);
159-
});
161+
return StubPool::Create<Stub>(method_channel_count, channel_factory);
160162
});
161163
}
162164

163-
const StubPtr& NextStubPtr(const StubPool& stubs) const;
164-
165165
ugrpc::impl::ServiceStatistics& GetServiceStatistics();
166166

167+
template <typename Service>
168+
void SubscribeOnConfigUpdate(const dynamic_config::Key<ClientQos>& qos) {
169+
config_subscription_ = dependencies_.config_source.UpdateAndListen(
170+
this, dependencies_.client_name, &ClientData::OnConfigUpdate<Service>, qos
171+
);
172+
}
173+
174+
template <typename Service>
175+
void OnConfigUpdate(const dynamic_config::Snapshot& /*config*/) {
176+
ConstructStubState<typename Service::Stub>();
177+
}
178+
179+
template <typename Stub>
180+
void ConstructStubState() {
181+
auto stubs = StubPool::Create<Stub>(dependencies_.client_factory_settings.channel_count, channel_factory_);
182+
183+
auto dedicated_stubs =
184+
metadata_.has_value()
185+
? MakeDedicatedStubs<Stub>(channel_factory_, *metadata_, dependencies_.dedicated_methods_config)
186+
: utils::FixedArray<StubPool>{};
187+
188+
stub_state_->Assign({std::move(stubs), std::move(dedicated_stubs)});
189+
}
190+
167191
ClientDependencies dependencies_;
168192
std::optional<ugrpc::impl::StaticServiceMetadata> metadata_{std::nullopt};
169193
ugrpc::impl::ServiceStatistics* service_statistics_{nullptr};
170194

171195
ChannelFactory channel_factory_;
196+
std::unique_ptr<rcu::Variable<StubState>> stub_state_;
172197

173-
utils::FixedArray<std::shared_ptr<grpc::Channel>> channels_;
174-
175-
StubPool stubs_;
176-
// method_id -> stub_pool
177-
utils::FixedArray<StubPool> dedicated_stubs_;
198+
// These fields must be the last ones
199+
concurrent::AsyncEventSubscriberScope config_subscription_;
178200
};
179201

180202
template <typename Client>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#pragma once
2+
3+
#include <grpcpp/channel.h>
4+
#include <grpcpp/generic/generic_stub.h>
5+
6+
#include <userver/utils/any_movable.hpp>
7+
8+
USERVER_NAMESPACE_BEGIN
9+
10+
namespace ugrpc::client::impl {
11+
12+
using StubAny = utils::AnyMovable;
13+
14+
template <typename Stub>
15+
StubAny MakeStub(const std::shared_ptr<grpc::Channel>& channel) {
16+
if constexpr (std::is_same_v<grpc::GenericStub, Stub>) {
17+
return StubAny{std::in_place_type<grpc::GenericStub>, std::shared_ptr<grpc::Channel>{channel}};
18+
}
19+
return StubAny{std::in_place_type<Stub>, channel};
20+
}
21+
22+
template <typename Stub>
23+
Stub& StubCast(StubAny& stub) {
24+
return utils::AnyCast<Stub&>(stub);
25+
}
26+
27+
} // namespace ugrpc::client::impl
28+
29+
USERVER_NAMESPACE_END
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#pragma once
2+
3+
#include <grpcpp/channel.h>
4+
5+
#include <userver/utils/fixed_array.hpp>
6+
7+
#include <userver/ugrpc/client/impl/channel_factory.hpp>
8+
#include <userver/ugrpc/client/impl/stub_any.hpp>
9+
10+
USERVER_NAMESPACE_BEGIN
11+
12+
namespace ugrpc::client::impl {
13+
14+
class StubPool final {
15+
public:
16+
StubPool() = default;
17+
18+
std::size_t Size() const { return stubs_.size(); }
19+
20+
StubAny& NextStub() const;
21+
22+
const utils::FixedArray<std::shared_ptr<grpc::Channel>>& GetChannels() const { return channels_; }
23+
24+
const utils::FixedArray<StubAny>& GetStubs() const { return stubs_; }
25+
26+
template <typename Stub>
27+
static StubPool Create(std::size_t size, const ChannelFactory& channel_factory) {
28+
auto channels = utils::GenerateFixedArray(size, [&channel_factory](std::size_t) {
29+
return channel_factory.CreateChannel();
30+
});
31+
auto stubs = utils::GenerateFixedArray(channels.size(), [&channels](std::size_t index) {
32+
return MakeStub<Stub>(channels[index]);
33+
});
34+
return StubPool{std::move(channels), std::move(stubs)};
35+
}
36+
37+
private:
38+
StubPool(utils::FixedArray<std::shared_ptr<grpc::Channel>>&& channels, utils::FixedArray<StubAny>&& stubs)
39+
: channels_{std::move(channels)}, stubs_{std::move(stubs)} {}
40+
41+
utils::FixedArray<std::shared_ptr<grpc::Channel>> channels_;
42+
43+
mutable utils::FixedArray<StubAny> stubs_;
44+
};
45+
46+
} // namespace ugrpc::client::impl
47+
48+
USERVER_NAMESPACE_END

0 commit comments

Comments
 (0)