From 292bafe1fa8556e83f84e67240925ce6ef1027ee Mon Sep 17 00:00:00 2001 From: fanjianye Date: Mon, 5 Jun 2023 14:54:00 +0800 Subject: [PATCH 1/2] do not skip opened ledger in repair not adhering placement ledger --- .../AuditorPlacementPolicyCheckTask.java | 108 +++++++++--------- .../replication/ReplicationWorker.java | 3 - .../replication/TestReplicationWorker.java | 23 +++- 3 files changed, 74 insertions(+), 60 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java index d03911d5835..94cd022d3ca 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java @@ -257,66 +257,66 @@ void doPlacementPolicyCheck(Long ledgerId, LedgerMetadata metadata = metadataVer.getValue(); int writeQuorumSize = metadata.getWriteQuorumSize(); int ackQuorumSize = metadata.getAckQuorumSize(); - if (metadata.isClosed()) { - boolean foundSegmentNotAdheringToPlacementPolicy = false; - boolean foundSegmentSoftlyAdheringToPlacementPolicy = false; - for (Map.Entry> ensemble : metadata - .getAllEnsembles().entrySet()) { - long startEntryIdOfSegment = ensemble.getKey(); - List ensembleOfSegment = ensemble.getValue(); - EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin - .isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize, - ackQuorumSize); - if (segmentAdheringToPlacementPolicy == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) { - foundSegmentNotAdheringToPlacementPolicy = true; - LOG.warn( - "For ledger: {}, Segment starting at entry: {}, with ensemble: {} having " - + "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to " - + "EnsemblePlacementPolicy", + if (!metadata.isClosed()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ledger: {} is not yet closed, but do not skipping the placementPolicy" + + "check analysis for now", ledgerId); + } + } + boolean foundSegmentNotAdheringToPlacementPolicy = false; + boolean foundSegmentSoftlyAdheringToPlacementPolicy = false; + for (Map.Entry> ensemble : metadata + .getAllEnsembles().entrySet()) { + long startEntryIdOfSegment = ensemble.getKey(); + List ensembleOfSegment = ensemble.getValue(); + EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin + .isEnsembleAdheringToPlacementPolicy(ensembleOfSegment, writeQuorumSize, + ackQuorumSize); + if (segmentAdheringToPlacementPolicy == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) { + foundSegmentNotAdheringToPlacementPolicy = true; + LOG.warn( + "For ledger: {}, Segment starting at entry: {}, with ensemble: {} having " + + "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to " + + "EnsemblePlacementPolicy", + ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize, + ackQuorumSize); + } else if (segmentAdheringToPlacementPolicy + == EnsemblePlacementPolicy.PlacementPolicyAdherence.MEETS_SOFT) { + foundSegmentSoftlyAdheringToPlacementPolicy = true; + if (LOG.isDebugEnabled()) { + LOG.debug( + "For ledger: {}, Segment starting at entry: {}, with ensemble: {}" + + " having writeQuorumSize: {} and ackQuorumSize: {} is" + + " softly adhering to EnsemblePlacementPolicy", ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize, ackQuorumSize); - } else if (segmentAdheringToPlacementPolicy - == EnsemblePlacementPolicy.PlacementPolicyAdherence.MEETS_SOFT) { - foundSegmentSoftlyAdheringToPlacementPolicy = true; - if (LOG.isDebugEnabled()) { - LOG.debug( - "For ledger: {}, Segment starting at entry: {}, with ensemble: {}" - + " having writeQuorumSize: {} and ackQuorumSize: {} is" - + " softly adhering to EnsemblePlacementPolicy", - ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize, - ackQuorumSize); - } - } - } - if (foundSegmentNotAdheringToPlacementPolicy) { - numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet(); - //If user enable repaired, mark this ledger to under replication manager. - if (conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) { - ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, - Collections.emptyList()).whenComplete((res, e) -> { - if (e != null) { - LOG.error("For ledger: {}, the placement policy not adhering bookie " - + "storage, mark it to under replication manager failed.", - ledgerId, e); - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("For ledger: {}, the placement policy not adhering bookie" - + " storage, mark it to under replication manager", ledgerId); - } - }); } - } else if (foundSegmentSoftlyAdheringToPlacementPolicy) { - numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck - .incrementAndGet(); } - numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet(); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Ledger: {} is not yet closed, so skipping the placementPolicy" - + "check analysis for now", ledgerId); + } + if (foundSegmentNotAdheringToPlacementPolicy) { + numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet(); + //If user enable repaired, mark this ledger to under replication manager. + if (conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) { + ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, + Collections.emptyList()).whenComplete((res, e) -> { + if (e != null) { + LOG.error("For ledger: {}, the placement policy not adhering bookie " + + "storage, mark it to under replication manager failed.", + ledgerId, e); + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("For ledger: {}, the placement policy not adhering bookie" + + " storage, mark it to under replication manager", ledgerId); + } + }); } + } else if (foundSegmentSoftlyAdheringToPlacementPolicy) { + numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck + .incrementAndGet(); } + numOfClosedLedgersAuditedInPlacementPolicyCheck.incrementAndGet(); + iterCallback.processResult(BKException.Code.OK, null, null); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 942c09d4fa2..f011398a115 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -388,9 +388,6 @@ private Set getNeedRepairedPlacementNotAdheringFragments(LedgerH LedgerMetadata metadata = metadataVer.getValue(); int writeQuorumSize = metadata.getWriteQuorumSize(); int ackQuorumSize = metadata.getAckQuorumSize(); - if (!metadata.isClosed()) { - return; - } Long curEntryId = null; EnsemblePlacementPolicy.PlacementPolicyAdherence previousSegmentAdheringToPlacementPolicy = null; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index fab068ae609..c8e3bdb8f49 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -1239,12 +1239,22 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig //This ledger not adhering placement policy, the combine(0,1,2) rack is 1. LedgerHandle lh = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD); + //This ledger not adhering placement policy, the combine(0,1,2) rack is 1 and this ledger is not closed. + LedgerHandle lh2 = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD); + int entrySize = 10; for (int i = 0; i < entrySize; i++) { lh.addEntry(data); + lh2.addEntry(data); } lh.close(); + LedgerMetadata metadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue(); + LedgerMetadata metadata2 = bkc.getLedgerManager().readLedgerMetadata(lh2.getId()).get().getValue(); + + assertTrue(metadata.isClosed()); + assertFalse(metadata2.isClosed()); + int minNumRacksPerWriteQuorumConfValue = 2; ServerConfiguration servConf = new ServerConfiguration(confByIndex(0)); @@ -1259,7 +1269,7 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig Gauge ledgersNotAdheringToPlacementPolicyGuage = statsLogger .getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY); assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage value", - 1, ledgersNotAdheringToPlacementPolicyGuage.getSample()); + 2, ledgersNotAdheringToPlacementPolicyGuage.getSample()); Gauge ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger .getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY); assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY guage value", @@ -1276,6 +1286,7 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig assertNotNull(stat); baseConf.setRepairedPlacementPolicyNotAdheringBookieEnable(true); + baseConf.setOpenLedgerRereplicationGracePeriod("1000"); BookKeeper bookKeeper = new BookKeeperTestClient(baseClientConf) { @Override protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf, @@ -1310,9 +1321,15 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig } Awaitility.await().untilAsserted(() -> { - LedgerMetadata metadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue(); - List newBookies = metadata.getAllEnsembles().get(0L); + LedgerMetadata lhMetadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue(); + List newBookies = lhMetadata.getAllEnsembles().get(0L); assertTrue(newBookies.contains(newBookieId)); + assertTrue(lhMetadata.isClosed()); + + LedgerMetadata lh2Metadata = bkc.getLedgerManager().readLedgerMetadata(lh2.getId()).get().getValue(); + List newBookies2 = lh2Metadata.getAllEnsembles().get(0L); + assertTrue(newBookies2.contains(newBookieId)); + assertTrue(lh2Metadata.isClosed()); }); Awaitility.await().untilAsserted(() -> { From c0ff39ef218eb6bc030d625c88acece82b60113b Mon Sep 17 00:00:00 2001 From: fanjianye Date: Fri, 16 Jun 2023 14:29:18 +0800 Subject: [PATCH 2/2] skipping check the last segment if the ledger is OPEN --- .../AuditorPlacementPolicyCheckTask.java | 15 +++++++++------ .../bookkeeper/replication/ReplicationWorker.java | 2 +- .../replication/TestReplicationWorker.java | 6 +++--- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java index 94cd022d3ca..d426c5a4170 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java @@ -257,16 +257,19 @@ void doPlacementPolicyCheck(Long ledgerId, LedgerMetadata metadata = metadataVer.getValue(); int writeQuorumSize = metadata.getWriteQuorumSize(); int ackQuorumSize = metadata.getAckQuorumSize(); - if (!metadata.isClosed()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ledger: {} is not yet closed, but do not skipping the placementPolicy" - + "check analysis for now", ledgerId); - } - } boolean foundSegmentNotAdheringToPlacementPolicy = false; boolean foundSegmentSoftlyAdheringToPlacementPolicy = false; + int ensembleIndex = 0; + int ensembleSize = metadata.getAllEnsembles().size(); for (Map.Entry> ensemble : metadata .getAllEnsembles().entrySet()) { + if (++ensembleIndex == ensembleSize && !metadata.isClosed()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ledger: {} last ensemble is not yet closed, so skipping the placementPolicy" + + "check analysis for now", ledgerId); + } + break; + } long startEntryIdOfSegment = ensemble.getKey(); List ensembleOfSegment = ensemble.getValue(); EnsemblePlacementPolicy.PlacementPolicyAdherence segmentAdheringToPlacementPolicy = admin diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index f011398a115..3c02f094366 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -406,7 +406,7 @@ private Set getNeedRepairedPlacementNotAdheringFragments(LedgerH writeQuorumSize, ackQuorumSize); curEntryId = entry.getKey(); } - if (curEntryId != null) { + if (curEntryId != null && metadata.isClosed()) { if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL == previousSegmentAdheringToPlacementPolicy) { long lastEntry = lh.getLedgerMetadata().getLastEntryId(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index c8e3bdb8f49..1752992b33e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -1269,7 +1269,7 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig Gauge ledgersNotAdheringToPlacementPolicyGuage = statsLogger .getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY); assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage value", - 2, ledgersNotAdheringToPlacementPolicyGuage.getSample()); + 1, ledgersNotAdheringToPlacementPolicyGuage.getSample()); Gauge ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger .getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY); assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY guage value", @@ -1328,8 +1328,8 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig LedgerMetadata lh2Metadata = bkc.getLedgerManager().readLedgerMetadata(lh2.getId()).get().getValue(); List newBookies2 = lh2Metadata.getAllEnsembles().get(0L); - assertTrue(newBookies2.contains(newBookieId)); - assertTrue(lh2Metadata.isClosed()); + assertTrue(!newBookies2.contains(newBookieId)); + assertTrue(!lh2Metadata.isClosed()); }); Awaitility.await().untilAsserted(() -> {