From bf4f42cb3416408596cb9549cda681ddeb873e47 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Sat, 20 May 2023 14:48:05 +0800 Subject: [PATCH 1/4] Force ReSchedule Auditor tasks --- .../meta/LedgerUnderreplicationManager.java | 33 ++++++ .../meta/NullMetadataBookieDriver.java | 10 ++ .../meta/ZkLedgerUnderreplicationManager.java | 103 ++++++++++++++++++ .../bookkeeper/replication/Auditor.java | 89 ++++++++++++++- .../bookkeeper/util/BookKeeperConstants.java | 1 + 5 files changed, 233 insertions(+), 3 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java index 3cd1ceb4b15..9e8657befc3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java @@ -158,6 +158,39 @@ boolean isLedgerReplicationEnabled() void notifyLedgerReplicationEnabled(GenericCallback cb) throws ReplicationException.UnavailableException; + /** + * Emit Auditor tasks re-schedule. + * + */ + void emitScheduleAuditorTasks() + throws ReplicationException.UnavailableException; + + /** + * Finishing Auditor tasks re-schedule. + * + */ + void finishedScheduleAuditorTasks() + throws ReplicationException.UnavailableException; + + /** + * Check whether the Auditor tasks re-schedule is emitted or not. This will return + * true if the schedule task is emitted, otherwise return false. + * + * @return - return true if it is emitted otherwise return false + */ + boolean isAuditorTasksReScheduleEmit() + throws ReplicationException.UnavailableException; + + /** + * Receive notification asynchronously when the schedule auditor tasks + * is emitted. + * + * @param cb + * - callback implementation to receive the notification + */ + void notifyReScheduleAuditorTasksChanged(GenericCallback cb) + throws ReplicationException.UnavailableException; + /** * Creates the zNode for lostBookieRecoveryDelay with the specified value and returns true. * If the node is already existing, then it returns false. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java index 7d2d84381da..40aade18daa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java @@ -359,6 +359,16 @@ public boolean isLedgerReplicationEnabled() { @Override public void notifyLedgerReplicationEnabled(GenericCallback cb) {} @Override + public void emitScheduleAuditorTasks() {} + @Override + public void finishedScheduleAuditorTasks() {} + @Override + public boolean isAuditorTasksReScheduleEmit() { + return false; + } + @Override + public void notifyReScheduleAuditorTasksChanged(GenericCallback cb) {} + @Override public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) { return false; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index 8587ac0ca0f..7e6e6e3bf8d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -772,6 +772,109 @@ public void process(WatchedEvent e) { } } + @Override + public void emitScheduleAuditorTasks() + throws ReplicationException.UnavailableException { + List zkAcls = ZkUtils.getACLs(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("emitScheduleAuditorTasks()"); + } + try { + String znode = basePath + '/' + BookKeeperConstants.SCHEDULE_AUDITOR_NODE; + zkc.create(znode, "".getBytes(UTF_8), zkAcls, CreateMode.PERSISTENT); + LOG.info("Auto Schedule auditor tasks emitted!"); + } catch (KeeperException.NodeExistsException ke) { + LOG.warn("Schedule auditor tasks is already emitted!", ke); + throw new ReplicationException.UnavailableException( + "Schedule auditor tasks is already emitted!", ke); + } catch (KeeperException ke) { + LOG.error("Exception while emitting auto schedule auditor tasks", ke); + throw ReplicationException.fromKeeperException("Exception while emitting auto schedule auditor tasks", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException( + "Interrupted while emitting auto schedule auditor tasks", ie); + } + } + + @Override + public void finishedScheduleAuditorTasks() + throws ReplicationException.UnavailableException { + if (LOG.isDebugEnabled()) { + LOG.debug("finishedScheduleAuditorTasks()"); + } + try { + zkc.delete(basePath + '/' + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, -1); + LOG.info("Finished automatic schedule auditor tasks"); + } catch (KeeperException.NoNodeException ke) { + LOG.warn("Schedule auditor tasks is already finished!", ke); + throw new ReplicationException.UnavailableException( + "Schedule auditor tasks is already finished!", ke); + } catch (KeeperException ke) { + LOG.error("Exception while finishing schedule auditor tasks", ke); + throw ReplicationException.fromKeeperException("Exception while finishing schedule auditor tasks", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException( + "Interrupted while finishing schedule auditor tasks", ie); + } + } + + @Override + public boolean isAuditorTasksReScheduleEmit() + throws ReplicationException.UnavailableException { + if (LOG.isDebugEnabled()) { + LOG.debug("isAuditorTasksReScheduleEmit()"); + } + try { + return null == zkc.exists(basePath + '/' + + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, false); + } catch (KeeperException ke) { + LOG.error("Error while checking the state of " + + "auditor tasks schedule", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException( + "Interrupted while contacting zookeeper", ie); + } + } + + @Override + public void notifyReScheduleAuditorTasksChanged(final GenericCallback cb) + throws ReplicationException.UnavailableException { + if (LOG.isDebugEnabled()) { + LOG.debug("notifyReScheduleAuditorTasksChanged()"); + } + Watcher w = new Watcher() { + @Override + public void process(WatchedEvent e) { + if (e.getType() == Event.EventType.NodeCreated) { + LOG.info("Schedule auditor tasks is emitted externally through Zookeeper, " + + "since SCHEDULE_AUDITOR_NODE ZNode is created"); + cb.operationComplete(0, null); + } + } + }; + try { + if (null == zkc.exists(basePath + '/' + + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, w)) { + LOG.info("Schedule auditor tasks is emitted externally through Zookeeper, " + + "since SCHEDULE_AUDITOR_NODE ZNode is created"); + cb.operationComplete(0, null); + return; + } + } catch (KeeperException ke) { + LOG.error("Error while checking the state of " + + "schedule auditor tasks", ke); + throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new ReplicationException.UnavailableException( + "Interrupted while contacting zookeeper", ie); + } + } + /** * Check whether the ledger is being replicated by any bookie. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index ceac7c56f86..e8aa3e3258c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -55,6 +56,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.collections4.CollectionUtils; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +90,9 @@ public class Auditor implements AutoCloseable { protected AuditorTask auditorPlacementPolicyCheckTask; protected AuditorTask auditorReplicasCheckTask; private final List allAuditorTasks = Lists.newArrayList(); + protected volatile ScheduledFuture auditorCheckAllLedgersTaskFuture; + protected volatile ScheduledFuture auditorPlacementPolicyCheckTaskFuture; + protected volatile ScheduledFuture auditorReplicasCheckTaskFuture; private final AuditorStats auditorStats; @@ -379,6 +384,62 @@ synchronized Future submitLostBookieRecoveryDelayChangedEvent() { }); } + synchronized Future submitReScheduleAuditorTasksChangedEvent() { + if (executor.isShutdown()) { + SettableFuture f = SettableFuture.create(); + f.setException(new BKAuditException("Auditor shutting down")); + return f; + } + return executor.submit(() -> { + boolean reScheduleEmit = true; + try { + waitIfLedgerReplicationDisabled(); + reScheduleEmit = Auditor.this.ledgerUnderreplicationManager.isAuditorTasksReScheduleEmit(); + if (reScheduleEmit) { + if (auditorCheckAllLedgersTaskFuture != null) { + LOG.info("ReScheduleAuditorTasks has been emitted so canceling the pending auditorCheckAllLedgersTask"); + auditorCheckAllLedgersTaskFuture.cancel(false); + } + if (auditorPlacementPolicyCheckTaskFuture != null) { + LOG.info("ReScheduleAuditorTasks has been emitted so canceling the pending auditorPlacementPolicyCheckTask"); + auditorPlacementPolicyCheckTaskFuture.cancel(false); + } + if (auditorReplicasCheckTaskFuture != null) { + LOG.info("ReScheduleAuditorTasks has been emitted so canceling the pending auditorReplicasCheckTask"); + auditorReplicasCheckTaskFuture.cancel(false); + } + + scheduleCheckAllLedgersTask(); + schedulePlacementPolicyCheckTask(); + scheduleReplicasCheckTask(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted while for LedgersReplication to be enabled ", ie); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + } catch (UnavailableException ue) { + LOG.error("Exception while reading from ZK", ue); + } finally { + if (reScheduleEmit) { + try { + Auditor.this.ledgerUnderreplicationManager.finishedScheduleAuditorTasks(); + } catch (UnavailableException e) { + if (e.getCause() != null && e.getCause() instanceof KeeperException.NoNodeException) { + if (LOG.isDebugEnabled()) { + LOG.debug("NoNode is ok while finished schedule tasks."); + } + } else { + LOG.error("Exception while finished schedule tasks ", e); + submitShutdownTask(); + } + } + } + } + }); + } + public void start() { LOG.info("I'm starting as Auditor Bookie. ID: {}", bookieIdentifier); // on startup watching available bookie and based on the @@ -395,6 +456,8 @@ public void start() { .notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb()); this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged( new UnderReplicatedLedgersChangedCb()); + this.ledgerUnderreplicationManager + .notifyReScheduleAuditorTasksChanged(new ReScheduleAuditorTasksChangedCb()); } catch (BKException bke) { LOG.error("Couldn't get bookie list, so exiting", bke); submitShutdownTask(); @@ -466,7 +529,8 @@ private void scheduleCheckAllLedgersTask() { + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", checkAllLedgersLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval); - executor.scheduleAtFixedRate(auditorCheckAllLedgersTask, initialDelay, interval, TimeUnit.SECONDS); + auditorCheckAllLedgersTaskFuture = + executor.scheduleAtFixedRate(auditorCheckAllLedgersTask, initialDelay, interval, TimeUnit.SECONDS); } else { LOG.info("Periodic checking disabled"); } @@ -510,7 +574,9 @@ private void schedulePlacementPolicyCheckTask() { + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", placementPolicyCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval); - executor.scheduleAtFixedRate(auditorPlacementPolicyCheckTask, initialDelay, interval, TimeUnit.SECONDS); + auditorPlacementPolicyCheckTaskFuture = + executor.scheduleAtFixedRate( + auditorPlacementPolicyCheckTask, initialDelay, interval, TimeUnit.SECONDS); } else { LOG.info("Periodic placementPolicy check disabled"); } @@ -555,7 +621,8 @@ private void scheduleReplicasCheckTask() { + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}", replicasCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval); - executor.scheduleAtFixedRate(auditorReplicasCheckTask, initialDelay, interval, TimeUnit.SECONDS); + auditorReplicasCheckTaskFuture = + executor.scheduleAtFixedRate(auditorReplicasCheckTask, initialDelay, interval, TimeUnit.SECONDS); } private class UnderReplicatedLedgersChangedCb implements GenericCallback { @@ -584,6 +651,22 @@ public void operationComplete(int rc, Void result) { } } + private class ReScheduleAuditorTasksChangedCb implements GenericCallback { + @Override + public void operationComplete(int rc, Void result) { + try { + Auditor.this.ledgerUnderreplicationManager + .notifyReScheduleAuditorTasksChanged(ReScheduleAuditorTasksChangedCb.this); + } catch (ReplicationException.NonRecoverableReplicationException nre) { + LOG.error("Non Recoverable Exception while reading from ZK", nre); + submitShutdownTask(); + } catch (UnavailableException ae) { + LOG.error("Exception while registering for a ReScheduleAuditorTasks notification", ae); + } + Auditor.this.submitReScheduleAuditorTasksChangedEvent(); + } + } + private void waitIfLedgerReplicationDisabled() throws UnavailableException, InterruptedException { ReplicationEnableCb cb = new ReplicationEnableCb(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java index 107708092f6..3e3ebb5f2aa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java @@ -50,6 +50,7 @@ public class BookKeeperConstants { public static final String LAYOUT_ZNODE = "LAYOUT"; public static final String INSTANCEID = "INSTANCEID"; public static final String DISABLE_HEALTH_CHECK = "disableHealthCheck"; + public static final String SCHEDULE_AUDITOR_NODE = "scheduleAuditor"; /** * Set the max log size limit to 1GB. It makes extra room for entry log file before From d522d78a3bd615d82d27f7c0ab7f99030906b8ea Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Wed, 24 May 2023 22:54:57 +0800 Subject: [PATCH 2/4] Improve the remaining functions & add metric & add trigger cmd & add unit tests --- .../apache/bookkeeper/bookie/BookieShell.java | 8 ++- .../meta/LedgerUnderreplicationManager.java | 8 +-- .../meta/NullMetadataBookieDriver.java | 8 +-- .../meta/ZkLedgerUnderreplicationManager.java | 27 ++++------ .../bookkeeper/replication/Auditor.java | 35 +++++++----- .../bookkeeper/replication/AuditorStats.java | 7 +++ .../replication/ReplicationStats.java | 1 + .../bookie/ForceAuditorChecksCmdTest.java | 8 ++- .../replication/AuditorPeriodicCheckTest.java | 53 +++++++++++++++++++ 9 files changed, 115 insertions(+), 40 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 944b97e79dc..476d5c4b18d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -1854,6 +1854,7 @@ class ForceAuditorChecksCmd extends MyCommand { + "upon next Auditor startup "); opts.addOption("rc", "replicascheck", false, "Force replicasCheck audit " + "upon next Auditor startup "); + opts.addOption("f", "force", false, "Force re-schedule auditor task to run."); } @Override @@ -1870,7 +1871,7 @@ String getDescription() { @Override String getUsage() { - return "forceauditchecks [-checkallledgerscheck [-placementpolicycheck] [-replicascheck]"; + return "forceauditchecks [-checkallledgerscheck] [-placementpolicycheck] [-replicascheck] [-force]"; } @Override @@ -1878,6 +1879,7 @@ int runCmd(CommandLine cmdLine) throws Exception { boolean checkAllLedgersCheck = cmdLine.hasOption("calc"); boolean placementPolicyCheck = cmdLine.hasOption("ppc"); boolean replicasCheck = cmdLine.hasOption("rc"); + boolean forceScheduleTask = cmdLine.hasOption("f"); if (checkAllLedgersCheck || placementPolicyCheck || replicasCheck) { runFunctionWithLedgerManagerFactory(bkConf, mFactory -> { @@ -1898,6 +1900,10 @@ int runCmd(CommandLine cmdLine) throws Exception { LOG.info("Resetting ReplicasCheckCTime to : " + new Timestamp(time)); underreplicationManager.setReplicasCheckCTime(time); } + if (forceScheduleTask) { + LOG.info("Emitting Reschedule Auditor check tasks execution."); + underreplicationManager.emitRescheduleAuditorTasks(); + } } } catch (InterruptedException | ReplicationException e) { LOG.error("Exception while trying to reset last run time ", e); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java index 9e8657befc3..a61c0fdc28a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java @@ -162,14 +162,14 @@ void notifyLedgerReplicationEnabled(GenericCallback cb) * Emit Auditor tasks re-schedule. * */ - void emitScheduleAuditorTasks() + void emitRescheduleAuditorTasks() throws ReplicationException.UnavailableException; /** * Finishing Auditor tasks re-schedule. * */ - void finishedScheduleAuditorTasks() + void finishedRescheduleAuditorTasks() throws ReplicationException.UnavailableException; /** @@ -178,7 +178,7 @@ void finishedScheduleAuditorTasks() * * @return - return true if it is emitted otherwise return false */ - boolean isAuditorTasksReScheduleEmit() + boolean isAuditorTasksRescheduleEmit() throws ReplicationException.UnavailableException; /** @@ -188,7 +188,7 @@ boolean isAuditorTasksReScheduleEmit() * @param cb * - callback implementation to receive the notification */ - void notifyReScheduleAuditorTasksChanged(GenericCallback cb) + void notifyRescheduleAuditorTasksChanged(GenericCallback cb) throws ReplicationException.UnavailableException; /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java index 40aade18daa..ae8c40d5fb1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/NullMetadataBookieDriver.java @@ -359,15 +359,15 @@ public boolean isLedgerReplicationEnabled() { @Override public void notifyLedgerReplicationEnabled(GenericCallback cb) {} @Override - public void emitScheduleAuditorTasks() {} + public void emitRescheduleAuditorTasks() {} @Override - public void finishedScheduleAuditorTasks() {} + public void finishedRescheduleAuditorTasks() {} @Override - public boolean isAuditorTasksReScheduleEmit() { + public boolean isAuditorTasksRescheduleEmit() { return false; } @Override - public void notifyReScheduleAuditorTasksChanged(GenericCallback cb) {} + public void notifyRescheduleAuditorTasksChanged(GenericCallback cb) {} @Override public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) { return false; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index 7e6e6e3bf8d..5f57e2bd913 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -773,11 +773,11 @@ public void process(WatchedEvent e) { } @Override - public void emitScheduleAuditorTasks() + public void emitRescheduleAuditorTasks() throws ReplicationException.UnavailableException { List zkAcls = ZkUtils.getACLs(conf); if (LOG.isDebugEnabled()) { - LOG.debug("emitScheduleAuditorTasks()"); + LOG.debug("emitRescheduleAuditorTasks()"); } try { String znode = basePath + '/' + BookKeeperConstants.SCHEDULE_AUDITOR_NODE; @@ -798,10 +798,10 @@ public void emitScheduleAuditorTasks() } @Override - public void finishedScheduleAuditorTasks() + public void finishedRescheduleAuditorTasks() throws ReplicationException.UnavailableException { if (LOG.isDebugEnabled()) { - LOG.debug("finishedScheduleAuditorTasks()"); + LOG.debug("finishedRescheduleAuditorTasks()"); } try { zkc.delete(basePath + '/' + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, -1); @@ -821,13 +821,13 @@ public void finishedScheduleAuditorTasks() } @Override - public boolean isAuditorTasksReScheduleEmit() + public boolean isAuditorTasksRescheduleEmit() throws ReplicationException.UnavailableException { if (LOG.isDebugEnabled()) { - LOG.debug("isAuditorTasksReScheduleEmit()"); + LOG.debug("isAuditorTasksRescheduleEmit()"); } try { - return null == zkc.exists(basePath + '/' + return null != zkc.exists(basePath + '/' + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, false); } catch (KeeperException ke) { LOG.error("Error while checking the state of " @@ -841,10 +841,10 @@ public boolean isAuditorTasksReScheduleEmit() } @Override - public void notifyReScheduleAuditorTasksChanged(final GenericCallback cb) + public void notifyRescheduleAuditorTasksChanged(final GenericCallback cb) throws ReplicationException.UnavailableException { if (LOG.isDebugEnabled()) { - LOG.debug("notifyReScheduleAuditorTasksChanged()"); + LOG.debug("notifyRescheduleAuditorTasksChanged()"); } Watcher w = new Watcher() { @Override @@ -857,13 +857,8 @@ public void process(WatchedEvent e) { } }; try { - if (null == zkc.exists(basePath + '/' - + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, w)) { - LOG.info("Schedule auditor tasks is emitted externally through Zookeeper, " - + "since SCHEDULE_AUDITOR_NODE ZNode is created"); - cb.operationComplete(0, null); - return; - } + zkc.addWatch(basePath + "/" + + BookKeeperConstants.SCHEDULE_AUDITOR_NODE, w, AddWatchMode.PERSISTENT); } catch (KeeperException ke) { LOG.error("Error while checking the state of " + "schedule auditor tasks", ke); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index e8aa3e3258c..29aeb6e883a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -384,7 +384,7 @@ synchronized Future submitLostBookieRecoveryDelayChangedEvent() { }); } - synchronized Future submitReScheduleAuditorTasksChangedEvent() { + synchronized Future submitRescheduleAuditorTasksChangedEvent() { if (executor.isShutdown()) { SettableFuture f = SettableFuture.create(); f.setException(new BKAuditException("Auditor shutting down")); @@ -394,18 +394,24 @@ synchronized Future submitReScheduleAuditorTasksChangedEvent() { boolean reScheduleEmit = true; try { waitIfLedgerReplicationDisabled(); - reScheduleEmit = Auditor.this.ledgerUnderreplicationManager.isAuditorTasksReScheduleEmit(); + reScheduleEmit = Auditor.this.ledgerUnderreplicationManager.isAuditorTasksRescheduleEmit(); if (reScheduleEmit) { - if (auditorCheckAllLedgersTaskFuture != null) { - LOG.info("ReScheduleAuditorTasks has been emitted so canceling the pending auditorCheckAllLedgersTask"); + if (auditorCheckAllLedgersTaskFuture != null + && !auditorCheckAllLedgersTaskFuture.isCancelled()) { + LOG.info("RescheduleAuditorTasks has been emitted " + + "so canceling the pending auditorCheckAllLedgersTask"); auditorCheckAllLedgersTaskFuture.cancel(false); } - if (auditorPlacementPolicyCheckTaskFuture != null) { - LOG.info("ReScheduleAuditorTasks has been emitted so canceling the pending auditorPlacementPolicyCheckTask"); + if (auditorPlacementPolicyCheckTaskFuture != null + && !auditorPlacementPolicyCheckTaskFuture.isCancelled()) { + LOG.info("RescheduleAuditorTasks has been emitted " + + "so canceling the pending auditorPlacementPolicyCheckTask"); auditorPlacementPolicyCheckTaskFuture.cancel(false); } - if (auditorReplicasCheckTaskFuture != null) { - LOG.info("ReScheduleAuditorTasks has been emitted so canceling the pending auditorReplicasCheckTask"); + if (auditorReplicasCheckTaskFuture != null + && !auditorReplicasCheckTaskFuture.isCancelled()) { + LOG.info("RescheduleAuditorTasks has been emitted " + + "so canceling the pending auditorReplicasCheckTask"); auditorReplicasCheckTaskFuture.cancel(false); } @@ -423,8 +429,9 @@ synchronized Future submitReScheduleAuditorTasksChangedEvent() { LOG.error("Exception while reading from ZK", ue); } finally { if (reScheduleEmit) { + auditorStats.getNumAuditorTasksRescheduleEmitted().inc(); try { - Auditor.this.ledgerUnderreplicationManager.finishedScheduleAuditorTasks(); + Auditor.this.ledgerUnderreplicationManager.finishedRescheduleAuditorTasks(); } catch (UnavailableException e) { if (e.getCause() != null && e.getCause() instanceof KeeperException.NoNodeException) { if (LOG.isDebugEnabled()) { @@ -457,7 +464,7 @@ public void start() { this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged( new UnderReplicatedLedgersChangedCb()); this.ledgerUnderreplicationManager - .notifyReScheduleAuditorTasksChanged(new ReScheduleAuditorTasksChangedCb()); + .notifyRescheduleAuditorTasksChanged(new RescheduleAuditorTasksChangedCb()); } catch (BKException bke) { LOG.error("Couldn't get bookie list, so exiting", bke); submitShutdownTask(); @@ -651,19 +658,19 @@ public void operationComplete(int rc, Void result) { } } - private class ReScheduleAuditorTasksChangedCb implements GenericCallback { + private class RescheduleAuditorTasksChangedCb implements GenericCallback { @Override public void operationComplete(int rc, Void result) { try { Auditor.this.ledgerUnderreplicationManager - .notifyReScheduleAuditorTasksChanged(ReScheduleAuditorTasksChangedCb.this); + .notifyRescheduleAuditorTasksChanged(RescheduleAuditorTasksChangedCb.this); } catch (ReplicationException.NonRecoverableReplicationException nre) { LOG.error("Non Recoverable Exception while reading from ZK", nre); submitShutdownTask(); } catch (UnavailableException ae) { - LOG.error("Exception while registering for a ReScheduleAuditorTasks notification", ae); + LOG.error("Exception while registering for a RescheduleAuditorTasks notification", ae); } - Auditor.this.submitReScheduleAuditorTasksChangedEvent(); + Auditor.this.submitRescheduleAuditorTasksChangedEvent(); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorStats.java index 286ec388b30..c1a560e2df3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorStats.java @@ -21,6 +21,7 @@ import static org.apache.bookkeeper.replication.ReplicationStats.AUDIT_BOOKIES_TIME; import static org.apache.bookkeeper.replication.ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME; import static org.apache.bookkeeper.replication.ReplicationStats.CHECK_ALL_LEDGERS_TIME; +import static org.apache.bookkeeper.replication.ReplicationStats.NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIES_PER_LEDGER; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED; @@ -177,6 +178,11 @@ public class AuditorStats { help = "the times of auditor check task skipped" ) private final Counter numSkippingCheckTaskTimes; + @StatsDoc( + name = NUM_BOOKIE_AUDITS_DELAYED, + help = "the number of auditor check tasks reschedule emitted" + ) + private final Counter numAuditorTasksRescheduleEmitted; public AuditorStats(StatsLogger statsLogger) { this.statsLogger = statsLogger; @@ -205,6 +211,7 @@ public AuditorStats(StatsLogger statsLogger) { .getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED); numReplicatedLedgers = this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS); numSkippingCheckTaskTimes = this.statsLogger.getCounter(NUM_SKIPPING_CHECK_TASK_TIMES); + numAuditorTasksRescheduleEmitted = this.statsLogger.getCounter(NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED); numLedgersNotAdheringToPlacementPolicy = new Gauge() { @Override public Integer getDefaultValue() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java index d6814edf094..9e81e1e26a6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java @@ -68,4 +68,5 @@ public interface ReplicationStats { String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS"; String NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED = "NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED"; String NUM_SKIPPING_CHECK_TASK_TIMES = "NUM_SKIPPING_CHECK_TASK_TIMES"; + String NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED = "NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED"; } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java index 18785a58b48..e4edad9d4da 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java @@ -47,7 +47,7 @@ public ForceAuditorChecksCmdTest() { */ @Test public void verifyAuditCTimeReset() throws Exception { - String[] argv = new String[] { "forceauditchecks", "-calc", "-ppc", "-rc" }; + String[] argv = new String[] { "forceauditchecks", "-calc", "-ppc", "-rc", "-f" }; long curTime = System.currentTimeMillis(); final ServerConfiguration conf = confByIndex(0); @@ -61,6 +61,9 @@ public void verifyAuditCTimeReset() throws Exception { urM.setCheckAllLedgersCTime(curTime); urM.setPlacementPolicyCheckCTime(curTime); urM.setReplicasCheckCTime(curTime); + + // check current not reschedule auditor task + Assert.assertFalse(urM.isAuditorTasksRescheduleEmit()); } catch (InterruptedException | ReplicationException e) { throw new UncheckedExecutionException(e); } @@ -86,6 +89,9 @@ public void verifyAuditCTimeReset() throws Exception { if (replicasCheckCTime > (curTime - (20 * 24 * 60 * 60 * 1000))) { Assert.fail("The replicasCheckCTime should have been reset to atleast 20 days old"); } + if (!urm.isAuditorTasksRescheduleEmit()) { + Assert.fail("The rescheduleTasks should have been emitted."); + } } catch (InterruptedException | ReplicationException e) { throw new UncheckedExecutionException(e); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java index 53e139b19e8..3d921a08ca8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java @@ -898,6 +898,59 @@ public void testDelayBookieAuditOfReplicasCheck() throws Exception { auditor.close(); } + @Test + public void testRescheduleAuditorCheckTasks() throws Exception { + for (AuditorElector e : auditorElectors.values()) { + e.shutdown(); + } + + LedgerManagerFactory mFactory = driver.getLedgerManagerFactory(); + LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager(); + + ServerConfiguration servConf = new ServerConfiguration(confByIndex(0)); + + TestStatsProvider statsProvider = new TestStatsProvider(); + TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE); + Counter numAuditorTasksRescheduleEmitted = + statsLogger.getCounter(ReplicationStats.NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED); + + urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE); + + AtomicBoolean canRun = new AtomicBoolean(true); + + final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, + false, statsLogger, canRun); + + auditor.start(); + + // verify before emit reschedule auditor tasks + assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 0, (long) numAuditorTasksRescheduleEmitted.get()); + assertFalse(urm.isAuditorTasksRescheduleEmit()); + + // emit reschedule auditor check tasks + urm.emitRescheduleAuditorTasks(); + + Awaitility.await().untilAsserted( () -> { + assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 1, + (long) numAuditorTasksRescheduleEmitted.get()); + }); + + // check finishedRescheduleAuditorTasks + assertFalse(urm.isAuditorTasksRescheduleEmit()); + + // Verify multiple emit reschedule auditor check tasks + urm.emitRescheduleAuditorTasks(); + Awaitility.await().untilAsserted( () -> { + assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 2, + (long) numAuditorTasksRescheduleEmitted.get()); + }); + + // check finishedRescheduleAuditorTasks + assertFalse(urm.isAuditorTasksRescheduleEmit()); + + auditor.close(); + } + static class TestAuditor extends Auditor { final AtomicReference latchRef = new AtomicReference(new CountDownLatch(1)); From e8606e3cd65e40f831ecf64f9c178c8e64a01176 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Thu, 25 May 2023 22:27:14 +0800 Subject: [PATCH 3/4] fix checkstyle --- .../bookkeeper/replication/AuditorPeriodicCheckTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java index 3d921a08ca8..ab8a7401fcc 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java @@ -930,7 +930,7 @@ public void testRescheduleAuditorCheckTasks() throws Exception { // emit reschedule auditor check tasks urm.emitRescheduleAuditorTasks(); - Awaitility.await().untilAsserted( () -> { + Awaitility.await().untilAsserted(() -> { assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 1, (long) numAuditorTasksRescheduleEmitted.get()); }); @@ -940,7 +940,7 @@ public void testRescheduleAuditorCheckTasks() throws Exception { // Verify multiple emit reschedule auditor check tasks urm.emitRescheduleAuditorTasks(); - Awaitility.await().untilAsserted( () -> { + Awaitility.await().untilAsserted(() -> { assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 2, (long) numAuditorTasksRescheduleEmitted.get()); }); From 2ca9abb6925caca1a2ed7ab9f86cdd9e00b2391b Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Mon, 10 Jul 2023 14:27:17 +0800 Subject: [PATCH 4/4] ok finished reschedule auditor tasks when Auditor starting --- .../bookkeeper/replication/Auditor.java | 29 +++++++++++++------ .../replication/AuditorPeriodicCheckTest.java | 13 +++++++-- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 29aeb6e883a..af0efc91ae7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -431,16 +431,10 @@ synchronized Future submitRescheduleAuditorTasksChangedEvent() { if (reScheduleEmit) { auditorStats.getNumAuditorTasksRescheduleEmitted().inc(); try { - Auditor.this.ledgerUnderreplicationManager.finishedRescheduleAuditorTasks(); + okFinishedRescheduleAuditorTasks(); } catch (UnavailableException e) { - if (e.getCause() != null && e.getCause() instanceof KeeperException.NoNodeException) { - if (LOG.isDebugEnabled()) { - LOG.debug("NoNode is ok while finished schedule tasks."); - } - } else { - LOG.error("Exception while finished schedule tasks ", e); - submitShutdownTask(); - } + LOG.error("Exception while finished schedule tasks ", e); + submitShutdownTask(); } } } @@ -463,6 +457,9 @@ public void start() { .notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb()); this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged( new UnderReplicatedLedgersChangedCb()); + // We always finished earlier triggered Reschedule auditor tasks when Auditor starting. + // Because we can directly schedule tasks when the Auditor starts. + okFinishedRescheduleAuditorTasks(); this.ledgerUnderreplicationManager .notifyRescheduleAuditorTasksChanged(new RescheduleAuditorTasksChangedCb()); } catch (BKException bke) { @@ -482,6 +479,20 @@ public void start() { } } + private void okFinishedRescheduleAuditorTasks() throws UnavailableException { + try { + Auditor.this.ledgerUnderreplicationManager.finishedRescheduleAuditorTasks(); + } catch (UnavailableException e) { + if (e.getCause() != null && e.getCause() instanceof KeeperException.NoNodeException) { + if (LOG.isDebugEnabled()) { + LOG.debug("NoNode is ok while finished schedule tasks."); + } + } else { + throw e; + } + } + } + protected void submitBookieCheckTask() { executor.submit(auditorBookieCheckTask); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java index ab8a7401fcc..8a55fce09ec 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java @@ -921,10 +921,19 @@ public void testRescheduleAuditorCheckTasks() throws Exception { final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false, statsLogger, canRun); + // before auditor start we emit an old reschedule tasks. + urm.emitRescheduleAuditorTasks(); + assertTrue(urm.isAuditorTasksRescheduleEmit()); auditor.start(); - // verify before emit reschedule auditor tasks - assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 0, (long) numAuditorTasksRescheduleEmitted.get()); + // verify before emit reschedule auditor tasks can be ok finished. + Awaitility.await().untilAsserted(() -> { + assertFalse(urm.isAuditorTasksRescheduleEmit()); + }); + + // verify before emit reschedule auditor tasks. + assertEquals("NUM_AUDITOR_TASKS_RESCHEDULE_EMITTED", 0, + (long) numAuditorTasksRescheduleEmitted.get()); assertFalse(urm.isAuditorTasksRescheduleEmit()); // emit reschedule auditor check tasks