
Commit 832aae2

Prevent observer from returning partial block data for erasure coded files
1 parent ad7aa0e commit 832aae2

File tree

2 files changed, 162 insertions(+), 51 deletions(-)


hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 22 additions & 4 deletions
@@ -2227,11 +2227,20 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
           dir, pc, srcArg, offset, length, true);
       inode = res.getIIp().getLastINode();
       if (isInSafeMode()) {
+        int minBlocks = 1;
+
+        ErasureCodingPolicy ecPolicy = res.blocks.getErasureCodingPolicy();
         for (LocatedBlock b : res.blocks.getLocatedBlocks()) {
+          if (ecPolicy != null) {
+            // If the file is erasure coded, we need at least the number of data units of
+            // blocks available, unless the file is smaller than a full stripe of cells.
+            long numCells = (b.getBlockSize() - 1) / (long)ecPolicy.getCellSize() + 1;
+            minBlocks = (int)Math.min((long)ecPolicy.getNumDataUnits(), numCells);
+          }
           // if safemode & no block locations yet then throw safemodeException
-          if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
+          if ((b.getLocations() == null) || (b.getLocations().length < minBlocks)) {
             SafeModeException se = newSafemodeException(
-                "Zero blocklocations for " + srcArg);
+                "Not enough blocklocations for " + srcArg);
             if (haEnabled && haContext != null &&
                 (haContext.getState().getServiceState() == ACTIVE ||
                  haContext.getState().getServiceState() == OBSERVER)) {

@@ -9207,9 +9216,18 @@ private void checkBlockLocationsWhenObserver(LocatedBlocks blocks, String src)
     }
     List<LocatedBlock> locatedBlockList = blocks.getLocatedBlocks();
     if (locatedBlockList != null) {
+      int minBlocks = 1;
+
+      ErasureCodingPolicy ecPolicy = blocks.getErasureCodingPolicy();
       for (LocatedBlock b : locatedBlockList) {
-        if (b.getLocations() == null || b.getLocations().length == 0) {
-          throw new ObserverRetryOnActiveException("Zero blocklocations for " + src);
+        if (ecPolicy != null) {
+          // If the file is erasure coded, we need at least the number of data units of
+          // blocks available, unless the file is smaller than a full stripe of cells.
+          long numCells = (b.getBlockSize() - 1) / (long)ecPolicy.getCellSize() + 1;
+          minBlocks = (int)Math.min((long)ecPolicy.getNumDataUnits(), numCells);
+        }
+        if (b.getLocations() == null || b.getLocations().length < minBlocks) {
+          throw new ObserverRetryOnActiveException("Not enough blocklocations for " + src);
         }
       }
     }
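
For reference, here is a minimal standalone sketch (not part of the commit) of the minimum-locations rule the two hunks above introduce. It assumes the RS(3,2) policy with a 1024-byte cell size that the tests below construct; the class and method names are illustrative only.

public class MinBlockLocationsSketch {

  // Mirrors the patch: ceiling division counts the cells a block spans,
  // capped at the number of data units in the erasure coding policy.
  static int minLocations(long blockSize, int cellSize, int numDataUnits) {
    long numCells = (blockSize - 1) / (long) cellSize + 1;
    return (int) Math.min((long) numDataUnits, numCells);
  }

  public static void main(String[] args) {
    // RS(3,2) with 1024-byte cells, as in TestObserverNode below.
    System.out.println(minLocations(1, 1024, 3));        // 1: fits in a single cell
    System.out.println(minLocations(1025, 1024, 3));     // 2: spills into a second cell
    System.out.println(minLocations(3 * 1024, 1024, 3)); // 3: a full stripe needs all data units
  }
}

For a replicated (non-EC) file the policy is null and the patch keeps the old minimum of a single location.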

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java

Lines changed: 140 additions & 47 deletions
@@ -21,10 +21,10 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
 import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -69,17 +70,18 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,7 +100,7 @@ public class TestObserverNode {

   private final Path testPath= new Path("/TestObserverNode");

-  @BeforeClass
+  @BeforeAll
   public static void startUpCluster() throws Exception {
     conf = new Configuration();
     conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
@@ -110,23 +112,23 @@ public static void startUpCluster() throws Exception {
     dfsCluster = qjmhaCluster.getDfsCluster();
   }

-  @Before
+  @BeforeEach
   public void setUp() throws Exception {
     setObserverRead(true);
   }

-  @After
+  @AfterEach
   public void cleanUp() throws IOException {
     dfs.delete(testPath, true);
-    assertEquals("NN[0] should be active", HAServiceState.ACTIVE,
-        getServiceState(dfsCluster.getNameNode(0)));
-    assertEquals("NN[1] should be standby", HAServiceState.STANDBY,
-        getServiceState(dfsCluster.getNameNode(1)));
-    assertEquals("NN[2] should be observer", HAServiceState.OBSERVER,
-        getServiceState(dfsCluster.getNameNode(2)));
+    assertEquals(HAServiceState.ACTIVE, getServiceState(dfsCluster.getNameNode(0)),
+        "NN[0] should be active");
+    assertEquals(HAServiceState.STANDBY, getServiceState(dfsCluster.getNameNode(1)),
+        "NN[1] should be standby");
+    assertEquals(HAServiceState.OBSERVER, getServiceState(dfsCluster.getNameNode(2)),
+        "NN[2] should be observer");
   }

-  @AfterClass
+  @AfterAll
   public static void shutDownCluster() throws IOException {
     if (qjmhaCluster != null) {
       qjmhaCluster.shutdown();
@@ -228,8 +230,8 @@ public void testConfigStartup() throws Exception {
     }

     // Confirm that the namenode at nnIdx is standby
-    assertTrue("The NameNode is observer despite being transitioned to standby",
-        dfsCluster.getNameNode(nnIdx).isStandbyState());
+    assertTrue(dfsCluster.getNameNode(nnIdx).isStandbyState(),
+        "The NameNode is observer despite being transitioned to standby");

     // Restart the NameNode with observer startup option as false
     dfsCluster.getConfiguration(nnIdx)
@@ -238,9 +240,9 @@ public void testConfigStartup() throws Exception {

     // Verify that the NameNode is not in Observer state
     dfsCluster.waitNameNodeUp(nnIdx);
-    assertTrue("The NameNode started as Observer despite "
-        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being false",
-        dfsCluster.getNameNode(nnIdx).isStandbyState());
+    assertTrue(dfsCluster.getNameNode(nnIdx).isStandbyState(),
+        "The NameNode started as Observer despite "
+        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being false");

     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
@@ -260,9 +262,9 @@ public void testConfigStartup() throws Exception {

     // Check that the NameNode is in Observer state
     dfsCluster.waitNameNodeUp(nnIdx);
-    assertTrue("The NameNode did not start as Observer despite "
-        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being true",
-        dfsCluster.getNameNode(nnIdx).isObserverState());
+    assertTrue(dfsCluster.getNameNode(nnIdx).isObserverState(),
+        "The NameNode did not start as Observer despite "
+        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being true");

     dfs.mkdir(testPath2, FsPermission.getDefault());
     assertSentTo(0);
@@ -437,6 +439,43 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
     dfs.open(testPath).close();
     assertSentTo(0);

+    // Test erasure coded files
+    ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);
+
+    // Fake a small file that only needs 1 block
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // Small file should succeed with just the one block
+    dfs.open(testPath).close();
+    assertSentTo(2);
+
+    // Fake a larger file that needs all 3 data shards
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is not enough for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // Large file should failover to the active
+    dfs.open(testPath).close();
+    assertSentTo(0);
+
     Mockito.reset(bmSpy);

     // Remove safe mode on observer, request should still go to it.
@@ -471,7 +510,62 @@ public void testObserverNodeBlockMissingRetry() throws Exception {
         anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
         Mockito.any(), Mockito.any());

-    dfs.open(testPath);
+    dfs.open(testPath).close();
+    assertSentTo(0);
+
+    dfs.getClient().listPaths("/", new byte[0], true);
+    assertSentTo(0);
+
+    dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
+    assertSentTo(0);
+
+    dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
+    assertSentTo(0);
+
+    // Test erasure coded files
+    ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);
+
+    // Fake a small file that only needs 1 block
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // The small file should succeed on the observer, while the large file should not
+
+    dfs.open(testPath).close();
+    assertSentTo(2);
+
+    dfs.getClient().listPaths("/", new byte[0], true);
+    assertSentTo(2);
+
+    dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
+    assertSentTo(2);
+
+    dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
+    assertSentTo(2);
+
+    // Fake a larger file that needs all 3 data shards
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is not enough for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    dfs.open(testPath).close();
     assertSentTo(0);

     dfs.getClient().listPaths("/", new byte[0], true);
@@ -501,7 +595,7 @@ public void testFsckWithObserver() throws Exception {
   /**
    * Test that, if a write happens to go to Observer,
    * Observer would throw {@link ObserverRetryOnActiveException},
-   * to inform client to retry on Active
+   * to inform client to retry on Active.
    *
    * @throws Exception
    */
@@ -563,16 +657,15 @@ public void testStickyActive() throws Exception {
     dfsCluster.rollEditLogAndTail(0);
     // No Observers present, should still go to Active
     dfsCluster.transitionToStandby(2);
-    assertEquals("NN[2] should be standby", HAServiceState.STANDBY,
-        getServiceState(dfsCluster.getNameNode(2)));
+    assertEquals(HAServiceState.STANDBY, getServiceState(dfsCluster.getNameNode(2)),
+        "NN[2] should be standby");
     newFs.open(testFile).close();
     assertSentTo(0);
     // Restore Observer
     int newObserver = 1;
     dfsCluster.transitionToObserver(newObserver);
-    assertEquals("NN[" + newObserver + "] should be observer",
-        HAServiceState.OBSERVER,
-        getServiceState(dfsCluster.getNameNode(newObserver)));
+    assertEquals(HAServiceState.OBSERVER, getServiceState(dfsCluster.getNameNode(newObserver)),
+        "NN[" + newObserver + "] should be observer");
     long startTime = Time.monotonicNow();
     try {
       while(Time.monotonicNow() - startTime <= 5000) {
@@ -661,19 +754,19 @@ public void testMkdirsRaceWithObserverRead() throws Exception {
         LOG.warn("MkDirRunner thread failed", e.getCause());
       }
     }
-    assertTrue("Not all threads finished", finished);
+    assertTrue(finished, "Not all threads finished");
     threadPool.shutdown();

-    assertEquals("Active and Observer stateIds don't match",
-        dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
-        dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
+    assertEquals(dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
+        dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId(),
+        "Active and Observer stateIds don't match");
     for (int i = 0; i < numThreads; i++) {
-      assertTrue("Client #" + i
+      assertTrue(clientStates[i].lastSeenStateId >= activStateId &&
+          clientStates[i].fnfe == null,
+          "Client #" + i
           + " lastSeenStateId=" + clientStates[i].lastSeenStateId
           + " activStateId=" + activStateId
-          + "\n" + clientStates[i].fnfe,
-          clientStates[i].lastSeenStateId >= activStateId &&
-          clientStates[i].fnfe == null);
+          + "\n" + clientStates[i].fnfe);
     }

     // Restore edit log
@@ -707,7 +800,7 @@ public void run() {

       FileStatus stat = fs.getFileStatus(DIR_PATH);
       assertSentTo(fs, 2);
-      assertTrue("Should be a directory", stat.isDirectory());
+      assertTrue(stat.isDirectory(), "Should be a directory");
     } catch (FileNotFoundException ioe) {
       clientState.fnfe = ioe;
     } catch (Exception e) {
@@ -752,13 +845,13 @@ public void testSimpleReadEmptyDirOrFile() throws IOException {

   private static void assertSentTo(DistributedFileSystem fs, int nnIdx)
       throws IOException {
-    assertTrue("Request was not sent to the expected namenode " + nnIdx,
-        HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx));
+    assertTrue(HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx),
+        "Request was not sent to the expected namenode " + nnIdx);
   }

   private void assertSentTo(int nnIdx) throws IOException {
-    assertTrue("Request was not sent to the expected namenode " + nnIdx,
-        HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
+    assertTrue(HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx),
+        "Request was not sent to the expected namenode " + nnIdx);
   }

   private static void setObserverRead(boolean flag) throws Exception {