Skip to content

Commit

Permalink
Add padding before timestamp size record if it doesn't fit into a block.
Browse files Browse the repository at this point in the history
  • Loading branch information
andlr committed May 3, 2024
1 parent ed01bab commit 9e74314
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
25 changes: 25 additions & 0 deletions db/log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,31 @@ TEST_P(LogTest, RecycleWithTimestampSize) {
ASSERT_EQ("EOF", Read());
}

// Validates that `MaybeAddUserDefinedTimestampSizeRecord` adds padding to the
// tail of a block and switches to a new block, if there's not enough space for
// the record.
TEST_P(LogTest, TimestampSizeRecordPadding) {
// Header size depends on whether the recyclable log format is used
// (first test parameter selects recyclable vs. legacy format).
bool recyclable_log = (std::get<0>(GetParam()) != 0);
const size_t header_size =
recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
// Size the first record so that after it is written only `header_size`
// bytes remain in the block — too small to also hold the timestamp-size
// record's header plus payload, which should force padding + block switch.
const size_t data_len = kBlockSize - 2 * header_size;

const auto first_str = BigString("foo", data_len);
Write(first_str);

// Column family 2 uses an 8-byte user-defined timestamp.
UnorderedMap<uint32_t, size_t> ts_sz = {
{2, sizeof(uint64_t)},
};
writer_->MaybeAddUserDefinedTimestampSizeRecord(WriteOptions(), ts_sz);
// The writer should have padded out the old block and emitted the record
// at the start of a fresh block, so the offset is now strictly inside
// (not at the end of) a block.
ASSERT_LT(writer_->TEST_block_offset(), kBlockSize);

const auto second_str = BigString("bar", 1000);
Write(second_str);

// Read side: first record comes back as-is; the second record must be
// preceded by the timestamp-size record we injected above.
ASSERT_EQ(first_str, Read());
CheckRecordAndTimestampSize(second_str, ts_sz);
}

// Do NOT enable compression for this instantiation.
INSTANTIATE_TEST_CASE_P(
Log, LogTest,
Expand Down
36 changes: 26 additions & 10 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
block_offset_(0),
log_number_(log_number),
recycle_log_files_(recycle_log_files),
// Header size varies depending on whether we are recycling or not.
header_size_(recycle_log_files ? kRecyclableHeaderSize : kHeaderSize),
manual_flush_(manual_flush),
compression_type_(compression_type),
compress_(nullptr) {
Expand Down Expand Up @@ -80,10 +82,6 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
const char* ptr = slice.data();
size_t left = slice.size();

// Header size varies depending on whether we are recycling or not.
const int header_size =
recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize;

// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Expand All @@ -102,12 +100,12 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
do {
const int64_t leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < header_size) {
if (leftover < header_size_) {
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize and
// kRecyclableHeaderSize being <= 11)
assert(header_size <= 11);
assert(header_size_ <= 11);
s = dest_->Append(opts,
Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
static_cast<size_t>(leftover)),
Expand All @@ -120,9 +118,9 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
}

// Invariant: we never leave < header_size bytes in a block.
assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size);
assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size_);

const size_t avail = kBlockSize - block_offset_ - header_size;
const size_t avail = kBlockSize - block_offset_ - header_size_;

// Compress the record if compression is enabled.
// Compress() is called at least once (compress_start=true) and after the
Expand Down Expand Up @@ -203,8 +201,7 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) {
}
}
// Initialize fields required for compression
const size_t max_output_buffer_len =
kBlockSize - (recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize);
const size_t max_output_buffer_len = kBlockSize - header_size_;
CompressionOptions opts;
constexpr uint32_t compression_format_version = 2;
compress_ = StreamingCompress::Create(compression_type_, opts,
Expand Down Expand Up @@ -244,6 +241,25 @@ IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
record.EncodeTo(&encoded);
RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType
: kUserDefinedTimestampSizeType;

// If there's not enough space for this record, switch to a new block.
const int64_t leftover = kBlockSize - block_offset_;
if (leftover < header_size_ + (int)encoded.size()) {
IOOptions opts;
IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts);
if (!s.ok()) {
return s;
}

std::vector<char> trailer(leftover, '\x00');
s = dest_->Append(opts, Slice(trailer.data(), trailer.size()));
if (!s.ok()) {
return s;
}

block_offset_ = 0;
}

return EmitPhysicalRecord(write_options, type, encoded.data(),
encoded.size());
}
Expand Down
3 changes: 3 additions & 0 deletions db/log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,14 @@ class Writer {

bool BufferIsEmpty();

size_t TEST_block_offset() const { return block_offset_; }

private:
// Destination file; owned by the writer.
std::unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
uint64_t log_number_; // Number of the log file being written
bool recycle_log_files_; // Whether the recyclable record format is used
// Per-record header size: kRecyclableHeaderSize when recycling log files,
// kHeaderSize otherwise; fixed for the lifetime of the writer.
int header_size_;

// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
Expand Down

0 comments on commit 9e74314

Please sign in to comment.