Commit 9cce45e
chore: rename unwritten -> pending
1 parent c0bb922

5 files changed: 39 additions, 41 deletions
loglog/src/std.rs
Lines changed: 1 addition & 1 deletion

@@ -179,7 +179,7 @@ impl RawClient {
             size: EntrySize(u32::expect_from(raw_entry.len())),
         };

-        // TODO: instead of copy, use `write_vectored_all` when it stabilizes
+        // TODO(perf): instead of copy, use `write_vectored_all` when it stabilizes
         // https://github.com/rust-lang/rust/issues/70436
         args.write(&mut NoSeek::new(&mut buf)).expect("can't fail");

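For context on that TODO: until `Write::write_all_vectored` (tracking issue rust-lang/rust#70436) stabilizes, avoiding the copy means looping over the stable `write_vectored` by hand. A minimal sketch of that workaround, assuming a two-buffer message of header plus payload; this helper is illustrative only and is not part of the commit:

use std::io::{self, IoSlice, Write};

// Write `header` then `payload` without first copying them into one
// contiguous buffer, retrying on short writes. `IoSlice::advance_slices`
// (stable since Rust 1.81) drops fully written slices from the front.
// Assumes at least one buffer is non-empty.
fn write_all_vectored(w: &mut impl Write, header: &[u8], payload: &[u8]) -> io::Result<()> {
    let mut bufs = [IoSlice::new(header), IoSlice::new(payload)];
    let mut slices: &mut [IoSlice<'_>] = &mut bufs;
    while !slices.is_empty() {
        match w.write_vectored(slices) {
            Ok(0) => return Err(io::ErrorKind::WriteZero.into()),
            Ok(n) => IoSlice::advance_slices(&mut slices, n),
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}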
loglogd/src/node.rs
Lines changed: 13 additions & 13 deletions

@@ -107,12 +107,12 @@ impl SealedSegments {
     }
 }
 #[derive(Debug)]
-pub struct EntriesInFlight {
+pub struct PendingEntries {
     /// Next log offset to give out to incoming entry
     next_available_log_offset: LogOffset,

     /// Entries that were already allocated but were not yet written to storage
-    unwritten: BTreeSet<LogOffset>,
+    entries: BTreeSet<LogOffset>,
 }

 #[derive(Error, Debug)]
@@ -141,7 +141,7 @@ pub struct NodeShared {
     entry_buffer_pool: Mutex<Vec<Vec<u8>>>,

     /// Entries already received and allocated, but not yet written
-    entries_in_flight: RwLock<EntriesInFlight>,
+    pending_entries: RwLock<PendingEntries>,

     /// Segment currently being written to
     open_segments: RwLock<OpenSegments>,
@@ -192,10 +192,10 @@ impl NodeShared {

     /// Allocate a space in the event stream and return allocation id
     pub fn allocate_new_entry(self: &Arc<Self>, len: EntrySize) -> AllocationId {
-        let mut write_in_flight = self.entries_in_flight.write().expect("locking failed");
+        let mut write_in_flight = self.pending_entries.write().expect("locking failed");

         let offset = write_in_flight.next_available_log_offset;
-        let was_inserted = write_in_flight.unwritten.insert(offset);
+        let was_inserted = write_in_flight.entries.insert(offset);
         debug_assert!(was_inserted);
         let alloc = AllocationId {
             offset,
@@ -210,24 +210,24 @@ impl NodeShared {
         alloc
     }

-    pub fn get_first_unwritten_log_offset(&self) -> LogOffset {
-        let read_in_flight = self.entries_in_flight.read().expect("Locking failed");
+    pub fn get_first_pending_log_offset(&self) -> LogOffset {
+        let read_in_flight = self.pending_entries.read().expect("Locking failed");
         read_in_flight
-            .unwritten
+            .entries
             .iter()
             .next()
             .copied()
             .unwrap_or(read_in_flight.next_available_log_offset)
     }

-    pub fn get_first_unwritten_entry_offset_ge(
+    pub fn get_first_pending_entry_offset_ge(
         &self,
         offset_inclusive: LogOffset,
     ) -> Option<LogOffset> {
-        self.entries_in_flight
+        self.pending_entries
             .read()
             .expect("Locking failed")
-            .unwritten
+            .entries
             .range(offset_inclusive..)
             .next()
             .copied()
@@ -335,12 +335,12 @@ impl Node {
             current_term: TermId(0),
             entry_buffer_pool: Mutex::new(vec![]),
             params: params.clone(),
-            entries_in_flight: RwLock::new(EntriesInFlight {
+            pending_entries: RwLock::new(PendingEntries {
                 next_available_log_offset: segments
                     .last()
                     .map(|s| s.content_meta.end_log_offset)
                     .unwrap_or_default(),
-                unwritten: BTreeSet::new(),
+                entries: BTreeSet::new(),
             }),
             sealed_segments: RwLock::new(SealedSegments::from_iter(segments)),
             open_segments: RwLock::new(OpenSegments {

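The rename is easiest to follow with the data structure in isolation. Below is a self-contained model of the bookkeeping above; it is a sketch only, using plain `u64` in place of `LogOffset` and skipping the `RwLock`, but it mirrors how the ordered `BTreeSet` yields the first pending offset:

use std::collections::BTreeSet;

// Simplified model of the renamed `PendingEntries` struct.
struct PendingEntries {
    next_available_log_offset: u64,
    entries: BTreeSet<u64>,
}

impl PendingEntries {
    // Reserve space for an entry; its offset stays "pending" until written.
    fn allocate(&mut self, len: u64) -> u64 {
        let offset = self.next_available_log_offset;
        self.next_available_log_offset += len;
        let was_inserted = self.entries.insert(offset);
        debug_assert!(was_inserted);
        offset
    }

    // The smallest pending offset, or the next free offset if nothing is
    // pending: everything below this point has reached storage.
    fn first_pending_log_offset(&self) -> u64 {
        self.entries
            .iter()
            .next()
            .copied()
            .unwrap_or(self.next_available_log_offset)
    }

    // Mark an allocated entry as written, removing it from the pending set.
    fn mark_written(&mut self, offset: u64) {
        let was_removed = self.entries.remove(&offset);
        debug_assert!(was_removed);
    }
}

fn main() {
    let mut pending = PendingEntries {
        next_available_log_offset: 0,
        entries: BTreeSet::new(),
    };
    let a = pending.allocate(10);
    let b = pending.allocate(20);
    pending.mark_written(b); // writes may complete out of order
    assert_eq!(pending.first_pending_log_offset(), a); // `a` still blocks progress
}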
loglogd/src/node/request_handler.rs
Lines changed: 1 addition & 1 deletion

@@ -413,7 +413,7 @@ impl RequestHandlerInner {

         let stream_fd = stream.as_raw_fd();
         let bytes_written = tokio::task::spawn_blocking(move || -> io::Result<u64> {
-            // TODO(perf): should we cache these somewhere in some LRU or something?
+            // TODO(perf): we should cache FDs to open sealed files somewhere in some LRU
             let file = std::fs::File::open(segment.file_meta.path())?;
             let file_fd = file.as_raw_fd();
             send_file_to_stream(file_fd, file_offset, stream_fd, bytes_to_send)

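The updated TODO suggests caching open FDs for sealed segment files, which are immutable, so reusing a handle is safe and saves an open(2) per read. One plausible shape, purely illustrative (no `FdCache` exists in this codebase), is a tiny LRU keyed by path:

use std::collections::VecDeque;
use std::fs::File;
use std::io;
use std::path::{Path, PathBuf};

// Tiny LRU of open file handles; the linear scan is fine for the small
// capacities this would need.
struct FdCache {
    capacity: usize,
    entries: VecDeque<(PathBuf, File)>, // front = most recently used
}

impl FdCache {
    fn new(capacity: usize) -> Self {
        Self { capacity, entries: VecDeque::new() }
    }

    // Return a cached handle for `path`, opening (and caching) it on a miss.
    fn get(&mut self, path: &Path) -> io::Result<&File> {
        if let Some(pos) = self.entries.iter().position(|(p, _)| p == path) {
            let hit = self.entries.remove(pos).expect("position is valid");
            self.entries.push_front(hit);
        } else {
            let file = File::open(path)?;
            self.entries.push_front((path.to_path_buf(), file));
            self.entries.truncate(self.capacity); // evict least recently used
        }
        Ok(&self.entries.front().expect("just inserted").1)
    }
}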
loglogd/src/node/segment_sealer.rs
Lines changed: 18 additions & 17 deletions

@@ -36,10 +36,10 @@ impl SegmentSealer {
                 let _last_written_entry_log_offset =
                     last_written_entry_log_offset_rx.wait_timeout(Duration::from_secs(1));

-                let first_unwritten_log_offset = shared.get_first_unwritten_log_offset();
+                let first_pending_log_offset = shared.get_first_pending_log_offset();

                 // Termination condition. We sealed all the pending data and...
-                if fsynced_log_offset == first_unwritten_log_offset {
+                if fsynced_log_offset == first_pending_log_offset {
                     // if we're done with all the pending work, and the writting loop
                     // is done too, we can finish.
                     if shared.is_segment_writer_done.load(Ordering::SeqCst) {
@@ -48,19 +48,19 @@ impl SegmentSealer {
                     continue;
                 }

-                Self::seal_segments_up_to(&shared, first_unwritten_log_offset);
+                Self::seal_segments_up_to(&shared, first_pending_log_offset);

                 // Only after successfully looping and fsyncing and/or closing all matching open segments
                 // we update `last_fsynced_log_offset`
-                fsynced_log_offset = first_unwritten_log_offset;
-                shared.update_fsynced_log_offset(first_unwritten_log_offset);
+                fsynced_log_offset = first_pending_log_offset;
+                shared.update_fsynced_log_offset(first_pending_log_offset);
             }
         }),
     }
 }

-    fn seal_segments_up_to(shared: &Arc<NodeShared>, first_unwritten_log_offset: LogOffset) {
-        'inner: loop {
+    fn seal_segments_up_to(shared: &Arc<NodeShared>, first_pending_log_offset: LogOffset) {
+        loop {
             let (first_segment, second_segment_start) = {
                 let read_open_segments = shared.open_segments.read().expect("Locking failed");
@@ -75,30 +75,31 @@ impl SegmentSealer {
             };

             let Some((open_segment_start, open_segment)) = first_segment else {
-                break 'inner;
+                return;
             };

             debug!(
                 segment_id = %open_segment.id,
                 segment_start_log_offset = %open_segment_start,
-                first_unwritten_log_offset = %first_unwritten_log_offset,
+                first_unwritten_log_offset = %first_pending_log_offset,
                 "fsync"
             );
             if let Err(e) = nix::unistd::fsync(open_segment.fd.as_raw_fd()) {
                 warn!(error = %e, "Could not fsync opened segment file");
-                break 'inner;
+                return;
             }

             let Some(open_segment_end) = second_segment_start else {
-                // Until we have at lest two open segments, we won't close the first one.
-                // It's not a big deal, and avoids having to calculate the end of first
-                // segment.
-                break 'inner
+                // Until we have the second open segments, we do not know what is the end
+                // of the first one.
+                // It's not a big deal, we can wait. Nothing depends on it and we will
+                // happily serve data from open segments to the readers.
+                return;
             };

             // We can't seal a segment if there are still any pending writes to it.
-            if first_unwritten_log_offset < open_segment_end {
-                break 'inner;
+            if first_pending_log_offset < open_segment_end {
+                return;
             }

             let open_segment_offset_size = open_segment_end - open_segment_start;
@@ -129,7 +130,7 @@ impl SegmentSealer {
                 debug_assert!(removed.is_some());
             }

-            // continue 'inner - there might be more
+            // continue - there might be more
         }
     }
 }

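After the rename the sealer reads as a plain `loop` with early `return`s. A condensed sketch of the sealing decision it makes (stand-in types; locking, fsync, and metadata elided; not repo code):

struct Segment {
    start_log_offset: u64,
}

// Seal leading open segments, oldest first. A segment's end is only known
// once a second open segment exists, and it may only be sealed when no
// pending (allocated-but-unwritten) entry falls before that end.
fn seal_segments_up_to(open_segments: &mut Vec<Segment>, first_pending_log_offset: u64) {
    loop {
        // Without a second segment we do not know where the first one ends;
        // nothing depends on sealing it right away, so just wait.
        let Some(second) = open_segments.get(1) else {
            return;
        };
        let first_segment_end = second.start_log_offset;

        // Can't seal while writes are still pending inside the first segment.
        if first_pending_log_offset < first_segment_end {
            return;
        }

        // The real code fsyncs the file and moves it to the sealed set here.
        let _sealed = open_segments.remove(0);
        // Loop again: more than one segment may have become sealable.
    }
}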
loglogd/src/node/segment_writer.rs
Lines changed: 6 additions & 9 deletions

@@ -177,13 +177,13 @@ impl WriteLoopInner {
         let next_segment_start_log_offset = if let Some(last_segment_info) = last_segment_info {
             // It would be a mistake to consider current `entry_log_offset` as a beginning of
             // next segment as we might have arrived here out of order. Instead - we can use
-            // `unwritten` to find first unwritten entry for new segment, and thus its starting
+            // `pending` to find first not yet written entry for new segment, and thus its starting
             // byte.
             self.shared
-                .get_first_unwritten_entry_offset_ge(
+                .get_first_pending_entry_offset_ge(
                     last_segment_info.end_of_allocation_log_offset,
                 )
-                .expect("at very least this entry should be in `unwritten`")
+                .expect("at very least this entry should be in `pending`")
         } else {
             self.shared
                 .get_sealed_segments_end_log_offset()
@@ -244,12 +244,9 @@ impl WriteLoopInner {

     /// Mark the entry as written to disk (thought possibly not yet fsynced)
     pub fn mark_entry_written_no_notify(self: &Arc<Self>, log_offset: LogOffset) {
-        let mut write_in_flight = self
-            .shared
-            .entries_in_flight
-            .write()
-            .expect("Locking failed");
-        let was_removed = write_in_flight.unwritten.remove(&log_offset);
+        let mut pending_entries_write =
+            self.shared.pending_entries.write().expect("Locking failed");
+        let was_removed = pending_entries_write.entries.remove(&log_offset);
         debug_assert!(was_removed);
     }

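Finally, the `_ge` lookup the write loop relies on is just a `BTreeSet` range scan: `range(from..)` walks offsets at or above `from` in order, so `.next()` is the first pending entry at or after a segment boundary. A tiny standalone example of those semantics (not repo code):

use std::collections::BTreeSet;

// First pending offset greater than or equal to `offset_inclusive`, if any.
fn first_pending_ge(entries: &BTreeSet<u64>, offset_inclusive: u64) -> Option<u64> {
    entries.range(offset_inclusive..).next().copied()
}

fn main() {
    let entries: BTreeSet<u64> = [10, 40, 90].into_iter().collect();
    assert_eq!(first_pending_ge(&entries, 40), Some(40));
    assert_eq!(first_pending_ge(&entries, 41), Some(90));
    assert_eq!(first_pending_ge(&entries, 91), None);
}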