Skip to content

Commit

Permalink
[fix] Fix data lost after when writing ledger and deleting legder exe…
Browse files Browse the repository at this point in the history
…cute concurrency (#4462)

### Motivation

| step | `BK client 1` | `BK client 2` |
| --- | --- | --- |
| 1 | create ledger `1` | 
| 2 | | open ledger `1` |
| 3 | | delete ledger `1` |
| 4 | write data to ledger `1` |

At the step `4`, the write should fail, but it succeeds. It leads users to assume the data has been written, but it can not be read.

You can reproduce the issue by `testWriteAfterDeleted`

There is a scenario that will lead to Pulsar loss messages

- `broker-2` created a ledger
- `broker-2`'s ZK session is expired, which will lead the topic it owned to be assigned to other brokers
- `broker-0` owned the topic again
  - it will delete the last empty ledger
- consumers connected to `broker-0`
- producers connected to `broker-2`
  - send messages to the topic
- on `broker-2`, the ledger can not be closed due to the ledger metadata has been deleted 

### Changes

Once the ledger is fenced, it can not be wrote anymore.
  • Loading branch information
poorbarcode committed Sep 13, 2024
1 parent 32115b6 commit 47ef48e
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
masterKeyCache.put(ledgerId, masterKey);

// Force to re-insert the master key in ledger storage
handles.getHandle(ledgerId, masterKey);
handles.getHandle(ledgerId, masterKey, true);
} else {
throw new IOException("Invalid journal. Contains journalKey "
+ " but layout version (" + journalVersion
Expand All @@ -555,7 +555,7 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
LedgerDescriptor handle = handles.getHandle(ledgerId, key);
LedgerDescriptor handle = handles.getHandle(ledgerId, key, true);
handle.setFenced();
} else {
throw new IOException("Invalid journal. Contains fenceKey "
Expand All @@ -573,7 +573,7 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
LedgerDescriptor handle = handles.getHandle(ledgerId, key);
LedgerDescriptor handle = handles.getHandle(ledgerId, key, true);
handle.setExplicitLac(explicitLacBuf);
} else {
throw new IOException("Invalid journal. Contains explicitLAC " + " but layout version ("
Expand All @@ -596,7 +596,7 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
LedgerDescriptor handle = handles.getHandle(ledgerId, key);
LedgerDescriptor handle = handles.getHandle(ledgerId, key, true);

recBuff.rewind();
handle.addEntry(Unpooled.wrappedBuffer(recBuff));
Expand Down Expand Up @@ -933,7 +933,7 @@ LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
throws IOException, BookieException {
final long ledgerId = entry.getLong(entry.readerIndex());

return handles.getHandle(ledgerId, masterKey);
return handles.getHandle(ledgerId, masterKey, false);
}

private Journal getJournal(long ledgerId) {
Expand Down Expand Up @@ -1042,7 +1042,7 @@ public void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ct
ByteBuf explicitLACEntry = null;
try {
long ledgerId = entry.getLong(entry.readerIndex());
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey, false);
synchronized (handle) {
entry.markReaderIndex();
handle.setExplicitLac(entry);
Expand Down Expand Up @@ -1131,7 +1131,7 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj
*/
public CompletableFuture<Boolean> fenceLedger(long ledgerId, byte[] masterKey)
throws IOException, BookieException {
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey, false);
return handle.fenceAndLogInJournal(getJournal(ledgerId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.io.IOException;

interface HandleFactory {
LedgerDescriptor getHandle(long ledgerId, byte[] masterKey)
LedgerDescriptor getHandle(long ledgerId, byte[] masterKey, boolean journalReplay)
throws IOException, BookieException;

LedgerDescriptor getReadOnlyHandle(long ledgerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,25 @@

package org.apache.bookkeeper.bookie;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.time.Duration;
import org.apache.bookkeeper.bookie.LedgerStorage.LedgerDeletionListener;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;

class HandleFactoryImpl implements HandleFactory, LedgerDeletionListener {
private final ConcurrentLongHashMap<LedgerDescriptor> ledgers;
private final ConcurrentLongHashMap<LedgerDescriptor> readOnlyLedgers;

/**
* Once the ledger was marked "fenced" before, the ledger was accessed by multi clients. One client is calling
* "delete" now, and other clients may call "write" continuously later. We mark these ledgers can not be written
* anymore. And maintains the state for 7 days is safety.
*/
private final Cache<Long, Boolean> recentlyFencedAndDeletedLedgers = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofDays(7)).build();

final LedgerStorage ledgerStorage;

HandleFactoryImpl(LedgerStorage ledgerStorage) {
Expand All @@ -40,10 +51,14 @@ class HandleFactoryImpl implements HandleFactory, LedgerDeletionListener {
}

@Override
public LedgerDescriptor getHandle(final long ledgerId, final byte[] masterKey) throws IOException, BookieException {
public LedgerDescriptor getHandle(final long ledgerId, final byte[] masterKey, boolean journalReplay)
throws IOException, BookieException {
LedgerDescriptor handle = ledgers.get(ledgerId);

if (handle == null) {
if (!journalReplay && recentlyFencedAndDeletedLedgers.getIfPresent(ledgerId) != null) {
throw BookieException.create(BookieException.Code.LedgerFencedException);
}
handle = LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage);
ledgers.putIfAbsent(ledgerId, handle);
}
Expand All @@ -64,8 +79,22 @@ public LedgerDescriptor getReadOnlyHandle(final long ledgerId) throws IOExceptio
return handle;
}

private void markIfConflictWritingOccurs(long ledgerId) {
LedgerDescriptor ledgerDescriptor = ledgers.get(ledgerId);
try {
if (ledgerDescriptor != null && ledgerDescriptor.isFenced()) {
recentlyFencedAndDeletedLedgers.put(ledgerId, true);
}
} catch (IOException | BookieException ex) {
// The ledger is in limbo state.
recentlyFencedAndDeletedLedgers.put(ledgerId, true);
}
}

@Override
public void ledgerDeleted(long ledgerId) {
markIfConflictWritingOccurs(ledgerId);
// Do delete.
ledgers.remove(ledgerId);
readOnlyLedgers.remove(ledgerId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ public void testV4Journal() throws Exception {
} catch (Bookie.NoEntryException e) {
// correct behaviour
}
assertTrue(b.handles.getHandle(1, "testPasswd".getBytes()).isFenced());
assertTrue(b.handles.getHandle(1, "testPasswd".getBytes(), false).isFenced());

b.shutdown();
}
Expand Down Expand Up @@ -484,7 +484,7 @@ public void testV5Journal() throws Exception {
} catch (Bookie.NoEntryException e) {
// correct behavior
}
assertTrue(b.handles.getHandle(1, "testV5Journal".getBytes()).isFenced());
assertTrue(b.handles.getHandle(1, "testV5Journal".getBytes(), false).isFenced());

b.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,18 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.awaitility.reflect.WhiteboxImpl;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +46,7 @@
* This unit test tests ledger fencing.
*
*/
@Slf4j
public class TestFencing extends BookKeeperClusterTestCase {
private static final Logger LOG = LoggerFactory.getLogger(TestFencing.class);

Expand Down Expand Up @@ -77,6 +86,7 @@ public void testBasicFencing() throws Exception {
fail("Should have thrown an exception when trying to write");
} catch (BKException.BKLedgerFencedException e) {
// correct behaviour
log.info("expected a fenced error", e);
}

/*
Expand All @@ -87,6 +97,61 @@ public void testBasicFencing() throws Exception {
readlh.getLastAddConfirmed() == writelh.getLastAddConfirmed());
}

@Test
public void testWriteAfterDeleted() throws Exception {
LedgerHandle writeLedger;
writeLedger = bkc.createLedger(digestType, "password".getBytes());

String tmp = "BookKeeper is cool!";
for (int i = 0; i < 10; i++) {
long entryId = writeLedger.addEntry(tmp.getBytes());
LOG.info("entryId: {}", entryId);
}

// Fence and delete.
BookKeeperTestClient bkc2 = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
LedgerHandle readLedger = bkc2.openLedger(writeLedger.getId(), digestType, "password".getBytes());
bkc2.deleteLedger(readLedger.ledgerId);

// Waiting for GC.
for (ServerTester server : servers) {
triggerGC(server.getServer().getBookie());
}

try {
long entryId = writeLedger.addEntry(tmp.getBytes());
LOG.info("Not expected: entryId: {}", entryId);
LOG.error("Should have thrown an exception");
fail("Should have thrown an exception when trying to write");
} catch (BKException.BKLedgerFencedException e) {
log.info("expected a fenced error", e);
// correct behaviour
}

/*
* Check it has been recovered properly.
*/
assertTrue("Has not recovered correctly: " + readLedger.getLastAddConfirmed()
+ " original " + writeLedger.getLastAddConfirmed(),
readLedger.getLastAddConfirmed() == writeLedger.getLastAddConfirmed());

// cleanup.
bkc2.close();
}

private void triggerGC(Bookie bookie) {
LedgerStorage ledgerStorage = bookie.getLedgerStorage();
if (ledgerStorage instanceof InterleavedLedgerStorage
|| ledgerStorage instanceof SingleDirectoryDbLedgerStorage) {
Runnable gcThread = WhiteboxImpl.getInternalState(ledgerStorage, "gcThread");
gcThread.run();
} else if (ledgerStorage instanceof SortedLedgerStorage) {
Object actLedgerStorage = WhiteboxImpl.getInternalState(ledgerStorage, "interleavedLedgerStorage");
Runnable gcThread = WhiteboxImpl.getInternalState(actLedgerStorage, "gcThread");
gcThread.run();
}
}

private static int threadCount = 0;

class LedgerOpenThread extends Thread {
Expand Down

0 comments on commit 47ef48e

Please sign in to comment.