Skip to content

Commit

Permalink
Make translog fsync configurable (#108242)
Browse files Browse the repository at this point in the history
  • Loading branch information
idegtiarenko committed May 6, 2024
1 parent 8449794 commit 0d5b80a
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,14 @@ public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint
}
}

public static void write(FileChannel fileChannel, Path checkpointFile, Checkpoint checkpoint) throws IOException {
public static void write(FileChannel fileChannel, Path checkpointFile, Checkpoint checkpoint, boolean fsync) throws IOException {
byte[] bytes = createCheckpointBytes(checkpointFile, checkpoint);
Channels.writeToChannel(bytes, fileChannel, 0);
// no need to force metadata, file size stays the same and we did the full fsync
// when we first created the file, so the directory entry doesn't change as well
fileChannel.force(false);
if (fsync) {
fileChannel.force(false);
}
}

private static byte[] createCheckpointBytes(Path checkpointFile, Checkpoint checkpoint) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ public Translog(
this.operationListener = config.getOperationListener();
this.deletionPolicy = deletionPolicy;
this.translogUUID = translogUUID;
bigArrays = config.getBigArrays();
diskIoBufferPool = config.getDiskIoBufferPool();
this.bigArrays = config.getBigArrays();
this.diskIoBufferPool = config.getDiskIoBufferPool();
ReadWriteLock rwl = new ReentrantReadWriteLock();
readLock = new ReleasableLock(rwl.readLock());
writeLock = new ReleasableLock(rwl.writeLock());
this.readLock = new ReleasableLock(rwl.readLock());
this.writeLock = new ReleasableLock(rwl.writeLock());
this.location = config.getTranslogPath();
Files.createDirectories(this.location);

Expand Down Expand Up @@ -556,7 +556,8 @@ TranslogWriter createWriter(
persistedSequenceNumberConsumer,
bigArrays,
diskIoBufferPool,
operationListener
operationListener,
config.fsync()
);
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
Expand Down Expand Up @@ -1929,7 +1930,8 @@ public static String createEmptyTranslog(
},
BigArrays.NON_RECYCLING_INSTANCE,
DiskIoBufferPool.INSTANCE,
(d, s, l) -> {}
TranslogConfig.NOOP_OPERATION_LISTENER,
true
);
writer.close();
return uuid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ public final class TranslogConfig {

public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
public static final ByteSizeValue EMPTY_TRANSLOG_BUFFER_SIZE = ByteSizeValue.ofBytes(10);
private final BigArrays bigArrays;
private final DiskIoBufferPool diskIoBufferPool;
private final IndexSettings indexSettings;
public static final OperationListener NOOP_OPERATION_LISTENER = (d, s, l) -> {};

private final ShardId shardId;
private final Path translogPath;
private final IndexSettings indexSettings;
private final BigArrays bigArrays;
private final ByteSizeValue bufferSize;
private final DiskIoBufferPool diskIoBufferPool;
private final OperationListener operationListener;
private final boolean fsync;

/**
* Creates a new TranslogConfig instance
Expand All @@ -42,18 +45,28 @@ public final class TranslogConfig {
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
*/
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays) {
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, DiskIoBufferPool.INSTANCE);
this(
shardId,
translogPath,
indexSettings,
bigArrays,
DEFAULT_BUFFER_SIZE,
DiskIoBufferPool.INSTANCE,
NOOP_OPERATION_LISTENER,
true
);
}

TranslogConfig(
public TranslogConfig(
ShardId shardId,
Path translogPath,
IndexSettings indexSettings,
BigArrays bigArrays,
ByteSizeValue bufferSize,
DiskIoBufferPool diskIoBufferPool
DiskIoBufferPool diskIoBufferPool,
OperationListener operationListener
) {
this(shardId, translogPath, indexSettings, bigArrays, bufferSize, diskIoBufferPool, (d, s, l) -> {});
this(shardId, translogPath, indexSettings, bigArrays, bufferSize, diskIoBufferPool, operationListener, true);
}

public TranslogConfig(
Expand All @@ -63,7 +76,8 @@ public TranslogConfig(
BigArrays bigArrays,
ByteSizeValue bufferSize,
DiskIoBufferPool diskIoBufferPool,
OperationListener operationListener
OperationListener operationListener,
boolean fsync
) {
this.bufferSize = bufferSize;
this.indexSettings = indexSettings;
Expand All @@ -72,6 +86,7 @@ public TranslogConfig(
this.bigArrays = bigArrays;
this.diskIoBufferPool = diskIoBufferPool;
this.operationListener = operationListener;
this.fsync = fsync;
}

/**
Expand Down Expand Up @@ -120,4 +135,11 @@ public DiskIoBufferPool getDiskIoBufferPool() {
public OperationListener getOperationListener() {
return operationListener;
}

/**
* @return true if translog writes need to be followed by fsync
*/
public boolean fsync() {
return fsync;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil
/**
* Writes this header with the latest format into the file channel
*/
void write(final FileChannel channel) throws IOException {
void write(final FileChannel channel, boolean fsync) throws IOException {
final byte[] buffer = Arrays.copyOf(TRANSLOG_HEADER, headerSizeInBytes);
// Write uuid and leave 4 bytes for its length
final int uuidOffset = TRANSLOG_HEADER.length + Integer.BYTES;
Expand All @@ -183,7 +183,9 @@ void write(final FileChannel channel) throws IOException {
// Checksum header
ByteUtils.writeIntBE((int) crc32.getValue(), buffer, offset);
Channels.writeToChannel(buffer, channel);
channel.force(true);
if (fsync) {
channel.force(true);
}
assert channel.position() == headerSizeInBytes
: "Header is not fully written; header size [" + headerSizeInBytes + "], channel position [" + channel.position() + "]";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// callback that's called whenever an operation with a given sequence number is successfully persisted.
private final LongConsumer persistedSequenceNumberConsumer;
private final OperationListener operationListener;
private final boolean fsync;

protected final AtomicBoolean closed = new AtomicBoolean(false);
// lock order try(Releasable lock = writeLock.acquire()) -> synchronized(this)
Expand All @@ -91,21 +92,22 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
record LastModifiedTimeCache(long lastModifiedTime, long totalOffset, long syncedOffset) {}

private TranslogWriter(
final ShardId shardId,
final Checkpoint initialCheckpoint,
final FileChannel channel,
final FileChannel checkpointChannel,
final Path path,
final Path checkpointPath,
final ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier,
ShardId shardId,
Checkpoint initialCheckpoint,
FileChannel channel,
FileChannel checkpointChannel,
Path path,
Path checkpointPath,
ByteSizeValue bufferSize,
LongSupplier globalCheckpointSupplier,
LongSupplier minTranslogGenerationSupplier,
TranslogHeader header,
final TragicExceptionHolder tragedy,
final LongConsumer persistedSequenceNumberConsumer,
final BigArrays bigArrays,
final DiskIoBufferPool diskIoBufferPool,
final OperationListener operationListener
TragicExceptionHolder tragedy,
LongConsumer persistedSequenceNumberConsumer,
BigArrays bigArrays,
DiskIoBufferPool diskIoBufferPool,
OperationListener operationListener,
boolean fsync
) throws IOException {
super(initialCheckpoint.generation, channel, path, header);
assert initialCheckpoint.offset == channel.position()
Expand Down Expand Up @@ -133,6 +135,7 @@ private TranslogWriter(
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
this.tragedy = tragedy;
this.operationListener = operationListener;
this.fsync = fsync;
this.lastModifiedTimeCache = new LastModifiedTimeCache(-1, -1, -1);
}

Expand All @@ -143,17 +146,17 @@ public static TranslogWriter create(
Path file,
ChannelFactory channelFactory,
ByteSizeValue bufferSize,
final long initialMinTranslogGen,
long initialMinTranslogGen,
long initialGlobalCheckpoint,
final LongSupplier globalCheckpointSupplier,
final LongSupplier minTranslogGenerationSupplier,
final long primaryTerm,
LongSupplier globalCheckpointSupplier,
LongSupplier minTranslogGenerationSupplier,
long primaryTerm,
TragicExceptionHolder tragedy,
final LongConsumer persistedSequenceNumberConsumer,
final BigArrays bigArrays,
LongConsumer persistedSequenceNumberConsumer,
BigArrays bigArrays,
DiskIoBufferPool diskIoBufferPool,
final OperationListener operationListener

OperationListener operationListener,
boolean fsync
) throws IOException {
final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME);

Expand All @@ -162,14 +165,14 @@ public static TranslogWriter create(
try {
checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE);
final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm);
header.write(channel);
header.write(channel, fsync);
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(
header.sizeInBytes(),
fileGeneration,
initialGlobalCheckpoint,
initialMinTranslogGen
);
Checkpoint.write(checkpointChannel, checkpointFile, checkpoint);
Checkpoint.write(checkpointChannel, checkpointFile, checkpoint, fsync);
final LongSupplier writerGlobalCheckpointSupplier;
if (Assertions.ENABLED) {
writerGlobalCheckpointSupplier = () -> {
Expand All @@ -196,7 +199,8 @@ public static TranslogWriter create(
persistedSequenceNumberConsumer,
bigArrays,
diskIoBufferPool,
operationListener
operationListener,
fsync
);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
Expand Down Expand Up @@ -504,10 +508,10 @@ && syncNeeded()) {
// we can continue writing to the buffer etc.
try {
assert lastSyncedCheckpoint.offset != checkpointToSync.offset || toWrite.length() == 0;
if (lastSyncedCheckpoint.offset != checkpointToSync.offset) {
if (lastSyncedCheckpoint.offset != checkpointToSync.offset && fsync) {
channel.force(false);
}
Checkpoint.write(checkpointChannel, checkpointPath, checkpointToSync);
Checkpoint.write(checkpointChannel, checkpointPath, checkpointToSync, fsync);
} catch (final Exception ex) {
closeWithTragicEvent(ex);
throw ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private static void writeEmptyCheckpoint(Path filename, int translogLength, long
private static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException {
try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) {
TranslogHeader header = new TranslogHeader(translogUUID, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
header.write(fc);
header.write(fc, true);
return header.sizeInBytes();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter() thr
seqNo -> {},
BigArrays.NON_RECYCLING_INSTANCE,
TranslogTests.RANDOMIZING_IO_BUFFERS,
(d, s, l) -> {}
TranslogConfig.NOOP_OPERATION_LISTENER,
true
);
writer = Mockito.spy(writer);
byte[] bytes = new byte[4];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void testCurrentHeaderVersion() throws Exception {
final long generation = randomNonNegativeLong();
final Path translogFile = createTempDir().resolve(Translog.getFilename(generation));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
outHeader.write(channel);
outHeader.write(channel, true);
assertThat(outHeader.sizeInBytes(), equalTo((int) channel.position()));
}
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
Expand Down Expand Up @@ -83,7 +83,7 @@ public void testCorruptTranslogHeader() throws Exception {
final Path translogLocation = createTempDir();
final Path translogFile = translogLocation.resolve(Translog.getFilename(generation));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
outHeader.write(channel);
outHeader.write(channel, true);
assertThat(outHeader.sizeInBytes(), equalTo((int) channel.position()));
}
TestTranslog.corruptFile(logger, random(), translogFile, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting
NON_RECYCLING_INSTANCE,
bufferSize,
randomBoolean() ? DiskIoBufferPool.INSTANCE : RANDOMIZING_IO_BUFFERS,
Objects.requireNonNullElse(listener, (d, s, l) -> {})
Objects.requireNonNullElse(listener, (d, s, l) -> {}),
true
);
}

Expand Down Expand Up @@ -1390,7 +1391,9 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB),
randomBoolean() ? DiskIoBufferPool.INSTANCE : RANDOMIZING_IO_BUFFERS
randomBoolean() ? DiskIoBufferPool.INSTANCE : RANDOMIZING_IO_BUFFERS,
TranslogConfig.NOOP_OPERATION_LISTENER,
true
);

final Set<Long> persistedSeqNos = new HashSet<>();
Expand Down Expand Up @@ -3996,4 +3999,50 @@ static boolean hasCircularReference(Exception cause) {
}
return false;
}

public void testDisabledFsync() throws IOException {
var config = new TranslogConfig(
shardId,
translogDir,
IndexSettingsModule.newIndexSettings(shardId.getIndex(), Settings.EMPTY),
NON_RECYCLING_INSTANCE,
new ByteSizeValue(1, ByteSizeUnit.KB),
randomBoolean() ? DiskIoBufferPool.INSTANCE : RANDOMIZING_IO_BUFFERS,
TranslogConfig.NOOP_OPERATION_LISTENER,
false
);
var translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
primaryTerm.get()
);

try (
var translog = new Translog(
config,
translogUUID,
new TranslogDeletionPolicy(),
() -> SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm::get,
getPersistedSeqNoConsumer()
) {
@Override
ChannelFactory getChannelFactory() {
return (file, openOption) -> new FilterFileChannel(FileChannel.open(file, openOption)) {
@Override
public void force(boolean metaData) {
throw new AssertionError("fsync should be disabled");
}
};
}
}
) {
if (randomBoolean()) {
translog.rollGeneration();
}
var location = translog.add(indexOp(randomUUID(), 1, primaryTerm.get(), "source"));
assertTrue("sync needs to happen", translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO));
}
}
}

0 comments on commit 0d5b80a

Please sign in to comment.