diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java index d03911d5835..d426c5a4170 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java @@ -257,66 +257,69 @@ void doPlacementPolicyCheck(Long ledgerId, LedgerMetadata metadata = metadataVer.getValue(); int writeQuorumSize = metadata.getWriteQuorumSize(); int ackQuorumSize = metadata.getAckQuorumSize(); - if (metadata.isClosed()) { - boolean foundSegmentNotAdheringToPlacementPolicy = false; - boolean foundSegmentSoftlyAdheringToPlacementPolicy = false; - for (Map.Entry> ensemble : metadata - .getAllEnsembles().entrySet()) { - long startEntryIdOfSegment = ensemble.getKey(); - List ensembleOfSegment = ensemble.getValue(); - EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin - .isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize, - ackQuorumSize); - if (segmentAdheringToPlacementPolicy == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) { - foundSegmentNotAdheringToPlacementPolicy = true; - LOG.warn( - "For ledger: {}, Segment starting at entry: {}, with ensemble: {} having " - + "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to " - + "EnsemblePlacementPolicy", - ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize, - ackQuorumSize); - } else if (segmentAdheringToPlacementPolicy - == EnsemblePlacementPolicy.PlacementPolicyAdherence.MEETS_SOFT) { - foundSegmentSoftlyAdheringToPlacementPolicy = true; - if (LOG.isDebugEnabled()) { - LOG.debug( - "For ledger: {}, Segment starting at entry: {}, with ensemble: {}" - + " having writeQuorumSize: {} and ackQuorumSize: {} is" - + " softly adhering to EnsemblePlacementPolicy", - ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize, - ackQuorumSize); - } + boolean foundSegmentNotAdheringToPlacementPolicy = false; + boolean foundSegmentSoftlyAdheringToPlacementPolicy = false; + int ensembleIndex = 0; + int ensembleSize = metadata.getAllEnsembles().size(); + for (Map.Entry> ensemble : metadata + .getAllEnsembles().entrySet()) { + if (++ensembleIndex == ensembleSize && !metadata.isClosed()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ledger: {} last ensemble is not yet closed, so skipping the placementPolicy" + + "check analysis for now", ledgerId); } + break; } - if (foundSegmentNotAdheringToPlacementPolicy) { - numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet(); - //If user enable repaired, mark this ledger to under replication manager. - if (conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) { - ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, - Collections.emptyList()).whenComplete((res, e) -> { - if (e != null) { - LOG.error("For ledger: {}, the placement policy not adhering bookie " - + "storage, mark it to under replication manager failed.", - ledgerId, e); - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("For ledger: {}, the placement policy not adhering bookie" - + " storage, mark it to under replication manager", ledgerId); - } - }); + long startEntryIdOfSegment = ensemble.getKey(); + List ensembleOfSegment = ensemble.getValue(); + EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin + .isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize, + ackQuorumSize); + if (segmentAdheringToPlacementPolicy == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) { + foundSegmentNotAdheringToPlacementPolicy = true; + LOG.warn( + "For ledger: {}, Segment starting at entry: {}, with ensemble: {} having " + + "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to " + + "EnsemblePlacementPolicy", + ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize, + ackQuorumSize); + } else if (segmentAdheringToPlacementPolicy + == EnsemblePlacementPolicy.PlacementPolicyAdherence.MEETS_SOFT) { + foundSegmentSoftlyAdheringToPlacementPolicy = true; + if (LOG.isDebugEnabled()) { + LOG.debug( + "For ledger: {}, Segment starting at entry: {}, with ensemble: {}" + + " having writeQuorumSize: {} and ackQuorumSize: {} is" + + " softly adhering to EnsemblePlacementPolicy", + ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize, + ackQuorumSize); } - } else if (foundSegmentSoftlyAdheringToPlacementPolicy) { - numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck - .incrementAndGet(); } - numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet(); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Ledger: {} is not yet closed, so skipping the placementPolicy" - + "check analysis for now", ledgerId); + } + if (foundSegmentNotAdheringToPlacementPolicy) { + numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet(); + //If user enable repaired, mark this ledger to under replication manager. + if (conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) { + ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, + Collections.emptyList()).whenComplete((res, e) -> { + if (e != null) { + LOG.error("For ledger: {}, the placement policy not adhering bookie " + + "storage, mark it to under replication manager failed.", + ledgerId, e); + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("For ledger: {}, the placement policy not adhering bookie" + + " storage, mark it to under replication manager", ledgerId); + } + }); } + } else if (foundSegmentSoftlyAdheringToPlacementPolicy) { + numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck + .incrementAndGet(); } + numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet(); + iterCallback.processResult(BKException.Code.OK, null, null); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 942c09d4fa2..3c02f094366 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -388,9 +388,6 @@ private Set getNeedRepairedPlacementNotAdheringFragments(LedgerH LedgerMetadata metadata = metadataVer.getValue(); int writeQuorumSize = metadata.getWriteQuorumSize(); int ackQuorumSize = metadata.getAckQuorumSize(); - if (!metadata.isClosed()) { - return; - } Long curEntryId = null; EnsemblePlacementPolicy.PlacementPolicyAdherence previousSegmentAdheringToPlacementPolicy = null; @@ -409,7 +406,7 @@ private Set getNeedRepairedPlacementNotAdheringFragments(LedgerH writeQuorumSize, ackQuorumSize); curEntryId = entry.getKey(); } - if (curEntryId != null) { + if (curEntryId != null && metadata.isClosed()) { if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL == previousSegmentAdheringToPlacementPolicy) { long lastEntry = lh.getLedgerMetadata().getLastEntryId(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index fab068ae609..1752992b33e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -1239,12 +1239,22 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig //This ledger not adhering placement policy, the combine(0,1,2) rack is 1. LedgerHandle lh = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD); + //This ledger not adhering placement policy, the combine(0,1,2) rack is 1 and this ledger is not closed. + LedgerHandle lh2 = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD); + int entrySize = 10; for (int i = 0; i < entrySize; i++) { lh.addEntry(data); + lh2.addEntry(data); } lh.close(); + LedgerMetadata metadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue(); + LedgerMetadata metadata2 = bkc.getLedgerManager().readLedgerMetadata(lh2.getId()).get().getValue(); + + assertTrue(metadata.isClosed()); + assertFalse(metadata2.isClosed()); + int minNumRacksPerWriteQuorumConfValue = 2; ServerConfiguration servConf = new ServerConfiguration(confByIndex(0)); @@ -1276,6 +1286,7 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig assertNotNull(stat); baseConf.setRepairedPlacementPolicyNotAdheringBookieEnable(true); + baseConf.setOpenLedgerRereplicationGracePeriod("1000"); BookKeeper bookKeeper = new BookKeeperTestClient(baseClientConf) { @Override protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf, @@ -1310,9 +1321,15 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig } Awaitility.await().untilAsserted(() -> { - LedgerMetadata metadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue(); - List newBookies = metadata.getAllEnsembles().get(0L); + LedgerMetadata lhMetadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue(); + List newBookies = lhMetadata.getAllEnsembles().get(0L); assertTrue(newBookies.contains(newBookieId)); + assertTrue(lhMetadata.isClosed()); + + LedgerMetadata lh2Metadata = bkc.getLedgerManager().readLedgerMetadata(lh2.getId()).get().getValue(); + List newBookies2 = lh2Metadata.getAllEnsembles().get(0L); + assertTrue(!newBookies2.contains(newBookieId)); + assertTrue(!lh2Metadata.isClosed()); }); Awaitility.await().untilAsserted(() -> {