From c18aa62d44072aed94e5326194d352604c27053c Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sat, 23 Nov 2024 22:55:33 +0800 Subject: [PATCH 01/23] [fix][broker]If there is a deadlock in the service, the probe should return a failure because the service may be unavailable --- .../pulsar/common/configuration/VipStatus.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 5e6a31b323f55..90342fe9c59d5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -27,6 +27,8 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.Response.Status; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.util.ThreadDumpUtil; /** * Web resource used by the VIP service to check to availability of the service instance. @@ -52,7 +54,15 @@ public String checkStatus() { if (statusFilePath != null) { File statusFile = new File(statusFilePath); if (isReady && statusFile.exists() && statusFile.isFile()) { - return "OK"; + // check deadlock + String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); + if (StringUtils.isBlank(diagnosticResult)) { + return "OK"; + } else { + log.warn("Deadlock detected, service may be unavailable, " + + "thread stack details are as follows: {}.", diagnosticResult); + throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); + } } } log.warn("Failed to access \"status.html\". 
The service is not ready"); From 90ff72032ff6aecfd2c12632e217c47c2509be78 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Mon, 25 Nov 2024 22:22:15 +0800 Subject: [PATCH 02/23] [fix][broker]If there is a deadlock in the service, the probe should return a failure because the service may be unavailable --- .../common/configuration/VipStatus.java | 37 +++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 90342fe9c59d5..70cc6485383bf 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -19,7 +19,12 @@ package org.apache.pulsar.common.configuration; import java.io.File; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Arrays; import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.servlet.ServletContext; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -27,7 +32,6 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.Response.Status; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.ThreadDumpUtil; /** @@ -40,6 +44,11 @@ public class VipStatus { public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe"; + // log a full thread dump when a deadlock is detected in status check once every 10 minutes + // to prevent excessive logging + private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L; + private static volatile long threadDumpLoggedTimestamp; + @Context protected ServletContext 
servletContext; @@ -55,13 +64,27 @@ public String checkStatus() { File statusFile = new File(statusFilePath); if (isReady && statusFile.exists() && statusFile.isFile()) { // check deadlock - String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); - if (StringUtils.isBlank(diagnosticResult)) { - return "OK"; - } else { - log.warn("Deadlock detected, service may be unavailable, " - + "thread stack details are as follows: {}.", diagnosticResult); + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + long[] threadIds = threadBean.findDeadlockedThreads(); + if (threadIds != null && threadIds.length > 0) { + ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, + false); + String threadNames = Arrays.stream(threadInfos) + .map(threadInfo -> threadInfo.getThreadName() + + "(tid=" + threadInfo.getThreadId() + ")") + .collect(Collectors.joining(", ")); + if (System.currentTimeMillis() - threadDumpLoggedTimestamp + > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { + String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); + log.error("Deadlock detected, service may be unavailable, " + + "thread stack details are as follows: {}.", diagnosticResult); + threadDumpLoggedTimestamp = System.currentTimeMillis(); + } else { + log.error("Deadlocked threads detected. 
{}", threadNames); + } throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); + } else { + return "OK"; } } } From 2b1815677453501e75d58dcd107147febe4542fb Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Tue, 26 Nov 2024 23:45:15 +0800 Subject: [PATCH 03/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable --- .../common/configuration/VipStatus.java | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 70cc6485383bf..91b0e225bfdc0 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -47,13 +47,33 @@ public class VipStatus { // log a full thread dump when a deadlock is detected in status check once every 10 minutes // to prevent excessive logging private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L; - private static volatile long threadDumpLoggedTimestamp; + private static volatile long lastCheckStatusTimestamp; + + // Since the status endpoint doesn't have authentication, it will be necessary to have a solution to prevent + // introducing a new DoS vulnerability where calling the status endpoint in a tight loop could introduce + // significant load to the system. One way would be to check that the deadlock check is executed only + // when there's more than 1 seconds from the previous check. + // If it's less than that, the previous result of the deadlock check would be reused. 
+ private static final long DEADLOCK_DETECTED_INTERVAL = 1000L; + private static volatile boolean brokerIsHealthy = true; @Context protected ServletContext servletContext; @GET public String checkStatus() { + synchronized (VipStatus.class) { + if (System.currentTimeMillis() - lastCheckStatusTimestamp < DEADLOCK_DETECTED_INTERVAL) { + lastCheckStatusTimestamp = System.currentTimeMillis(); + if (brokerIsHealthy) { + return "OK"; + } else { + throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); + } + } + lastCheckStatusTimestamp = System.currentTimeMillis(); + } + String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); @SuppressWarnings("unchecked") Supplier isReadyProbe = (Supplier) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); @@ -73,21 +93,23 @@ public String checkStatus() { .map(threadInfo -> threadInfo.getThreadName() + "(tid=" + threadInfo.getThreadId() + ")") .collect(Collectors.joining(", ")); - if (System.currentTimeMillis() - threadDumpLoggedTimestamp + if (System.currentTimeMillis() - lastCheckStatusTimestamp > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); log.error("Deadlock detected, service may be unavailable, " + "thread stack details are as follows: {}.", diagnosticResult); - threadDumpLoggedTimestamp = System.currentTimeMillis(); } else { log.error("Deadlocked threads detected. {}", threadNames); } + brokerIsHealthy = false; throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); } else { + brokerIsHealthy = true; return "OK"; } } } + brokerIsHealthy = false; log.warn("Failed to access \"status.html\". 
The service is not ready"); throw new WebApplicationException(Status.NOT_FOUND); } From 70325c0c1d7548a79df5ee62458caadafc8f933f Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Wed, 27 Nov 2024 00:06:00 +0800 Subject: [PATCH 04/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable --- .../common/configuration/VipStatus.java | 65 ++++++++++--------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 91b0e225bfdc0..05deb19640c8b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -62,6 +62,7 @@ public class VipStatus { @GET public String checkStatus() { + // Locking classes to avoid deadlock detection in multi-thread concurrent requests. synchronized (VipStatus.class) { if (System.currentTimeMillis() - lastCheckStatusTimestamp < DEADLOCK_DETECTED_INTERVAL) { lastCheckStatusTimestamp = System.currentTimeMillis(); @@ -72,46 +73,46 @@ public String checkStatus() { } } lastCheckStatusTimestamp = System.currentTimeMillis(); - } - String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); - @SuppressWarnings("unchecked") - Supplier isReadyProbe = (Supplier) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); + String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); + @SuppressWarnings("unchecked") + Supplier isReadyProbe = (Supplier) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); - boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true; + boolean isReady = isReadyProbe != null ? 
isReadyProbe.get() : true; - if (statusFilePath != null) { - File statusFile = new File(statusFilePath); - if (isReady && statusFile.exists() && statusFile.isFile()) { - // check deadlock - ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - long[] threadIds = threadBean.findDeadlockedThreads(); - if (threadIds != null && threadIds.length > 0) { - ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, - false); - String threadNames = Arrays.stream(threadInfos) - .map(threadInfo -> threadInfo.getThreadName() - + "(tid=" + threadInfo.getThreadId() + ")") - .collect(Collectors.joining(", ")); - if (System.currentTimeMillis() - lastCheckStatusTimestamp - > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { - String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); - log.error("Deadlock detected, service may be unavailable, " - + "thread stack details are as follows: {}.", diagnosticResult); + if (statusFilePath != null) { + File statusFile = new File(statusFilePath); + if (isReady && statusFile.exists() && statusFile.isFile()) { + // check deadlock + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + long[] threadIds = threadBean.findDeadlockedThreads(); + if (threadIds != null && threadIds.length > 0) { + ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, + false); + String threadNames = Arrays.stream(threadInfos) + .map(threadInfo -> threadInfo.getThreadName() + + "(tid=" + threadInfo.getThreadId() + ")") + .collect(Collectors.joining(", ")); + if (System.currentTimeMillis() - lastCheckStatusTimestamp + > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { + String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); + log.error("Deadlock detected, service may be unavailable, " + + "thread stack details are as follows: {}.", diagnosticResult); + } else { + log.error("Deadlocked threads detected. 
{}", threadNames); + } + brokerIsHealthy = false; + throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); } else { - log.error("Deadlocked threads detected. {}", threadNames); + brokerIsHealthy = true; + return "OK"; } - brokerIsHealthy = false; - throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); - } else { - brokerIsHealthy = true; - return "OK"; } } + brokerIsHealthy = false; + log.warn("Failed to access \"status.html\". The service is not ready"); + throw new WebApplicationException(Status.NOT_FOUND); } - brokerIsHealthy = false; - log.warn("Failed to access \"status.html\". The service is not ready"); - throw new WebApplicationException(Status.NOT_FOUND); } } From c230aa5fd2b92d75df89c85dc94268245237be04 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Wed, 27 Nov 2024 00:14:53 +0800 Subject: [PATCH 05/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable --- .../apache/pulsar/common/configuration/VipStatus.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 05deb19640c8b..4ccc935014ce5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -55,7 +55,7 @@ public class VipStatus { // when there's more than 1 seconds from the previous check. // If it's less than that, the previous result of the deadlock check would be reused. 
private static final long DEADLOCK_DETECTED_INTERVAL = 1000L; - private static volatile boolean brokerIsHealthy = true; + private static volatile boolean lastCheckStatusResult = true; @Context protected ServletContext servletContext; @@ -66,7 +66,7 @@ public String checkStatus() { synchronized (VipStatus.class) { if (System.currentTimeMillis() - lastCheckStatusTimestamp < DEADLOCK_DETECTED_INTERVAL) { lastCheckStatusTimestamp = System.currentTimeMillis(); - if (brokerIsHealthy) { + if (lastCheckStatusResult) { return "OK"; } else { throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); @@ -101,15 +101,15 @@ public String checkStatus() { } else { log.error("Deadlocked threads detected. {}", threadNames); } - brokerIsHealthy = false; + lastCheckStatusResult = false; throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); } else { - brokerIsHealthy = true; + lastCheckStatusResult = true; return "OK"; } } } - brokerIsHealthy = false; + lastCheckStatusResult = false; log.warn("Failed to access \"status.html\". The service is not ready"); throw new WebApplicationException(Status.NOT_FOUND); } From 2c8819bdc0433997b330ab6db6128b518425477d Mon Sep 17 00:00:00 2001 From: yyj <1012293987@qq.com> Date: Wed, 27 Nov 2024 14:17:12 +0800 Subject: [PATCH 06/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. 
--- .../pulsar/common/configuration/VipStatus.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 4ccc935014ce5..8342fe2439ae4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -49,13 +49,9 @@ public class VipStatus { private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L; private static volatile long lastCheckStatusTimestamp; - // Since the status endpoint doesn't have authentication, it will be necessary to have a solution to prevent - // introducing a new DoS vulnerability where calling the status endpoint in a tight loop could introduce - // significant load to the system. One way would be to check that the deadlock check is executed only - // when there's more than 1 seconds from the previous check. - // If it's less than that, the previous result of the deadlock check would be reused. - private static final long DEADLOCK_DETECTED_INTERVAL = 1000L; - private static volatile boolean lastCheckStatusResult = true; + // Rate limit status checks to every 500ms to prevent DoS + private static final long CHECK_STATUS_INTERVAL = 500L; + private static volatile boolean lastCheckStatusResult; @Context protected ServletContext servletContext; @@ -64,7 +60,7 @@ public class VipStatus { public String checkStatus() { // Locking classes to avoid deadlock detection in multi-thread concurrent requests. 
synchronized (VipStatus.class) { - if (System.currentTimeMillis() - lastCheckStatusTimestamp < DEADLOCK_DETECTED_INTERVAL) { + if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { lastCheckStatusTimestamp = System.currentTimeMillis(); if (lastCheckStatusResult) { return "OK"; From f17da50996baa384f9dddd4bde230b5fe049a2b9 Mon Sep 17 00:00:00 2001 From: yyj <1012293987@qq.com> Date: Wed, 27 Nov 2024 17:10:15 +0800 Subject: [PATCH 07/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. --- .../java/org/apache/pulsar/common/configuration/VipStatus.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 8342fe2439ae4..e3330772cab00 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -61,7 +61,6 @@ public String checkStatus() { // Locking classes to avoid deadlock detection in multi-thread concurrent requests. synchronized (VipStatus.class) { if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { - lastCheckStatusTimestamp = System.currentTimeMillis(); if (lastCheckStatusResult) { return "OK"; } else { From dc5304010c389d60c574faa1204c2a51a507e769 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Wed, 4 Dec 2024 21:42:08 +0800 Subject: [PATCH 08/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Add lastPrintThreadDumpTimestamp field to control the interval time for printing complete thread stack information. 
--- .../org/apache/pulsar/common/configuration/VipStatus.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index e3330772cab00..21158359fc086 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -48,6 +48,7 @@ public class VipStatus { // to prevent excessive logging private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L; private static volatile long lastCheckStatusTimestamp; + private static volatile long lastPrintThreadDumpTimestamp; // Rate limit status checks to every 500ms to prevent DoS private static final long CHECK_STATUS_INTERVAL = 500L; @@ -88,11 +89,12 @@ public String checkStatus() { .map(threadInfo -> threadInfo.getThreadName() + "(tid=" + threadInfo.getThreadId() + ")") .collect(Collectors.joining(", ")); - if (System.currentTimeMillis() - lastCheckStatusTimestamp + if (System.currentTimeMillis() - lastPrintThreadDumpTimestamp > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); log.error("Deadlock detected, service may be unavailable, " + "thread stack details are as follows: {}.", diagnosticResult); + lastPrintThreadDumpTimestamp = System.currentTimeMillis(); } else { log.error("Deadlocked threads detected. {}", threadNames); } From 532e69fecf646ca350c59f258bc11740f0e94e5f Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Wed, 4 Dec 2024 22:10:46 +0800 Subject: [PATCH 09/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Add unit testing code. 
--- .../common/configuration/VipStatusTest.java | 503 ++++++++++++++++++ 1 file changed, 503 insertions(+) create mode 100644 pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java new file mode 100644 index 0000000000000..d2d016614bbb5 --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.common.configuration; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.EventListener; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.servlet.Filter; +import javax.servlet.FilterRegistration; +import javax.servlet.RequestDispatcher; +import javax.servlet.Servlet; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.ServletRegistration; +import javax.servlet.SessionCookieConfig; +import javax.servlet.SessionTrackingMode; +import javax.servlet.descriptor.JspConfigDescriptor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.util.ThreadDumpUtil; +import org.eclipse.jetty.util.AttributesMap; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +@Slf4j +public class VipStatusTest { + + public static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; + public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe"; + + // log a full thread dump when a deadlock is detected in status check once every 10 minutes + // to prevent excessive logging + private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 10000L; + private static volatile long lastCheckStatusTimestamp; + private static volatile long lastPrintThreadDumpTimestamp; + + // Rate limit status checks to every 500ms to prevent DoS + private static final long CHECK_STATUS_INTERVAL = 500L; + private static volatile boolean 
lastCheckStatusResult; + private static String CHECK_RESULT_OK = "OK"; + private static String CHECK_RESULT_NOT_OK = "NOT_OK"; + private int checkThrottlingCount; + private int checkNoThrottlingCount; + private int printDeadlockThreadDumpCount; + + private MockServletContext mockServletContext = new MockServletContext(); + @BeforeTest + public void setup() throws IOException { + String statusFilePath = "/tmp/status.html"; + File file = new File(statusFilePath); + file.createNewFile(); + mockServletContext.setAttribute(ATTRIBUTE_STATUS_FILE_PATH, statusFilePath); + Supplier isReadyProbe = () -> true; + mockServletContext.setAttribute(ATTRIBUTE_IS_READY_PROBE, isReadyProbe); + } + + @Test + public void testVipStatusCheckStatus() throws InterruptedException { + // No deadlocks + testVipStatusCheckStatusWithoutDeadlock(); + // There is a deadlock + testVipStatusCheckStatusWithDeadlock(); + } + + @AfterTest + public void release() throws IOException { + String statusFilePath = "/tmp/status.html"; + File file = new File(statusFilePath); + file.deleteOnExit(); + } + + public void testVipStatusCheckStatusWithoutDeadlock() throws InterruptedException { + //1.No DOS attacks + for (int i = 0; i < 10; i++) { + assertEquals(checkStatus(), CHECK_RESULT_OK); + Thread.sleep(CHECK_STATUS_INTERVAL + 10); + } + assertTrue(checkNoThrottlingCount == 10); + assertTrue(checkThrottlingCount == 0); + checkNoThrottlingCount = 0; + checkThrottlingCount = 0; + + //2.There are DOS attacks + for (int i = 0; i < 10; i++) { + assertEquals(checkStatus(), CHECK_RESULT_OK); + } + assertTrue(checkNoThrottlingCount >= 1); + assertTrue(checkThrottlingCount >= 1); + checkNoThrottlingCount = 0; + checkThrottlingCount = 0; + } + + public void testVipStatusCheckStatusWithDeadlock() throws InterruptedException { + MockDeadlock.startDeadlock(); + //1.No DOS attacks + for (int i = 0; i < 10; i++) { + assertEquals(checkStatus(), CHECK_RESULT_NOT_OK); + Thread.sleep(CHECK_STATUS_INTERVAL + 10); + } + 
assertTrue(checkNoThrottlingCount == 10); + assertTrue(checkThrottlingCount == 0); + checkNoThrottlingCount = 0; + checkThrottlingCount = 0; + + //2.There are DOS attacks + for (int i = 0; i < 10; i++) { + assertEquals(checkStatus(), CHECK_RESULT_NOT_OK); + } + assertTrue(checkNoThrottlingCount >= 1); + assertTrue(checkThrottlingCount >= 1); + checkNoThrottlingCount = 0; + checkThrottlingCount = 0; + + //3.print deadlock info + for (int i = 0; i < 10; i++) { + assertEquals(checkStatus(), CHECK_RESULT_NOT_OK); + Thread.sleep(LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED / 2); + } + assertTrue(printDeadlockThreadDumpCount >= 5); + } + + public String checkStatus() { + // Locking classes to avoid deadlock detection in multi-thread concurrent requests. + synchronized (VipStatus.class) { + if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { + checkThrottlingCount ++; + if (lastCheckStatusResult) { + return CHECK_RESULT_OK; + } else { + return CHECK_RESULT_NOT_OK; + } + } + lastCheckStatusTimestamp = System.currentTimeMillis(); + + String statusFilePath = (String) mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); + @SuppressWarnings("unchecked") + Supplier isReadyProbe = (Supplier) mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); + + boolean isReady = isReadyProbe != null ? 
isReadyProbe.get() : true; + + if (statusFilePath != null) { + File statusFile = new File(statusFilePath); + if (isReady && statusFile.exists() && statusFile.isFile()) { + // check deadlock + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + long[] threadIds = threadBean.findDeadlockedThreads(); + if (threadIds != null && threadIds.length > 0) { + ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, + false); + String threadNames = Arrays.stream(threadInfos) + .map(threadInfo -> threadInfo.getThreadName() + + "(tid=" + threadInfo.getThreadId() + ")") + .collect(Collectors.joining(", ")); + if (System.currentTimeMillis() - lastPrintThreadDumpTimestamp + > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { + String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); + log.error("Deadlock detected, service may be unavailable, " + + "thread stack details are as follows: {}.", diagnosticResult); + lastPrintThreadDumpTimestamp = System.currentTimeMillis(); + printDeadlockThreadDumpCount ++; + } else { + log.error("Deadlocked threads detected. {}", threadNames); + } + lastCheckStatusResult = false; + checkNoThrottlingCount ++; + return CHECK_RESULT_NOT_OK; + } else { + checkNoThrottlingCount ++; + lastCheckStatusResult = true; + return CHECK_RESULT_OK; + } + } + } + checkNoThrottlingCount ++; + lastCheckStatusResult = false; + log.warn("Failed to access \"status.html\". 
The service is not ready"); + return CHECK_RESULT_OK; + } + } + + public class MockServletContext extends AttributesMap implements ServletContext { + + @Override + public String getContextPath() { + return null; + } + + @Override + public ServletContext getContext(String s) { + return null; + } + + @Override + public int getMajorVersion() { + return 0; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public int getEffectiveMajorVersion() { + return 0; + } + + @Override + public int getEffectiveMinorVersion() { + return 0; + } + + @Override + public String getMimeType(String s) { + return null; + } + + @Override + public Set getResourcePaths(String s) { + return null; + } + + @Override + public URL getResource(String s) throws MalformedURLException { + return null; + } + + @Override + public InputStream getResourceAsStream(String s) { + return null; + } + + @Override + public RequestDispatcher getRequestDispatcher(String s) { + return null; + } + + @Override + public RequestDispatcher getNamedDispatcher(String s) { + return null; + } + + @Override + public Servlet getServlet(String s) throws ServletException { + return null; + } + + @Override + public Enumeration getServlets() { + return null; + } + + @Override + public Enumeration getServletNames() { + return null; + } + + @Override + public void log(String s) { + + } + + @Override + public void log(Exception e, String s) { + + } + + @Override + public void log(String s, Throwable throwable) { + + } + + @Override + public String getRealPath(String s) { + return null; + } + + @Override + public String getServerInfo() { + return null; + } + + @Override + public String getInitParameter(String s) { + return null; + } + + @Override + public Enumeration getInitParameterNames() { + return null; + } + + @Override + public boolean setInitParameter(String s, String s1) { + return false; + } + + @Override + public String getServletContextName() { + return null; + } + + @Override + public 
ServletRegistration.Dynamic addServlet(String s, String s1) { + return null; + } + + @Override + public ServletRegistration.Dynamic addServlet(String s, Servlet servlet) { + return null; + } + + @Override + public ServletRegistration.Dynamic addServlet(String s, Class aClass) { + return null; + } + + @Override + public T createServlet(Class aClass) throws ServletException { + return null; + } + + @Override + public ServletRegistration getServletRegistration(String s) { + return null; + } + + @Override + public Map getServletRegistrations() { + return null; + } + + @Override + public FilterRegistration.Dynamic addFilter(String s, String s1) { + return null; + } + + @Override + public FilterRegistration.Dynamic addFilter(String s, Filter filter) { + return null; + } + + @Override + public FilterRegistration.Dynamic addFilter(String s, Class aClass) { + return null; + } + + @Override + public T createFilter(Class aClass) throws ServletException { + return null; + } + + @Override + public FilterRegistration getFilterRegistration(String s) { + return null; + } + + @Override + public Map getFilterRegistrations() { + return null; + } + + @Override + public SessionCookieConfig getSessionCookieConfig() { + return null; + } + + @Override + public void setSessionTrackingModes(Set set) { + + } + + @Override + public Set getDefaultSessionTrackingModes() { + return null; + } + + @Override + public Set getEffectiveSessionTrackingModes() { + return null; + } + + @Override + public void addListener(String s) { + + } + + @Override + public void addListener(T t) { + + } + + @Override + public void addListener(Class aClass) { + + } + + @Override + public T createListener(Class aClass) throws ServletException { + return null; + } + + @Override + public JspConfigDescriptor getJspConfigDescriptor() { + return null; + } + + @Override + public ClassLoader getClassLoader() { + return null; + } + + @Override + public void declareRoles(String... 
strings) { + + } + + @Override + public String getVirtualServerName() { + return null; + } + } + + public class MockDeadlock { + private static Object lockA = new Object(); + private static Object lockB = new Object(); + private static Thread t1 = new Thread(new ThreadOne()); + private static Thread t2 = new Thread(new ThreadTwo()); + + @SneakyThrows + public static void startDeadlock() { + // 启动两个线程来模拟死锁 + t1.start(); + t2.start(); + Thread.sleep(CHECK_STATUS_INTERVAL); + } + + private static class ThreadOne implements Runnable { + @Override + public void run() { + synchronized (lockA) { + System.out.println("ThreadOne acquired lockA"); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + synchronized (lockB) { + System.out.println("ThreadOne acquired lockB"); + } + } + } + } + + private static class ThreadTwo implements Runnable { + @Override + public void run() { + synchronized (lockB) { + System.out.println("ThreadTwo acquired lockB"); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + synchronized (lockA) { + System.out.println("ThreadTwo acquired lockA"); + } + } + } + } + } +} From e1a1dd5cc9e44ea40b74a15e66bbe13220b64bd2 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Wed, 4 Dec 2024 22:59:17 +0800 Subject: [PATCH 10/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Add unit testing code. 
--- .../org/apache/pulsar/common/configuration/VipStatusTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index d2d016614bbb5..1cafbd26141af 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -460,7 +460,6 @@ public class MockDeadlock { @SneakyThrows public static void startDeadlock() { - // 启动两个线程来模拟死锁 t1.start(); t2.start(); Thread.sleep(CHECK_STATUS_INTERVAL); From 69aba3edf3859f2926916e4b8535f5c914c52763 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sun, 8 Dec 2024 11:58:31 +0800 Subject: [PATCH 11/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Add unit testing code. 
--- .../common/configuration/VipStatus.java | 36 +- .../configuration/MockServletContext.java | 255 +++++++++++ .../common/configuration/VipStatusTest.java | 403 +----------------- 3 files changed, 300 insertions(+), 394 deletions(-) create mode 100644 pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 21158359fc086..203d03e974062 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.time.Clock; import java.util.Arrays; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -31,6 +32,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response.Status; +import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.util.ThreadDumpUtil; @@ -47,33 +49,47 @@ public class VipStatus { // log a full thread dump when a deadlock is detected in status check once every 10 minutes // to prevent excessive logging private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L; - private static volatile long lastCheckStatusTimestamp; - private static volatile long lastPrintThreadDumpTimestamp; - // Rate limit status checks to every 500ms to prevent DoS private static final long CHECK_STATUS_INTERVAL = 500L; + + private static volatile long lastCheckStatusTimestamp; + private static volatile long lastPrintThreadDumpTimestamp; private static volatile boolean lastCheckStatusResult; + private 
long printThreadDumpIntervalMs; + private Clock clock; + @Context protected ServletContext servletContext; + public VipStatus() { + this.clock = Clock.systemUTC(); + this.printThreadDumpIntervalMs = LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED; + } + + @VisibleForTesting + public VipStatus(ServletContext servletContext, long printThreadDumpIntervalMs) { + this.servletContext = servletContext; + this.printThreadDumpIntervalMs = printThreadDumpIntervalMs; + this.clock = Clock.systemUTC(); + } + @GET public String checkStatus() { // Locking classes to avoid deadlock detection in multi-thread concurrent requests. synchronized (VipStatus.class) { - if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { + if (clock.millis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { if (lastCheckStatusResult) { return "OK"; } else { throw new WebApplicationException(Status.SERVICE_UNAVAILABLE); } } - lastCheckStatusTimestamp = System.currentTimeMillis(); + lastCheckStatusTimestamp = clock.millis(); String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); @SuppressWarnings("unchecked") Supplier isReadyProbe = (Supplier) servletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); - boolean isReady = isReadyProbe != null ? 
isReadyProbe.get() : true; if (statusFilePath != null) { @@ -89,12 +105,11 @@ public String checkStatus() { .map(threadInfo -> threadInfo.getThreadName() + "(tid=" + threadInfo.getThreadId() + ")") .collect(Collectors.joining(", ")); - if (System.currentTimeMillis() - lastPrintThreadDumpTimestamp - > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { + if (clock.millis() - lastPrintThreadDumpTimestamp > printThreadDumpIntervalMs) { String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); log.error("Deadlock detected, service may be unavailable, " + "thread stack details are as follows: {}.", diagnosticResult); - lastPrintThreadDumpTimestamp = System.currentTimeMillis(); + lastPrintThreadDumpTimestamp = clock.millis(); } else { log.error("Deadlocked threads detected. {}", threadNames); } @@ -111,5 +126,4 @@ public String checkStatus() { throw new WebApplicationException(Status.NOT_FOUND); } } - -} +} \ No newline at end of file diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java new file mode 100644 index 0000000000000..93df67c93d820 --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java @@ -0,0 +1,255 @@ +package org.apache.pulsar.common.configuration; + +import org.eclipse.jetty.util.AttributesMap; + +import javax.servlet.*; +import javax.servlet.descriptor.JspConfigDescriptor; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Enumeration; +import java.util.EventListener; +import java.util.Map; +import java.util.Set; + +public class MockServletContext extends AttributesMap implements ServletContext { + @Override + public String getContextPath() { + return null; + } + + @Override + public ServletContext getContext(String s) { + return null; + } + + @Override + public int 
getMajorVersion() { + return 0; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public int getEffectiveMajorVersion() { + return 0; + } + + @Override + public int getEffectiveMinorVersion() { + return 0; + } + + @Override + public String getMimeType(String s) { + return null; + } + + @Override + public Set getResourcePaths(String s) { + return null; + } + + @Override + public URL getResource(String s) throws MalformedURLException { + return null; + } + + @Override + public InputStream getResourceAsStream(String s) { + return null; + } + + @Override + public RequestDispatcher getRequestDispatcher(String s) { + return null; + } + + @Override + public RequestDispatcher getNamedDispatcher(String s) { + return null; + } + + @Override + public Servlet getServlet(String s) throws ServletException { + return null; + } + + @Override + public Enumeration getServlets() { + return null; + } + + @Override + public Enumeration getServletNames() { + return null; + } + + @Override + public void log(String s) { + + } + + @Override + public void log(Exception e, String s) { + + } + + @Override + public void log(String s, Throwable throwable) { + + } + + @Override + public String getRealPath(String s) { + return null; + } + + @Override + public String getServerInfo() { + return null; + } + + @Override + public String getInitParameter(String s) { + return null; + } + + @Override + public Enumeration getInitParameterNames() { + return null; + } + + @Override + public boolean setInitParameter(String s, String s1) { + return false; + } + + @Override + public String getServletContextName() { + return null; + } + + @Override + public ServletRegistration.Dynamic addServlet(String s, String s1) { + return null; + } + + @Override + public ServletRegistration.Dynamic addServlet(String s, Servlet servlet) { + return null; + } + + @Override + public ServletRegistration.Dynamic addServlet(String s, Class aClass) { + return null; + } + + @Override + public T 
createServlet(Class aClass) throws ServletException { + return null; + } + + @Override + public ServletRegistration getServletRegistration(String s) { + return null; + } + + @Override + public Map getServletRegistrations() { + return null; + } + + @Override + public FilterRegistration.Dynamic addFilter(String s, String s1) { + return null; + } + + @Override + public FilterRegistration.Dynamic addFilter(String s, Filter filter) { + return null; + } + + @Override + public FilterRegistration.Dynamic addFilter(String s, Class aClass) { + return null; + } + + @Override + public T createFilter(Class aClass) throws ServletException { + return null; + } + + @Override + public FilterRegistration getFilterRegistration(String s) { + return null; + } + + @Override + public Map getFilterRegistrations() { + return null; + } + + @Override + public SessionCookieConfig getSessionCookieConfig() { + return null; + } + + @Override + public void setSessionTrackingModes(Set set) { + + } + + @Override + public Set getDefaultSessionTrackingModes() { + return null; + } + + @Override + public Set getEffectiveSessionTrackingModes() { + return null; + } + + @Override + public void addListener(String s) { + + } + + @Override + public void addListener(T t) { + + } + + @Override + public void addListener(Class aClass) { + + } + + @Override + public T createListener(Class aClass) throws ServletException { + return null; + } + + @Override + public JspConfigDescriptor getJspConfigDescriptor() { + return null; + } + + @Override + public ClassLoader getClassLoader() { + return null; + } + + @Override + public void declareRoles(String... 
strings) { + + } + + @Override + public String getVirtualServerName() { + return null; + } +} diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index 1cafbd26141af..c47336a794511 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -19,36 +19,13 @@ package org.apache.pulsar.common.configuration; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Arrays; -import java.util.Enumeration; -import java.util.EventListener; -import java.util.Map; -import java.util.Set; import java.util.function.Supplier; -import java.util.stream.Collectors; -import javax.servlet.Filter; -import javax.servlet.FilterRegistration; -import javax.servlet.RequestDispatcher; -import javax.servlet.Servlet; -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.ServletRegistration; -import javax.servlet.SessionCookieConfig; -import javax.servlet.SessionTrackingMode; -import javax.servlet.descriptor.JspConfigDescriptor; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.common.util.ThreadDumpUtil; -import org.eclipse.jetty.util.AttributesMap; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -58,35 +35,27 @@ public class VipStatusTest { public 
static final String ATTRIBUTE_STATUS_FILE_PATH = "statusFilePath"; public static final String ATTRIBUTE_IS_READY_PROBE = "isReadyProbe"; - - // log a full thread dump when a deadlock is detected in status check once every 10 minutes - // to prevent excessive logging private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 10000L; - private static volatile long lastCheckStatusTimestamp; - private static volatile long lastPrintThreadDumpTimestamp; - // Rate limit status checks to every 500ms to prevent DoS private static final long CHECK_STATUS_INTERVAL = 500L; - private static volatile boolean lastCheckStatusResult; - private static String CHECK_RESULT_OK = "OK"; - private static String CHECK_RESULT_NOT_OK = "NOT_OK"; - private int checkThrottlingCount; - private int checkNoThrottlingCount; - private int printDeadlockThreadDumpCount; private MockServletContext mockServletContext = new MockServletContext(); + private VipStatus vipStatus; + @BeforeTest public void setup() throws IOException { String statusFilePath = "/tmp/status.html"; File file = new File(statusFilePath); file.createNewFile(); + mockServletContext.setAttribute(ATTRIBUTE_STATUS_FILE_PATH, statusFilePath); Supplier isReadyProbe = () -> true; mockServletContext.setAttribute(ATTRIBUTE_IS_READY_PROBE, isReadyProbe); + vipStatus = new VipStatus(mockServletContext, LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED); } @Test - public void testVipStatusCheckStatus() throws InterruptedException { + public void testVipStatusCheckStatus() { // No deadlocks testVipStatusCheckStatusWithoutDeadlock(); // There is a deadlock @@ -100,355 +69,23 @@ public void release() throws IOException { file.deleteOnExit(); } - public void testVipStatusCheckStatusWithoutDeadlock() throws InterruptedException { - //1.No DOS attacks - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_OK); - Thread.sleep(CHECK_STATUS_INTERVAL + 10); - } - assertTrue(checkNoThrottlingCount == 10); - 
assertTrue(checkThrottlingCount == 0); - checkNoThrottlingCount = 0; - checkThrottlingCount = 0; - - //2.There are DOS attacks - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_OK); - } - assertTrue(checkNoThrottlingCount >= 1); - assertTrue(checkThrottlingCount >= 1); - checkNoThrottlingCount = 0; - checkThrottlingCount = 0; + public void testVipStatusCheckStatusWithoutDeadlock() { + assertEquals(vipStatus.checkStatus(), "OK"); } - public void testVipStatusCheckStatusWithDeadlock() throws InterruptedException { + public void testVipStatusCheckStatusWithDeadlock() { MockDeadlock.startDeadlock(); - //1.No DOS attacks - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_NOT_OK); - Thread.sleep(CHECK_STATUS_INTERVAL + 10); - } - assertTrue(checkNoThrottlingCount == 10); - assertTrue(checkThrottlingCount == 0); - checkNoThrottlingCount = 0; - checkThrottlingCount = 0; - - //2.There are DOS attacks - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_NOT_OK); - } - assertTrue(checkNoThrottlingCount >= 1); - assertTrue(checkThrottlingCount >= 1); - checkNoThrottlingCount = 0; - checkThrottlingCount = 0; - - //3.print deadlock info - for (int i = 0; i < 10; i++) { - assertEquals(checkStatus(), CHECK_RESULT_NOT_OK); - Thread.sleep(LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED / 2); - } - assertTrue(printDeadlockThreadDumpCount >= 5); - } - - public String checkStatus() { - // Locking classes to avoid deadlock detection in multi-thread concurrent requests. 
- synchronized (VipStatus.class) { - if (System.currentTimeMillis() - lastCheckStatusTimestamp < CHECK_STATUS_INTERVAL) { - checkThrottlingCount ++; - if (lastCheckStatusResult) { - return CHECK_RESULT_OK; - } else { - return CHECK_RESULT_NOT_OK; - } - } - lastCheckStatusTimestamp = System.currentTimeMillis(); - - String statusFilePath = (String) mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH); - @SuppressWarnings("unchecked") - Supplier isReadyProbe = (Supplier) mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE); - - boolean isReady = isReadyProbe != null ? isReadyProbe.get() : true; - - if (statusFilePath != null) { - File statusFile = new File(statusFilePath); - if (isReady && statusFile.exists() && statusFile.isFile()) { - // check deadlock - ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - long[] threadIds = threadBean.findDeadlockedThreads(); - if (threadIds != null && threadIds.length > 0) { - ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, - false); - String threadNames = Arrays.stream(threadInfos) - .map(threadInfo -> threadInfo.getThreadName() - + "(tid=" + threadInfo.getThreadId() + ")") - .collect(Collectors.joining(", ")); - if (System.currentTimeMillis() - lastPrintThreadDumpTimestamp - > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) { - String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); - log.error("Deadlock detected, service may be unavailable, " - + "thread stack details are as follows: {}.", diagnosticResult); - lastPrintThreadDumpTimestamp = System.currentTimeMillis(); - printDeadlockThreadDumpCount ++; - } else { - log.error("Deadlocked threads detected. 
{}", threadNames); - } - lastCheckStatusResult = false; - checkNoThrottlingCount ++; - return CHECK_RESULT_NOT_OK; - } else { - checkNoThrottlingCount ++; - lastCheckStatusResult = true; - return CHECK_RESULT_OK; - } - } - } - checkNoThrottlingCount ++; - lastCheckStatusResult = false; - log.warn("Failed to access \"status.html\". The service is not ready"); - return CHECK_RESULT_OK; - } - } - - public class MockServletContext extends AttributesMap implements ServletContext { - - @Override - public String getContextPath() { - return null; - } - - @Override - public ServletContext getContext(String s) { - return null; - } - - @Override - public int getMajorVersion() { - return 0; - } - - @Override - public int getMinorVersion() { - return 0; - } - - @Override - public int getEffectiveMajorVersion() { - return 0; - } - - @Override - public int getEffectiveMinorVersion() { - return 0; - } - - @Override - public String getMimeType(String s) { - return null; - } - - @Override - public Set getResourcePaths(String s) { - return null; - } - - @Override - public URL getResource(String s) throws MalformedURLException { - return null; - } - - @Override - public InputStream getResourceAsStream(String s) { - return null; - } - - @Override - public RequestDispatcher getRequestDispatcher(String s) { - return null; - } - - @Override - public RequestDispatcher getNamedDispatcher(String s) { - return null; - } - - @Override - public Servlet getServlet(String s) throws ServletException { - return null; - } - - @Override - public Enumeration getServlets() { - return null; - } - - @Override - public Enumeration getServletNames() { - return null; - } - - @Override - public void log(String s) { - - } - - @Override - public void log(Exception e, String s) { - - } - - @Override - public void log(String s, Throwable throwable) { - - } - - @Override - public String getRealPath(String s) { - return null; - } - - @Override - public String getServerInfo() { - return null; - } - - @Override - 
public String getInitParameter(String s) { - return null; - } - - @Override - public Enumeration getInitParameterNames() { - return null; - } - - @Override - public boolean setInitParameter(String s, String s1) { - return false; - } - - @Override - public String getServletContextName() { - return null; - } - - @Override - public ServletRegistration.Dynamic addServlet(String s, String s1) { - return null; - } - - @Override - public ServletRegistration.Dynamic addServlet(String s, Servlet servlet) { - return null; - } - - @Override - public ServletRegistration.Dynamic addServlet(String s, Class aClass) { - return null; - } - - @Override - public T createServlet(Class aClass) throws ServletException { - return null; - } - - @Override - public ServletRegistration getServletRegistration(String s) { - return null; - } - - @Override - public Map getServletRegistrations() { - return null; - } - - @Override - public FilterRegistration.Dynamic addFilter(String s, String s1) { - return null; - } - - @Override - public FilterRegistration.Dynamic addFilter(String s, Filter filter) { - return null; - } - - @Override - public FilterRegistration.Dynamic addFilter(String s, Class aClass) { - return null; - } - - @Override - public T createFilter(Class aClass) throws ServletException { - return null; - } - - @Override - public FilterRegistration getFilterRegistration(String s) { - return null; - } - - @Override - public Map getFilterRegistrations() { - return null; - } - - @Override - public SessionCookieConfig getSessionCookieConfig() { - return null; - } - - @Override - public void setSessionTrackingModes(Set set) { - - } - - @Override - public Set getDefaultSessionTrackingModes() { - return null; - } - - @Override - public Set getEffectiveSessionTrackingModes() { - return null; - } - - @Override - public void addListener(String s) { - - } - - @Override - public void addListener(T t) { - - } - - @Override - public void addListener(Class aClass) { - - } - - @Override - public T 
createListener(Class aClass) throws ServletException { - return null; - } - - @Override - public JspConfigDescriptor getJspConfigDescriptor() { - return null; - } - - @Override - public ClassLoader getClassLoader() { - return null; - } - - @Override - public void declareRoles(String... strings) { - + boolean asExpected = true; + try { + vipStatus.checkStatus(); + asExpected = false; + System.out.println("Simulated deadlock, no deadlock detected, not as expected."); + } catch (Exception wae) { + System.out.println("Simulated deadlock and detected it, as expected."); } - @Override - public String getVirtualServerName() { - return null; + if (!asExpected) { + throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE); } } @@ -499,4 +136,4 @@ public void run() { } } } -} +} \ No newline at end of file From 5970dcc5d21154ad35214bf1e3452effe67730ca Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sun, 22 Dec 2024 11:25:41 +0800 Subject: [PATCH 12/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Add unit testing code, shutdown deadlock thread. 
--- .../common/configuration/VipStatusTest.java | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index c47336a794511..4ed212bbae123 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -21,6 +21,10 @@ import static org.testng.Assert.assertEquals; import java.io.File; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; @@ -82,6 +86,8 @@ public void testVipStatusCheckStatusWithDeadlock() { System.out.println("Simulated deadlock, no deadlock detected, not as expected."); } catch (Exception wae) { System.out.println("Simulated deadlock and detected it, as expected."); + } finally { + MockDeadlock.executorService.shutdownNow(); } if (!asExpected) { @@ -90,31 +96,32 @@ public void testVipStatusCheckStatusWithDeadlock() { } public class MockDeadlock { - private static Object lockA = new Object(); - private static Object lockB = new Object(); - private static Thread t1 = new Thread(new ThreadOne()); - private static Thread t2 = new Thread(new ThreadTwo()); + private static ExecutorService executorService = Executors.newCachedThreadPool(); + private static ReentrantLock lockA = new ReentrantLock(); + private static ReentrantLock lockB = new ReentrantLock(); @SneakyThrows public static void startDeadlock() { - t1.start(); - t2.start(); + executorService.execute(new ThreadOne()); + executorService.execute(new ThreadTwo()); 
Thread.sleep(CHECK_STATUS_INTERVAL); } private static class ThreadOne implements Runnable { @Override public void run() { - synchronized (lockA) { + try { + lockA.lock(); System.out.println("ThreadOne acquired lockA"); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - synchronized (lockB) { + Thread.sleep(100); + while (!lockB.tryLock(1, TimeUnit.SECONDS)) { System.out.println("ThreadOne acquired lockB"); + continue; } + } catch (InterruptedException e) { + //e.printStackTrace(); + } finally { + lockA.unlock(); } } } @@ -122,16 +129,18 @@ public void run() { private static class ThreadTwo implements Runnable { @Override public void run() { - synchronized (lockB) { - System.out.println("ThreadTwo acquired lockB"); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - synchronized (lockA) { - System.out.println("ThreadTwo acquired lockA"); + try { + lockB.lock(); + System.out.println("ThreadOne acquired lockB"); + Thread.sleep(100); + while (!lockA.tryLock(1, TimeUnit.SECONDS)) { + System.out.println("ThreadOne acquired lockA"); + continue; } + } catch (InterruptedException e) { + //e.printStackTrace(); + } finally { + lockB.unlock(); } } } From 830c01a6b10793fead5661b99b78851c968cc313 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sun, 22 Dec 2024 11:29:41 +0800 Subject: [PATCH 13/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Add unit testing code, shutdown deadlock thread. 
--- .../org/apache/pulsar/common/configuration/VipStatusTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index 4ed212bbae123..5bad149eb7723 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -116,7 +116,6 @@ public void run() { Thread.sleep(100); while (!lockB.tryLock(1, TimeUnit.SECONDS)) { System.out.println("ThreadOne acquired lockB"); - continue; } } catch (InterruptedException e) { //e.printStackTrace(); @@ -135,7 +134,6 @@ public void run() { Thread.sleep(100); while (!lockA.tryLock(1, TimeUnit.SECONDS)) { System.out.println("ThreadOne acquired lockA"); - continue; } } catch (InterruptedException e) { //e.printStackTrace(); From 52ab05038787515003d04160dba22f8f936e1278 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Tue, 24 Dec 2024 21:55:56 +0800 Subject: [PATCH 14/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Modify unit testing code to use org.mockito.Mockito in place of MockServletContext.
--- .../configuration/MockServletContext.java | 255 ------------------ .../common/configuration/VipStatusTest.java | 14 +- 2 files changed, 10 insertions(+), 259 deletions(-) delete mode 100644 pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java deleted file mode 100644 index 93df67c93d820..0000000000000 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/MockServletContext.java +++ /dev/null @@ -1,255 +0,0 @@ -package org.apache.pulsar.common.configuration; - -import org.eclipse.jetty.util.AttributesMap; - -import javax.servlet.*; -import javax.servlet.descriptor.JspConfigDescriptor; -import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Enumeration; -import java.util.EventListener; -import java.util.Map; -import java.util.Set; - -public class MockServletContext extends AttributesMap implements ServletContext { - @Override - public String getContextPath() { - return null; - } - - @Override - public ServletContext getContext(String s) { - return null; - } - - @Override - public int getMajorVersion() { - return 0; - } - - @Override - public int getMinorVersion() { - return 0; - } - - @Override - public int getEffectiveMajorVersion() { - return 0; - } - - @Override - public int getEffectiveMinorVersion() { - return 0; - } - - @Override - public String getMimeType(String s) { - return null; - } - - @Override - public Set getResourcePaths(String s) { - return null; - } - - @Override - public URL getResource(String s) throws MalformedURLException { - return null; - } - - @Override - public InputStream getResourceAsStream(String s) { - return null; - } - - @Override - public RequestDispatcher getRequestDispatcher(String s) { - return 
null; - } - - @Override - public RequestDispatcher getNamedDispatcher(String s) { - return null; - } - - @Override - public Servlet getServlet(String s) throws ServletException { - return null; - } - - @Override - public Enumeration getServlets() { - return null; - } - - @Override - public Enumeration getServletNames() { - return null; - } - - @Override - public void log(String s) { - - } - - @Override - public void log(Exception e, String s) { - - } - - @Override - public void log(String s, Throwable throwable) { - - } - - @Override - public String getRealPath(String s) { - return null; - } - - @Override - public String getServerInfo() { - return null; - } - - @Override - public String getInitParameter(String s) { - return null; - } - - @Override - public Enumeration getInitParameterNames() { - return null; - } - - @Override - public boolean setInitParameter(String s, String s1) { - return false; - } - - @Override - public String getServletContextName() { - return null; - } - - @Override - public ServletRegistration.Dynamic addServlet(String s, String s1) { - return null; - } - - @Override - public ServletRegistration.Dynamic addServlet(String s, Servlet servlet) { - return null; - } - - @Override - public ServletRegistration.Dynamic addServlet(String s, Class aClass) { - return null; - } - - @Override - public T createServlet(Class aClass) throws ServletException { - return null; - } - - @Override - public ServletRegistration getServletRegistration(String s) { - return null; - } - - @Override - public Map getServletRegistrations() { - return null; - } - - @Override - public FilterRegistration.Dynamic addFilter(String s, String s1) { - return null; - } - - @Override - public FilterRegistration.Dynamic addFilter(String s, Filter filter) { - return null; - } - - @Override - public FilterRegistration.Dynamic addFilter(String s, Class aClass) { - return null; - } - - @Override - public T createFilter(Class aClass) throws ServletException { - return null; - } - - 
@Override - public FilterRegistration getFilterRegistration(String s) { - return null; - } - - @Override - public Map getFilterRegistrations() { - return null; - } - - @Override - public SessionCookieConfig getSessionCookieConfig() { - return null; - } - - @Override - public void setSessionTrackingModes(Set set) { - - } - - @Override - public Set getDefaultSessionTrackingModes() { - return null; - } - - @Override - public Set getEffectiveSessionTrackingModes() { - return null; - } - - @Override - public void addListener(String s) { - - } - - @Override - public void addListener(T t) { - - } - - @Override - public void addListener(Class aClass) { - - } - - @Override - public T createListener(Class aClass) throws ServletException { - return null; - } - - @Override - public JspConfigDescriptor getJspConfigDescriptor() { - return null; - } - - @Override - public ClassLoader getClassLoader() { - return null; - } - - @Override - public void declareRoles(String... strings) { - - } - - @Override - public String getVirtualServerName() { - return null; - } -} diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index 5bad149eb7723..a8eccdeea463a 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -26,10 +26,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; +import javax.servlet.ServletContext; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.mockito.Mock; +import org.mockito.Mockito; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import 
org.testng.annotations.Test; @@ -43,7 +46,8 @@ public class VipStatusTest { // Rate limit status checks to every 500ms to prevent DoS private static final long CHECK_STATUS_INTERVAL = 500L; - private MockServletContext mockServletContext = new MockServletContext(); + @Mock + private ServletContext mockServletContext; private VipStatus vipStatus; @BeforeTest @@ -51,10 +55,12 @@ public void setup() throws IOException { String statusFilePath = "/tmp/status.html"; File file = new File(statusFilePath); file.createNewFile(); - - mockServletContext.setAttribute(ATTRIBUTE_STATUS_FILE_PATH, statusFilePath); Supplier isReadyProbe = () -> true; - mockServletContext.setAttribute(ATTRIBUTE_IS_READY_PROBE, isReadyProbe); + + mockServletContext = Mockito.mock(ServletContext.class); + Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH)).thenReturn(statusFilePath); + Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE)).thenReturn(isReadyProbe); + vipStatus = new VipStatus(mockServletContext, LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED); } From 5b0c2ecf8d58c80916e6cf6a9748b0756fa35277 Mon Sep 17 00:00:00 2001 From: yangyijun <1012293987@qq.com> Date: Fri, 3 Jan 2025 15:09:38 +0800 Subject: [PATCH 15/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Modify unit testing code,delete @Mock annotation. 
--- .../org/apache/pulsar/common/configuration/VipStatusTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index a8eccdeea463a..a3893f9da63ab 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -46,7 +46,6 @@ public class VipStatusTest { // Rate limit status checks to every 500ms to prevent DoS private static final long CHECK_STATUS_INTERVAL = 500L; - @Mock private ServletContext mockServletContext; private VipStatus vipStatus; @@ -149,4 +148,4 @@ public void run() { } } } -} \ No newline at end of file +} From b1eaedd4a467c26808d655b92380adeaea37c24f Mon Sep 17 00:00:00 2001 From: yangyijun <1012293987@qq.com> Date: Fri, 3 Jan 2025 15:18:22 +0800 Subject: [PATCH 16/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Modify unit testing code,delete @Mock annotation. 
--- .../org/apache/pulsar/common/configuration/VipStatusTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index a3893f9da63ab..d98af5d2483ec 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -31,7 +31,6 @@ import javax.ws.rs.core.Response; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.mockito.Mock; import org.mockito.Mockito; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; From 1a2f6aaa2ab4910fe34d25470b7726f1c6836da1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 3 Jan 2025 17:20:34 +0200 Subject: [PATCH 17/23] Fix checkstyle --- .../java/org/apache/pulsar/common/configuration/VipStatus.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 203d03e974062..d1fdda65f127a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.configuration; +import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; @@ -32,7 +33,6 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response.Status; -import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import 
org.apache.pulsar.common.util.ThreadDumpUtil; From 03e6e02e4ca0c262d49925e7ac6bac0d5e193cb0 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sat, 15 Feb 2025 22:58:10 +0800 Subject: [PATCH 18/23] [improvement][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. Modify unit testing code, use org.assertj.core.util.Files create a Temporary files. --- .../pulsar/common/configuration/VipStatusTest.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index d98af5d2483ec..9d5c7d62e9a0d 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -31,6 +31,7 @@ import javax.ws.rs.core.Response; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.assertj.core.util.Files; import org.mockito.Mockito; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; @@ -47,18 +48,15 @@ public class VipStatusTest { private ServletContext mockServletContext; private VipStatus vipStatus; + private File file; @BeforeTest public void setup() throws IOException { - String statusFilePath = "/tmp/status.html"; - File file = new File(statusFilePath); - file.createNewFile(); + file = Files.newTemporaryFile(); Supplier isReadyProbe = () -> true; - mockServletContext = Mockito.mock(ServletContext.class); - Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH)).thenReturn(statusFilePath); + Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH)).thenReturn(file.getAbsolutePath()); 
Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE)).thenReturn(isReadyProbe); - vipStatus = new VipStatus(mockServletContext, LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED); } @@ -72,8 +70,6 @@ public void testVipStatusCheckStatus() { @AfterTest public void release() throws IOException { - String statusFilePath = "/tmp/status.html"; - File file = new File(statusFilePath); file.deleteOnExit(); } From b8da8adaa0b3524f5dd62024943d4ab45cf2ed66 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sun, 6 Apr 2025 10:52:36 +0800 Subject: [PATCH 19/23] [improve][broker] If there is a deadlock in the service, the probe should return a failure because the service may be unavailable. test case thread sleep 200ms. --- .../org/apache/pulsar/common/configuration/VipStatusTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index 9d5c7d62e9a0d..d98e5d25b2b78 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -113,7 +113,7 @@ public void run() { try { lockA.lock(); System.out.println("ThreadOne acquired lockA"); - Thread.sleep(100); + Thread.sleep(200); while (!lockB.tryLock(1, TimeUnit.SECONDS)) { System.out.println("ThreadOne acquired lockB"); } @@ -131,7 +131,7 @@ public void run() { try { lockB.lock(); System.out.println("ThreadOne acquired lockB"); - Thread.sleep(100); + Thread.sleep(200); while (!lockA.tryLock(1, TimeUnit.SECONDS)) { System.out.println("ThreadOne acquired lockA"); } From 65261a0b93d8ef65a1dc6d36811959cbccf89228 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 18 Sep 2025 19:17:39 +0300 Subject: [PATCH 20/23] Improve log message --- 
.../org/apache/pulsar/common/configuration/VipStatus.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index d1fdda65f127a..ea735f1cb7559 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -107,8 +107,8 @@ public String checkStatus() { .collect(Collectors.joining(", ")); if (clock.millis() - lastPrintThreadDumpTimestamp > printThreadDumpIntervalMs) { String diagnosticResult = ThreadDumpUtil.buildThreadDiagnosticString(); - log.error("Deadlock detected, service may be unavailable, " - + "thread stack details are as follows: {}.", diagnosticResult); + log.error("Deadlocked threads detected. {}. Service may be unavailable, " + + "thread stack details are as follows:\n{}", threadNames, diagnosticResult); lastPrintThreadDumpTimestamp = clock.millis(); } else { log.error("Deadlocked threads detected. 
{}", threadNames); From 71bb442cb6b89ce9a029f0242081441c9d8adca6 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 19 Sep 2025 00:34:25 +0300 Subject: [PATCH 21/23] Fix test --- pulsar-broker-common/pom.xml | 6 +++ .../common/configuration/VipStatus.java | 7 +++ .../common/configuration/VipStatusTest.java | 45 ++++++++++++------- 3 files changed, 42 insertions(+), 16 deletions(-) diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index 06b855a3f1843..50ce9f466c32f 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -94,6 +94,12 @@ rest-assured test + + + org.glassfish.jersey.core + jersey-server + test + diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index ea735f1cb7559..31f417dca4a2c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -74,6 +74,13 @@ public VipStatus(ServletContext servletContext, long printThreadDumpIntervalMs) this.clock = Clock.systemUTC(); } + @VisibleForTesting + static void reset() { + lastCheckStatusTimestamp = 0L; + lastPrintThreadDumpTimestamp = 0L; + lastCheckStatusResult = false; + } + @GET public String checkStatus() { // Locking classes to avoid deadlock detection in multi-thread concurrent requests. 
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index d98e5d25b2b78..10bd69f0c97cf 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.common.configuration; import static org.testng.Assert.assertEquals; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.concurrent.ExecutorService; @@ -33,8 +34,8 @@ import lombok.extern.slf4j.Slf4j; import org.assertj.core.util.Files; import org.mockito.Mockito; -import org.testng.annotations.AfterTest; -import org.testng.annotations.BeforeTest; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Slf4j @@ -50,7 +51,7 @@ public class VipStatusTest { private VipStatus vipStatus; private File file; - @BeforeTest + @BeforeMethod public void setup() throws IOException { file = Files.newTemporaryFile(); Supplier isReadyProbe = () -> true; @@ -58,6 +59,7 @@ public void setup() throws IOException { Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH)).thenReturn(file.getAbsolutePath()); Mockito.when(mockServletContext.getAttribute(ATTRIBUTE_IS_READY_PROBE)).thenReturn(isReadyProbe); vipStatus = new VipStatus(mockServletContext, LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED); + VipStatus.reset(); } @Test @@ -68,26 +70,32 @@ public void testVipStatusCheckStatus() { testVipStatusCheckStatusWithDeadlock(); } - @AfterTest + @AfterMethod(alwaysRun = true) public void release() throws IOException { - file.deleteOnExit(); + if (file != null) { + file.delete(); + file = null; + } } + @Test public void testVipStatusCheckStatusWithoutDeadlock() { 
assertEquals(vipStatus.checkStatus(), "OK"); } + @Test public void testVipStatusCheckStatusWithDeadlock() { - MockDeadlock.startDeadlock(); + MockDeadlock mockDeadlock = new MockDeadlock(); boolean asExpected = true; try { + mockDeadlock.startDeadlock(); vipStatus.checkStatus(); asExpected = false; System.out.println("Simulated deadlock, no deadlock detected, not as expected."); } catch (Exception wae) { System.out.println("Simulated deadlock and detected it, as expected."); } finally { - MockDeadlock.executorService.shutdownNow(); + mockDeadlock.close(); } if (!asExpected) { @@ -95,19 +103,24 @@ public void testVipStatusCheckStatusWithDeadlock() { } } - public class MockDeadlock { - private static ExecutorService executorService = Executors.newCachedThreadPool(); - private static ReentrantLock lockA = new ReentrantLock(); - private static ReentrantLock lockB = new ReentrantLock(); + static class MockDeadlock implements Closeable { + private ExecutorService executorService = Executors.newCachedThreadPool(); + private ReentrantLock lockA = new ReentrantLock(); + private ReentrantLock lockB = new ReentrantLock(); @SneakyThrows - public static void startDeadlock() { - executorService.execute(new ThreadOne()); - executorService.execute(new ThreadTwo()); + public void startDeadlock() { + executorService.execute(new TaskOne()); + executorService.execute(new TaskTwo()); Thread.sleep(CHECK_STATUS_INTERVAL); } - private static class ThreadOne implements Runnable { + @Override + public void close() { + executorService.shutdownNow(); + } + + private class TaskOne implements Runnable { @Override public void run() { try { @@ -125,7 +138,7 @@ public void run() { } } - private static class ThreadTwo implements Runnable { + private class TaskTwo implements Runnable { @Override public void run() { try { From 81b97cc9e4c63641dd900e78b9f880efffd98415 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 19 Sep 2025 00:38:40 +0300 Subject: [PATCH 22/23] Ensure that test isn't flaky 
--- .../apache/pulsar/common/configuration/VipStatusTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java index 10bd69f0c97cf..36542d237b5e3 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/VipStatusTest.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -107,6 +108,7 @@ static class MockDeadlock implements Closeable { private ExecutorService executorService = Executors.newCachedThreadPool(); private ReentrantLock lockA = new ReentrantLock(); private ReentrantLock lockB = new ReentrantLock(); + private Phaser phaser = new Phaser(2); @SneakyThrows public void startDeadlock() { @@ -126,7 +128,7 @@ public void run() { try { lockA.lock(); System.out.println("ThreadOne acquired lockA"); - Thread.sleep(200); + phaser.arriveAndAwaitAdvance(); while (!lockB.tryLock(1, TimeUnit.SECONDS)) { System.out.println("ThreadOne acquired lockB"); } @@ -144,7 +146,7 @@ public void run() { try { lockB.lock(); System.out.println("ThreadOne acquired lockB"); - Thread.sleep(200); + phaser.arriveAndAwaitAdvance(); while (!lockA.tryLock(1, TimeUnit.SECONDS)) { System.out.println("ThreadOne acquired lockA"); } From 49af659b36b58acff82fd6e9e6069c2357e8e1ca Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 19 Sep 2025 00:42:23 +0300 Subject: [PATCH 23/23] Fix warning message --- .../java/org/apache/pulsar/common/configuration/VipStatus.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff 
--git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java index 31f417dca4a2c..aa4ec1109a614 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java @@ -129,7 +129,8 @@ public String checkStatus() { } } lastCheckStatusResult = false; - log.warn("Failed to access \"status.html\". The service is not ready"); + log.warn("Status file '{}' doesn't exist or ready probe value ({}) isn't true. The service is not ready", + statusFilePath, isReady); throw new WebApplicationException(Status.NOT_FOUND); } }