Skip to content

Commit

Permalink
Support skipping invalid journal records in the journal replay stage (#3956)
Browse files Browse the repository at this point in the history
Co-authored-by: zhiyuanlei <[email protected]>
  • Loading branch information
hangc0276 and zhiyuanlei authored Jun 19, 2023
1 parent 2baee36 commit 5e9fdc2
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException
logPosition = markedLog.getLogFileOffset();
}
LOG.info("Replaying journal {} from position {}", id, logPosition);
long scanOffset = journal.scanJournal(id, logPosition, scanner);
long scanOffset = journal.scanJournal(id, logPosition, scanner, conf.isSkipReplayJournalInvalidRecord());
// Update LastLogMark after completely replaying journal
// scanOffset will point to EOF position
// After LedgerStorage flush, SyncThread should persist this to disk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,13 +792,14 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
/**
* Scan the journal.
*
* @param journalId Journal Log Id
* @param journalPos Offset to start scanning
* @param scanner Scanner to handle entries
* @param journalId Journal Log Id
* @param journalPos Offset to start scanning
* @param scanner Scanner to handle entries
* @param skipInvalidRecord if true, an IOException raised while scanning is logged and the
* current read position is returned instead of rethrowing the exception
* @return scanOffset - represents the byte till which journal was read
* @throws IOException
*/
public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
public long scanJournal(long journalId, long journalPos, JournalScanner scanner, boolean skipInvalidRecord)
throws IOException {
JournalChannel recLog;
if (journalPos <= 0) {
Expand Down Expand Up @@ -862,6 +863,13 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
}
}
return recLog.fc.position();
} catch (IOException e) {
if (skipInvalidRecord) {
LOG.warn("Failed to parse journal file, and skipInvalidRecord is true, skip this journal file reply");
} else {
throw e;
}
return recLog.fc.position();
} finally {
recLog.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void process(int journalVersion, long offset, ByteBuffer entry) {
LOG.info("Found ledger {} in journal", ledgerId);
}
}
});
}, false);
}

private void delete(Path path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
// Used for location index, lots of writes and much bigger dataset
protected static final String LEDGER_METADATA_ROCKSDB_CONF = "ledgerMetadataRocksdbConf";

// Configuration key: when set to true, a failure while replaying a journal is skipped
protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord";

/**
* Construct a default configuration object.
*/
Expand Down Expand Up @@ -4010,6 +4012,25 @@ public boolean isDataIntegrityStampMissingCookiesEnabled() {
return this.getBoolean(DATA_INTEGRITY_COOKIE_STAMPING_ENABLED, false);
}


/**
 * Sets whether a failure while replaying a journal should be skipped.
 *
 * <p>The flag is stored in its string form under the
 * {@code SKIP_REPLAY_JOURNAL_INVALID_RECORD} key.
 *
 * @param skipReplayJournalInvalidRecord {@code true} to skip when replaying a journal fails
 * @return this configuration object, so calls can be chained
 */
public ServerConfiguration setSkipReplayJournalInvalidRecord(boolean skipReplayJournalInvalidRecord) {
    final String flagValue = Boolean.toString(skipReplayJournalInvalidRecord);
    setProperty(SKIP_REPLAY_JOURNAL_INVALID_RECORD, flagValue);
    return this;
}

/**
 * Reports whether a failure while replaying a journal should be skipped.
 *
 * @return the value stored under {@code SKIP_REPLAY_JOURNAL_INVALID_RECORD}, read with
 *         {@code false} as the default
 * @see #setSkipReplayJournalInvalidRecord(boolean)
 */
public boolean isSkipReplayJournalInvalidRecord() {
    final boolean skipInvalidRecord = getBoolean(SKIP_REPLAY_JOURNAL_INVALID_RECORD, false);
    return skipInvalidRecord;
}

/**
* Get default rocksdb conf.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,6 @@ public void process(int journalVersion, long offset, ByteBuffer entry) throws IO
}

/**
 * Scans the journal identified by {@code journalId} from offset 0 with the given scanner.
 *
 * @param journal   journal instance to scan through
 * @param journalId id of the journal log to scan
 * @param scanner   scanner that handles the entries
 * @throws IOException if the underlying scan fails
 */
private void scanJournal(Journal journal, long journalId, Journal.JournalScanner scanner) throws IOException {
// NOTE(review): the two calls below look like the before/after lines of a diff hunk
// (3-argument call replaced by the 4-argument call) - confirm that only the
// 4-argument form is present in the real file.
journal.scanJournal(journalId, 0L, scanner);
// false is passed for the skipInvalidRecord argument.
journal.scanJournal(journalId, 0L, scanner, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,52 @@ private JournalChannel writeV4Journal(File journalDir, int numEntries, byte[] ma
return jc;
}

/**
 * Writes a V4 journal into {@code journalDir} in which one record carries an invalid
 * (negative) length prefix.
 *
 * <p>Entries {@code 0..numEntries} are written (entry 0 is the meta entry), followed by a
 * fence entry. The record at index {@code numEntries - 1} gets a length prefix of -1 to
 * mock an invalid entry being flushed to the journal file.
 *
 * @param journalDir directory in which the journal file is created
 * @param numEntries highest entry id written before the fence entry
 * @param masterKey  master key used for the meta entry
 * @return the journal channel that was written
 * @throws Exception if writing the journal fails
 */
private JournalChannel writeV4JournalWithInvalidRecord(File journalDir,
        int numEntries, byte[] masterKey) throws Exception {
    final long journalLogId = System.currentTimeMillis();
    final JournalChannel journalChannel = new JournalChannel(journalDir, journalLogId);
    moveToPosition(journalChannel, JournalChannel.VERSION_HEADER_SIZE);
    final BufferedChannel out = journalChannel.getBufferedChannel();

    final byte[] payload = new byte[1024];
    Arrays.fill(payload, (byte) 'X');

    // Index of the record whose length prefix is deliberately written as -1.
    final int invalidIndex = numEntries - 1;
    long lac = LedgerHandle.INVALID_ENTRY_ID;
    for (int entryId = 0; entryId <= numEntries; entryId++) {
        final ByteBuf record = (entryId == 0)
                ? generateMetaEntry(1, masterKey)
                : ClientUtil.generatePacket(1, entryId, lac, entryId * payload.length, payload);
        lac = entryId;

        // mock when flush data to file: the chosen record gets an invalid length prefix
        final int lengthField = (entryId == invalidIndex) ? -1 : record.readableBytes();
        final ByteBuffer lengthPrefix = ByteBuffer.allocate(4);
        lengthPrefix.putInt(lengthField);
        lengthPrefix.flip();

        out.write(Unpooled.wrappedBuffer(lengthPrefix));
        out.write(record);
        record.release();
    }

    // Append the fence entry, preceded by its length.
    final ByteBuf fenceRecord = generateFenceEntry(1);
    final ByteBuf fenceLength = Unpooled.buffer();
    fenceLength.writeInt(fenceRecord.readableBytes());
    out.write(fenceLength);
    out.write(fenceRecord);

    out.flushAndForceWrite(false);
    updateJournalVersion(journalChannel, JournalChannel.V4);

    return journalChannel;
}

static JournalChannel writeV5Journal(File journalDir, int numEntries,
byte[] masterKey) throws Exception {
return writeV5Journal(journalDir, numEntries, masterKey, false);
Expand Down Expand Up @@ -844,7 +890,7 @@ public void testJournalScanIOException() throws Exception {
assertEquals(journalIds.size(), 1);

try {
journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner);
journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner, false);
fail("Should not have been able to scan the journal");
} catch (Exception e) {
// Expected
Expand All @@ -854,7 +900,74 @@ public void testJournalScanIOException() throws Exception {
b.shutdown();
}

private class DummyJournalScan implements Journal.JournalScanner {
/**
 * Test for invalid record data during read of Journal.
 *
 * <p>A journal containing a record with an invalid length is written once and scanned twice:
 * first with {@code skipReplayJournalInvalidRecord} enabled, where the scan must complete,
 * and then with the flag left unset, where the scan must fail.
 */
@Test
public void testJournalScanInvalidRecordWithSkipFlag() throws Exception {
    File journalDir = createTempDir("bookie", "journal");
    BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));

    File ledgerDir = createTempDir("bookie", "ledger");
    BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));

    // Let any failure propagate so the test report keeps the original cause and stack trace.
    writeV4JournalWithInvalidRecord(BookieImpl.getCurrentDirectory(journalDir),
            100, "testPasswd".getBytes());

    // Case 1: skipping invalid journal records is explicitly enabled.
    ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
    conf.setJournalDirName(journalDir.getPath())
        .setLedgerDirNames(new String[] { ledgerDir.getPath() })
        .setMetadataServiceUri(null)
        .setSkipReplayJournalInvalidRecord(true);

    Journal.JournalScanner journalScanner = new DummyJournalScan();

    BookieImpl b = new TestBookieImpl(conf);
    try {
        for (Journal journal : b.journals) {
            List<Long> journalIds = Journal.listJournalIds(journal.getJournalDirectory(), null);
            assertEquals(1, journalIds.size());
            // Must not throw: with the flag enabled the scan is expected to complete.
            journal.scanJournal(journalIds.get(0), 0, journalScanner, conf.isSkipReplayJournalInvalidRecord());
        }
    } finally {
        // Always release the bookie, even when an assertion above fails.
        b.shutdown();
    }

    // Case 2: the skip flag is not set on this configuration.
    conf = TestBKConfiguration.newServerConfiguration();
    conf.setJournalDirName(journalDir.getPath())
        .setLedgerDirNames(new String[] { ledgerDir.getPath() })
        .setMetadataServiceUri(null);

    journalScanner = new DummyJournalScan();

    b = new TestBookieImpl(conf);
    try {
        for (Journal journal : b.journals) {
            List<Long> journalIds = Journal.listJournalIds(journal.getJournalDirectory(), null);
            assertEquals(1, journalIds.size());
            try {
                journal.scanJournal(journalIds.get(0), 0, journalScanner, conf.isSkipReplayJournalInvalidRecord());
                fail("Should fail the journal scanning because of disabled skip flag");
            } catch (Exception e) {
                // expected.
            }
        }
    } finally {
        b.shutdown();
    }
}


static class DummyJournalScan implements Journal.JournalScanner {

@Override
public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {
Expand Down

0 comments on commit 5e9fdc2

Please sign in to comment.