Skip to content

Recycle serialization buffers on transmission #342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,20 @@ Thus, there is no direct implementation of actions in `rmw_zenoh_cpp`.
## Security

TBD

## Environment variables

### `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES`

The RMW recycles serialization buffers on transmission using a buffer pool with bounded memory
usage.
These buffers are returned to the pool — without being deallocated — once they cross the
network boundary in host-to-host communication, after transmission in inter-process
communication, or upon being consumed by subscriptions in intra-process communication.

When the total size of the allocated buffers within the pool exceeds
`RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES`, serialization buffers are allocated using the system
allocator and moved to Zenoh; no recycling is performed in this case to prevent the buffer pool from
growing uncontrollably.

The default value of `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES` is 8 MiB; this default was chosen
because it is roughly the size of a modern CPU's cache.
14 changes: 14 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
return graph_cache_;
}

/// Return the shared pool of serialization buffers owned by this context.
std::shared_ptr<rmw_zenoh_cpp::BufferPool> serialization_buffer_pool()
{
  // Guard against concurrent access to the context's state.
  const std::lock_guard<std::recursive_mutex> state_lock(mutex_);
  auto pool = serialization_buffer_pool_;
  return pool;
}

bool create_node_data(
const rmw_node_t * const node,
const std::string & ns,
Expand Down Expand Up @@ -412,6 +418,8 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
std::optional<zenoh::ShmProvider> shm_provider_;
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// Pool of serialization buffers.
std::shared_ptr<rmw_zenoh_cpp::BufferPool> serialization_buffer_pool_;
// ROS graph liveliness subscriber.
// The graph_subscriber *must* exist in order for anything in this Data class,
// and hence rmw_zenoh_cpp, to work.
Expand Down Expand Up @@ -507,6 +515,12 @@ std::shared_ptr<rmw_zenoh_cpp::GraphCache> rmw_context_impl_s::graph_cache()
return data_->graph_cache();
}

///=============================================================================
std::shared_ptr<rmw_zenoh_cpp::BufferPool> rmw_context_impl_s::serialization_buffer_pool()
{
  // Forward to the shared Data instance owned by this context.
  auto pool = data_->serialization_buffer_pool();
  return pool;
}

///=============================================================================
bool rmw_context_impl_s::create_node_data(
const rmw_node_t * const node,
Expand Down
4 changes: 4 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "graph_cache.hpp"
#include "rmw_node_data.hpp"
#include "zenoh_utils.hpp"

#include "rmw/ret_types.h"
#include "rmw/types.h"
Expand Down Expand Up @@ -74,6 +75,9 @@ struct rmw_context_impl_s final
/// Return a shared_ptr to the GraphCache stored in this context.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache();

/// Return a shared_ptr to the Serialization buffer pool stored in this context.
std::shared_ptr<rmw_zenoh_cpp::BufferPool> serialization_buffer_pool();

/// Create a NodeData and store it within this context. The NodeData can be
/// retrieved using get_node().
/// Returns false if parameters are invalid.
Expand Down
51 changes: 35 additions & 16 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cinttypes>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -216,24 +217,33 @@ rmw_ret_t PublisherData::publish(
type_support_impl_);

// To store serialized message byte array.
char * msg_bytes = nullptr;

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;
uint8_t * msg_bytes = nullptr;

rmw_context_impl_s *context_impl = static_cast<rmw_context_impl_s *>(rmw_node_->data);

rcutils_allocator_t allocator = rcutils_get_default_allocator();

// Try to get memory from the serialization buffer pool.
BufferPool::Buffer serialization_buffer =
context_impl->serialization_buffer_pool()->allocate(max_data_length);
if (serialization_buffer.data == nullptr) {
void * data = allocator.allocate(max_data_length, allocator.state);
RMW_CHECK_FOR_NULL_WITH_MSG(
data, "failed to allocate serialization buffer", return RMW_RET_BAD_ALLOC);
msg_bytes = static_cast<uint8_t *>(data);
} else {
msg_bytes = serialization_buffer.data;
}

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator]() {
if (msg_bytes) {
allocator->deallocate(msg_bytes, allocator->state);
[&msg_bytes, &allocator, &serialization_buffer]() {
if (serialization_buffer.data == nullptr) {
allocator.deallocate(msg_bytes, allocator.state);
}
});

// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast<char *>(msg_bytes), max_data_length);

// Object that serializes the data
rmw_zenoh_cpp::Cdr ser(fastbuffer);
Expand All @@ -258,10 +268,19 @@ rmw_ret_t PublisherData::publish(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

// TODO(ahcorde): shmbuf
std::vector<uint8_t> raw_data(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(std::move(raw_data));
zenoh::Bytes payload;
if (serialization_buffer.data == nullptr) {
std::vector<uint8_t> raw_data(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
payload = zenoh::Bytes(std::move(raw_data));
} else {
auto deleter = [buffer_pool = context_impl->serialization_buffer_pool(),
buffer = serialization_buffer](uint8_t *){
buffer_pool->deallocate(buffer);
};
payload = zenoh::Bytes(msg_bytes, data_length, deleter);
}

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
Expand Down
76 changes: 76 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,80 @@ bool Payload::empty() const
return std::holds_alternative<Empty>(bytes_);
}

///=============================================================================
/// Construct the pool, reading the memory bound from the
/// `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES` environment variable and falling
/// back to DEFAULT_MAX_SIZE when the variable is unset, unreadable, or
/// invalid.
BufferPool::BufferPool()
: buffers_(), mutex_(), max_size_(DEFAULT_MAX_SIZE), size_(0)
{
  const char * env_value;
  const char * error_str = rcutils_get_env("RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES", &env_value);
  if (error_str != nullptr) {
    RMW_ZENOH_LOG_WARN_NAMED(
      "rmw_zenoh_cpp",
      "Unable to read maximum buffer pool size, falling back to default.");
  } else if (env_value != nullptr && strcmp(env_value, "") != 0) {
    // Use strtoll instead of atoll so that malformed or negative values can be
    // rejected explicitly; a negative value assigned to size_t would otherwise
    // silently wrap around to an enormous bound.
    char * end = nullptr;
    const long long parsed = std::strtoll(env_value, &end, 10);
    if (end == env_value || *end != '\0' || parsed < 0) {
      RMW_ZENOH_LOG_WARN_NAMED(
        "rmw_zenoh_cpp",
        "Invalid value for RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES, falling back to default.");
    } else {
      max_size_ = static_cast<size_t>(parsed);
    }
  }
}

///=============================================================================
/// Release every buffer still held by the pool using the same default
/// allocator that produced it.
BufferPool::~BufferPool()
{
  rcutils_allocator_t allocator = rcutils_get_default_allocator();

  for (auto & pooled : buffers_) {
    allocator.deallocate(pooled.data, allocator.state);
  }
}

///=============================================================================
/// Obtain a buffer of at least `size` bytes.
///
/// Returns an empty Buffer (data == nullptr) when serving the request would
/// push the pool's total accounted memory past `max_size_`, or when the
/// underlying allocation fails; callers are expected to fall back to the
/// system allocator in that case.
BufferPool::Buffer BufferPool::allocate(size_t size)
{
  std::lock_guard<std::mutex> guard(mutex_);

  rcutils_allocator_t allocator = rcutils_get_default_allocator();

  if (buffers_.empty()) {
    // No recycled buffer available: allocate a fresh one, but only if doing
    // so keeps the total pooled memory within the configured bound.
    if (size_ + size > max_size_) {
      return {};
    }
    uint8_t * data = static_cast<uint8_t *>(allocator.allocate(size, allocator.state));
    if (data == nullptr) {
      return {};
    }
    // Account for the memory only after the allocation succeeded; doing it
    // earlier would permanently inflate `size_` on allocation failure.
    size_ += size;
    return Buffer{data, size};
  }

  Buffer buffer = buffers_.back();
  buffers_.pop_back();
  if (buffer.size < size) {
    size_t size_diff = size - buffer.size;
    if (size_ + size_diff > max_size_) {
      // Growing this buffer would exceed the bound; put it back in the pool
      // instead of dropping (and thus leaking) it.
      buffers_.push_back(buffer);
      return {};
    }
    uint8_t * data = static_cast<uint8_t *>(allocator.reallocate(
        buffer.data, size, allocator.state));
    if (data == nullptr) {
      // A failed reallocation leaves the original block valid; return it to
      // the pool so it is neither leaked nor double-counted.
      buffers_.push_back(buffer);
      return {};
    }
    size_ += size_diff;
    buffer.data = data;
    buffer.size = size;
  }
  return buffer;
}

///=============================================================================
/// Return a buffer to the pool for later reuse. The pool's accounted size is
/// unchanged: the memory stays allocated and available to allocate().
void BufferPool::deallocate(BufferPool::Buffer buffer)
{
  const std::lock_guard<std::mutex> lock(mutex_);
  buffers_.emplace_back(buffer);
}

} // namespace rmw_zenoh_cpp
36 changes: 36 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@

#include <array>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <mutex>
#include <optional>
#include <utility>
#include <variant>
#include <vector>

#include "rcutils/allocator.h"
#include "rcutils/env.h"
#include "rmw/types.h"
#include "logging_macros.hpp"

namespace rmw_zenoh_cpp
{
Expand Down Expand Up @@ -92,6 +98,36 @@ class Payload
// and `zenoh::Slice` plus a `zenoh::Bytes` otherwise.
std::variant<NonContiguous, Contiguous, Empty> bytes_;
};

///=============================================================================
/// A bounded, mutex-protected pool of reusable serialization buffers.
///
/// Buffers handed out by allocate() are returned via deallocate() once the
/// payload has been consumed, so repeated publications can reuse warm memory
/// instead of hitting the system allocator. The total memory retained by the
/// pool is capped by `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES` (default 8 MiB).
class BufferPool
{
public:
  /// A pooled buffer: a raw byte array and its capacity in bytes.
  struct Buffer
  {
    uint8_t * data;
    size_t size;
  };

  /// Reads the pool size bound from the environment, falling back to
  /// DEFAULT_MAX_SIZE.
  BufferPool();

  /// Deallocates every buffer still held by the pool.
  ~BufferPool();

  /// Obtain a buffer of at least `size` bytes; returns an empty Buffer
  /// (data == nullptr) if the bound would be exceeded or allocation fails.
  Buffer allocate(size_t size);

  /// Return a previously allocated buffer to the pool for reuse.
  void deallocate(Buffer buffer);

private:
  // Idle buffers available for reuse (LIFO to favor cache-warm memory).
  std::vector<Buffer> buffers_;
  // Serializes all pool operations; publishers may share a pool.
  std::mutex mutex_;
  // Configured upper bound on total pooled bytes.
  size_t max_size_;
  // Total bytes currently accounted to the pool.
  size_t size_;
  // NOTE(fuzzypixelz): Pooled buffers are recycled with the expectation that they would reside in
  // cache, thus this value should be comparable to the size of a modern CPU cache. The default
  // value (8 MiB) is relatively conservative as CPU cache sizes range from a few MiB to a few
  // hundred MiB.
  // `static constexpr` so the constant occupies no storage in each instance.
  static constexpr size_t DEFAULT_MAX_SIZE = 8 * 1024 * 1024;
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ZENOH_UTILS_HPP_