Skip to content

Commit 97d71d1

Browse files
committed
Removes async_append_some.
1 parent 76129bb commit 97d71d1

19 files changed

+405
-185
lines changed

include/boost/redis/config.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,20 @@ struct config {
8888
*/
8989
std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1};
9090

91-
/** @brief Maximum size of a socket read, in bytes.
91+
/** @brief Maximum size of the read-buffer in bytes.
9292
*
9393
* Sets a limit on how much data is allowed to be read into the
9494
* read buffer. It can be used to prevent DDOS.
9595
*/
9696
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
97+
98+
/** @brief read_buffer_append_size
99+
*
100+
* The size by which the read buffer grows when more space is
101+
* needed. There is no need to set this too high because memory is
102+
* reused and the growth will tend to zero.
103+
*/
104+
std::size_t read_buffer_append_size = 4096;
97105
};
98106

99107
} // namespace boost::redis

include/boost/redis/connection.hpp

Lines changed: 9 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -57,56 +57,6 @@
5757
namespace boost::redis {
5858
namespace detail {
5959

60-
template <class AsyncReadStream, class DynamicBuffer>
61-
class append_some_op {
62-
private:
63-
AsyncReadStream& stream_;
64-
DynamicBuffer buf_;
65-
std::size_t size_ = 0;
66-
std::size_t tmp_ = 0;
67-
asio::coroutine coro_{};
68-
69-
public:
70-
append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
71-
: stream_{stream}
72-
, buf_{std::move(buf)}
73-
, size_{size}
74-
{ }
75-
76-
template <class Self>
77-
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
78-
{
79-
BOOST_ASIO_CORO_REENTER(coro_)
80-
{
81-
tmp_ = buf_.size();
82-
buf_.grow(size_);
83-
84-
BOOST_ASIO_CORO_YIELD
85-
stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
86-
if (ec) {
87-
self.complete(ec, 0);
88-
return;
89-
}
90-
91-
buf_.shrink(buf_.size() - tmp_ - n);
92-
self.complete({}, n);
93-
}
94-
}
95-
};
96-
97-
template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
98-
auto async_append_some(
99-
AsyncReadStream& stream,
100-
DynamicBuffer buffer,
101-
std::size_t size,
102-
CompletionToken&& token)
103-
{
104-
return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
105-
append_some_op<AsyncReadStream, DynamicBuffer>{stream, buffer, size},
106-
token,
107-
stream);
108-
}
109-
11060
template <class Executor>
11161
using exec_notifier_type = asio::experimental::channel<
11262
Executor,
@@ -209,33 +159,18 @@ struct writer_op {
209159

210160
template <class Conn>
211161
struct reader_op {
212-
using dyn_buffer_type = asio::dynamic_string_buffer<
213-
char,
214-
std::char_traits<char>,
215-
std::allocator<char>>;
216-
217-
// TODO: Move this to config so the user can fine tune?
218-
static constexpr std::size_t buffer_growth_hint = 4096;
219-
220162
Conn* conn_;
221-
detail::reader_fsm fsm_;
222163

223164
public:
224165
reader_op(Conn& conn) noexcept
225166
: conn_{&conn}
226-
, fsm_{conn.mpx_}
227167
{ }
228168

229169
template <class Self>
230170
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
231171
{
232-
using dyn_buffer_type = asio::dynamic_string_buffer<
233-
char,
234-
std::char_traits<char>,
235-
std::allocator<char>>;
236-
237172
for (;;) {
238-
auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
173+
auto act = conn_->read_fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
239174

240175
conn_->logger_.on_fsm_resume(act);
241176

@@ -245,11 +180,10 @@ struct reader_op {
245180
continue;
246181
case reader_fsm::action::type::needs_more:
247182
case reader_fsm::action::type::append_some:
248-
async_append_some(
249-
conn_->stream_,
250-
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
251-
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
252-
std::move(self));
183+
{
184+
auto const buf = conn_->read_fsm_.get_append_buffer();
185+
conn_->stream_.async_read_some(asio::buffer(buf), std::move(self));
186+
}
253187
return;
254188
case reader_fsm::action::type::notify_push_receiver:
255189
if (conn_->receive_channel_.try_send(ec, act.push_size_)) {
@@ -343,6 +277,7 @@ class run_op {
343277
// If we were successful, run all the connection tasks
344278
if (!ec) {
345279
conn_->mpx_.reset();
280+
conn_->read_fsm_.reset();
346281

347282
// Note: Order is important here because the writer might
348283
// trigger an async_write before the async_hello thereby
@@ -450,6 +385,7 @@ class basic_connection {
450385
, reconnect_timer_{ex}
451386
, receive_channel_{ex, 256}
452387
, health_checker_{ex}
388+
, read_fsm_{mpx_}
453389
, logger_{std::move(lgr)}
454390
{
455391
set_receive_response(ignore);
@@ -553,6 +489,7 @@ class basic_connection {
553489
cfg_ = cfg;
554490
health_checker_.set_config(cfg);
555491
handshaker_.set_config(cfg);
492+
read_fsm_.set_config({cfg_.read_buffer_append_size, cfg_.max_read_size});
556493

557494
return asio::async_compose<CompletionToken, void(system::error_code)>(
558495
detail::run_op<this_type>{this},
@@ -951,6 +888,7 @@ class basic_connection {
951888

952889
config cfg_;
953890
detail::multiplexer mpx_;
891+
detail::reader_fsm read_fsm_;
954892
detail::connection_logger logger_;
955893
};
956894

include/boost/redis/detail/multiplexer.hpp

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
#define BOOST_REDIS_MULTIPLEXER_HPP
99

1010
#include <boost/redis/adapter/adapt.hpp>
11-
#include <boost/redis/adapter/any_adapter.hpp>
12-
#include <boost/redis/config.hpp>
13-
#include <boost/redis/operation.hpp>
11+
#include <boost/redis/detail/read_buffer.hpp>
12+
#include <boost/redis/resp3/node.hpp>
13+
#include <boost/redis/resp3/parser.hpp>
1414
#include <boost/redis/resp3/type.hpp>
1515
#include <boost/redis/usage.hpp>
1616

17-
#include <boost/asio/experimental/channel.hpp>
17+
#include <boost/system.hpp>
1818

1919
#include <algorithm>
2020
#include <deque>
@@ -32,7 +32,8 @@ namespace detail {
3232

3333
using tribool = std::optional<bool>;
3434

35-
struct multiplexer {
35+
class multiplexer {
36+
public:
3637
using adapter_type = std::function<void(resp3::node_view const&, system::error_code&)>;
3738
using pipeline_adapter_type = std::function<
3839
void(std::size_t, resp3::node_view const&, system::error_code&)>;
@@ -127,7 +128,8 @@ struct multiplexer {
127128
// If the tribool contains no value more data is needed, otherwise
128129
// if the value is true the message consumed is a push.
129130
[[nodiscard]]
130-
auto consume_next(system::error_code& ec) -> std::pair<tribool, std::size_t>;
131+
auto consume_next(std::string_view data, system::error_code& ec)
132+
-> std::pair<tribool, std::size_t>;
131133

132134
auto add(std::shared_ptr<elem> const& ptr) -> void;
133135
auto reset() -> void;
@@ -156,18 +158,6 @@ struct multiplexer {
156158
return std::string_view{write_buffer_};
157159
}
158160

159-
[[nodiscard]]
160-
auto get_read_buffer() noexcept -> std::string&
161-
{
162-
return read_buffer_;
163-
}
164-
165-
[[nodiscard]]
166-
auto get_read_buffer() const noexcept -> std::string const&
167-
{
168-
return read_buffer_;
169-
}
170-
171161
// TODO: Change signature to receive an adapter instead of a
172162
// response.
173163
template <class Response>
@@ -191,17 +181,18 @@ struct multiplexer {
191181
[[nodiscard]]
192182
auto is_waiting_response() const noexcept -> bool;
193183

194-
[[nodiscard]]
195-
auto on_finish_parsing(bool is_push) -> std::size_t;
184+
void commit_usage(bool is_push, std::size_t size);
196185

197186
[[nodiscard]]
198-
auto is_next_push() const noexcept -> bool;
187+
auto is_next_push(std::string_view data) const noexcept -> bool;
199188

200189
// Releases the number of requests that have been released.
201190
[[nodiscard]]
202191
auto release_push_requests() -> std::size_t;
203192

204-
std::string read_buffer_;
193+
[[nodiscard]]
194+
tribool consume_next_impl(std::string_view data, system::error_code& ec);
195+
205196
std::string write_buffer_;
206197
std::deque<std::shared_ptr<elem>> reqs_;
207198
resp3::parser parser_{};
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva ([email protected])
2+
*
3+
* Distributed under the Boost Software License, Version 1.0. (See
4+
* accompanying file LICENSE.txt)
5+
*/
6+
7+
#ifndef BOOST_REDIS_READ_BUFFER_HPP
8+
#define BOOST_REDIS_READ_BUFFER_HPP
9+
10+
#include <boost/core/span.hpp>
11+
12+
#include <cstddef>
13+
#include <vector>
14+
#include <string_view>
15+
#include <utility>
16+
17+
namespace boost::redis::detail {
18+
19+
class read_buffer {
20+
public:
21+
using span_type = span<char>;
22+
23+
[[nodiscard]]
24+
system::error_code prepare_append(std::size_t append_size, std::size_t max_buffer_size);
25+
26+
void commit_append(std::size_t read_size);
27+
28+
[[nodiscard]]
29+
auto get_append_buffer() noexcept -> span_type;
30+
31+
[[nodiscard]]
32+
auto get_committed_buffer() const noexcept -> std::string_view;
33+
34+
[[nodiscard]]
35+
auto get_committed_size() const noexcept -> std::size_t;
36+
37+
void clear();
38+
39+
// Consume committed data.
40+
auto consume_committed(std::size_t size) -> std::size_t;
41+
42+
void reserve(std::size_t n);
43+
44+
friend
45+
bool operator==(read_buffer const& lhs, read_buffer const& rhs);
46+
47+
friend
48+
bool operator!=(read_buffer const& lhs, read_buffer const& rhs);
49+
50+
private:
51+
std::vector<char> buffer_;
52+
std::size_t append_buf_begin_ = 0;
53+
};
54+
55+
} // namespace boost::redis::detail
56+
57+
#endif // BOOST_REDIS_READ_BUFFER_HPP

include/boost/redis/detail/reader_fsm.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ namespace boost::redis::detail {
1818

1919
class reader_fsm {
2020
public:
21+
// See config.hpp for the meaning of these parameters.
22+
struct config {
23+
std::size_t read_buffer_append_size = 4096;
24+
std::size_t max_read_size = -1;
25+
};
26+
2127
struct action {
2228
enum class type
2329
{
@@ -41,8 +47,20 @@ class reader_fsm {
4147
system::error_code ec,
4248
asio::cancellation_type_t /*cancel_state*/);
4349

50+
void set_config(config const& cfg) noexcept { cfg_ = cfg; };
51+
52+
void reset();
53+
54+
[[nodiscard]]
55+
auto get_append_buffer() noexcept
56+
{
57+
return read_buffer_.get_append_buffer();
58+
}
59+
4460
private:
4561
int resume_point_{0};
62+
read_buffer read_buffer_;
63+
config cfg_;
4664
action action_after_resume_;
4765
action::type next_read_type_ = action::type::append_some;
4866
multiplexer* mpx_ = nullptr;

include/boost/redis/error.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ enum class error
8888

8989
/// The configuration specified UNIX sockets with SSL, which is not supported.
9090
unix_sockets_ssl_unsupported,
91+
92+
/// The size of the read buffer would exceed it maximum configured value.
93+
exceeds_maximum_read_buffer_size,
9194
};
9295

9396
/**

include/boost/redis/impl/error.ipp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ struct error_category_impl : system::error_category {
5050
"supported by the system.";
5151
case error::unix_sockets_ssl_unsupported:
5252
return "The configuration specified UNIX sockets with SSL, which is not supported.";
53+
case error::exceeds_maximum_read_buffer_size:
54+
return "The size of the read buffer would exceed it maximum configured value";
5355
default: BOOST_ASSERT(false); return "Boost.Redis error.";
5456
}
5557
}

0 commit comments

Comments
 (0)