diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index c742e62c04d..e89b2771d39 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -117,6 +117,23 @@ protected String parseBookieRegion(BookieId addr) { @Override public void handleBookiesThatLeft(Set leftBookies) { + // case 1: (In some situation, eg, Broker and bookie restart concurrently.) + //1. Bookie X join cluster for the first time, encounters a region exception, and `address2Region` record X's + // region as default-region. + //2. Bookie X left cluster and is removed from knownBookies, but address2Region retains the information of + // bookie X. + //3. update Bookie X's rack info, and calling `onBookieRackChange` will only update address2Region for + // addresses present in knownBookies; therefore, bookie X's region info is not updated. + //4. Bookie X join cluster again, since address2Region contains the previous default-region information, + // getRegion will directly use cached data, resulting of an incorrect region. + + // The bookie region is initialized to "default-region" in address2Region. + // We should ensure that when a bookie leaves the cluster, + // we also clean up the corresponding region information for that bookie in address2Region, + // so that it can update the correct region for the bookie during onBookieRackChange and + // handleBookiesThatJoined. + // to avoid traffic skew in ensemble selection. + leftBookies.forEach(address2Region::remove); super.handleBookiesThatLeft(leftBookies); for (TopologyAwareEnsemblePlacementPolicy policy: perRegionPlacement.values()) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index 8f2562763d3..89e34d28b94 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -29,6 +29,8 @@ import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; @@ -2052,4 +2054,121 @@ public void testNewEnsemblePickLocalRegionBookies() LOG.info("Bookie1 Count: {}, Bookie8 Count: {}, Bookie9 Count: {}", bookie1Count, bookie8Count, bookie9Count); } + + @Test + public void testBookieLeftThenJoinWithDNSResolveFailed() throws Exception { + + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181); + + // init dns mapping + // 2. mock dns resolver failed, use default region and rack. + // addr1 rack info. /region-1/default-rack -> /default-region/default-rack. + + // 1. mock addr1 dns resolver failed and use default region and rack. + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/default-rack"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region-1/default-rack"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region-2/default-rack"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region-3/default-rack"); + + // init cluster + Set addrs = Sets.newHashSet(addr1.toBookieId(), + addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<>()); + + assertEquals(4, repp.knownBookies.size()); + assertEquals("/default-region/default-rack", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + assertEquals("/region-1/default-rack", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + assertEquals("/region-2/default-rack", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + assertEquals("/region-3/default-rack", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + assertEquals(4, repp.perRegionPlacement.size()); + TopologyAwareEnsemblePlacementPolicy unknownRegionPlacement = repp.perRegionPlacement.get("UnknownRegion"); + assertEquals(1, unknownRegionPlacement.knownBookies.keySet().size()); + assertEquals("/default-region/default-rack", + unknownRegionPlacement.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + + TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region-1"); + assertEquals(1, region1Placement.knownBookies.keySet().size()); + assertEquals("/region-1/default-rack", + region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + + TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region-2"); + assertEquals(1, region2Placement.knownBookies.keySet().size()); + assertEquals("/region-2/default-rack", + region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + + TopologyAwareEnsemblePlacementPolicy region3Placement = repp.perRegionPlacement.get("region-3"); + assertEquals(1, region3Placement.knownBookies.keySet().size()); + assertEquals("/region-3/default-rack", + region3Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + assertEquals("UnknownRegion", repp.address2Region.get(addr1.toBookieId())); + assertEquals("region-1", repp.address2Region.get(addr2.toBookieId())); + assertEquals("region-2", repp.address2Region.get(addr3.toBookieId())); + assertEquals("region-3", repp.address2Region.get(addr4.toBookieId())); + + // 2. addr1 bookie shutdown and decommission + addrs.remove(addr1.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<>()); + + assertEquals(3, repp.knownBookies.size()); + assertNull(repp.knownBookies.get(addr1.toBookieId())); + assertEquals("/region-1/default-rack", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + assertEquals("/region-2/default-rack", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + assertEquals("/region-3/default-rack", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + // UnknownRegion,region-1,region-2,region-3 + assertEquals(4, repp.perRegionPlacement.size()); + // after addr1 bookie left, it should remove from locally address2Region + assertNull(repp.address2Region.get(addr1.toBookieId())); + assertEquals("region-1", repp.address2Region.get(addr2.toBookieId())); + assertEquals("region-2", repp.address2Region.get(addr3.toBookieId())); + assertEquals("region-3", repp.address2Region.get(addr4.toBookieId())); + + + // 3. addr1 bookie start and join + addrs.add(addr1.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<>()); + + assertEquals(4, repp.knownBookies.size()); + assertEquals("/default-region/default-rack", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + assertEquals("/region-1/default-rack", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + assertEquals("/region-2/default-rack", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + assertEquals("/region-3/default-rack", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + // UnknownRegion,region-1,region-2,region-3 + assertEquals(4, repp.perRegionPlacement.size()); + assertEquals("UnknownRegion", repp.address2Region.get(addr1.toBookieId())); + // addr1 bookie belongs to unknown region + unknownRegionPlacement = repp.perRegionPlacement.get("UnknownRegion"); + assertEquals(1, unknownRegionPlacement.knownBookies.keySet().size()); + assertEquals("/default-region/default-rack", + unknownRegionPlacement.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + + // 4. Update the correct rack. + // change addr1 rack info. /default-region/default-rack -> /region-1/default-rack. + List bookieAddressList = new ArrayList<>(); + List rackList = new ArrayList<>(); + bookieAddressList.add(addr1); + rackList.add("/region-1/default-rack"); + // onBookieRackChange + StaticDNSResolver.changeRack(bookieAddressList, rackList); + + assertEquals(4, repp.perRegionPlacement.size()); + // addr1 bookie, oldRegion=default-region, newRegion=region-1 + assertEquals("region-1", repp.address2Region.get(addr1.toBookieId())); + + unknownRegionPlacement = repp.perRegionPlacement.get("UnknownRegion"); + assertEquals(0, unknownRegionPlacement.knownBookies.keySet().size()); + assertNotNull(unknownRegionPlacement.historyBookies.get(addr1.toBookieId())); + + + region1Placement = repp.perRegionPlacement.get("region-1"); + assertEquals(2, region1Placement.knownBookies.keySet().size()); + assertEquals("/region-1/default-rack", + region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + } }