27 changes: 21 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -211,6 +211,8 @@ private[deploy] class Worker(
 
   private var registerMasterFutures: Array[JFuture[_]] = null
   private var registrationRetryTimer: Option[JScheduledFuture[_]] = None
+  private var heartbeatTask: Option[JScheduledFuture[_]] = None
+  private var workDirCleanupTask: Option[JScheduledFuture[_]] = None
 
   // A thread pool for registering with masters. Because registering with a master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
@@ -492,16 +494,25 @@ private[deploy] class Worker(
         logInfo(log"Successfully registered with master ${MDC(MASTER_URL, preferredMasterAddress)}")
         registered = true
         changeMaster(masterRef, masterWebUiUrl, masterAddress)
-        forwardMessageScheduler.scheduleAtFixedRate(
-          () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
-          0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
-        if (CLEANUP_ENABLED) {
+
+        // Only schedule heartbeat task if not already scheduled. The existing task will
+        // continue running through reconnections, and the SendHeartbeat handler already
+        // checks the 'connected' flag before sending heartbeats to master.
+        if (heartbeatTask.isEmpty) {
+          heartbeatTask = Some(forwardMessageScheduler.scheduleAtFixedRate(
+            () => Utils.tryLogNonFatalError {
+              self.send(SendHeartbeat)
+            },
+            0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS))
+        }
+        // Only schedule work directory cleanup task if not already scheduled
+        if (CLEANUP_ENABLED && workDirCleanupTask.isEmpty) {
           logInfo(
             log"Worker cleanup enabled; old application directories will be deleted in: " +
             log"${MDC(PATH, workDir)}")
-          forwardMessageScheduler.scheduleAtFixedRate(
+          workDirCleanupTask = Some(forwardMessageScheduler.scheduleAtFixedRate(
             () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
-            CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+            CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS))
         }
 
         val execs = executors.values.map { e =>
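
For readers less familiar with the Worker lifecycle, the hunk above is the heart of the fix: before this change, every successful (re-)registration scheduled a fresh heartbeat task (and, with cleanup enabled, a fresh cleanup task) on forwardMessageScheduler, so a Worker that reconnected repeatedly accumulated periodic tasks. Below is a minimal standalone sketch of the before/after pattern; it uses a plain java.util.concurrent scheduler rather than Spark's forwardMessageScheduler, and names such as HeartbeatSchedulingSketch, onRegisteredBefore, and onRegisteredAfter are invented for illustration, not taken from Spark.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Stand-ins for Spark's scheduler and heartbeat message; the real Worker
// sends SendHeartbeat to itself through its RPC endpoint instead.
object HeartbeatSchedulingSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private var heartbeatTask: Option[ScheduledFuture[_]] = None

  // Old behaviour: every (re-)registration schedules another periodic task,
  // and none of the earlier ones is ever cancelled, so tasks accumulate.
  def onRegisteredBefore(): Unit = {
    scheduler.scheduleAtFixedRate(() => println("heartbeat"), 0L, 1L, TimeUnit.SECONDS)
  }

  // New behaviour: remember the ScheduledFuture and only schedule when none
  // exists yet, so repeated registrations reuse the task that is already running.
  def onRegisteredAfter(): Unit = {
    if (heartbeatTask.isEmpty) {
      heartbeatTask = Some(scheduler.scheduleAtFixedRate(
        () => println("heartbeat"), 0L, 1L, TimeUnit.SECONDS))
    }
  }

  // Mirrors the onStop() part of the change: cancel the task and clear the reference.
  def stop(): Unit = {
    heartbeatTask.foreach(_.cancel(true))
    heartbeatTask = None
    scheduler.shutdownNow()
  }
}
```
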
@@ -852,6 +863,10 @@ private[deploy] class Worker(
     cleanupThreadExecutor.shutdownNow()
     metricsSystem.report()
     cancelLastRegistrationRetry()
+    heartbeatTask.foreach(_.cancel(true))

Review thread on the heartbeatTask.foreach(_.cancel(true)) line above:

Contributor:
The handleRegisterResponse method is a synchronized code block. Don't the operations on heartbeatTask and workDirCleanupTask within onStop also require synchronized protection?

Contributor (Author):
The synchronized block was introduced by #9138 to avoid some race conditions in a very early implementation with some async callbacks...
Looks like it may not be necessary now...

Member:
Worker is a ThreadSafeRpcEndpoint already. The synchronized protection seems to be unnecessary today.

Contributor:
OK, so can we remove that unnecessary synchronized in a separate PR?

Contributor (Author):
I will create a separate task to revisit the synchronized usage here.

+    heartbeatTask = None
+    workDirCleanupTask.foreach(_.cancel(true))
+    workDirCleanupTask = None
     forwardMessageScheduler.shutdownNow()
     registerMasterThreadPool.shutdownNow()
     executors.values.foreach(_.kill())
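
On the synchronized question raised in the review thread: the argument is that because the Worker endpoint handles messages (and lifecycle callbacks such as onStop) one at a time, plain field updates like heartbeatTask = None cannot race with handleRegisterResponse. The toy sketch below illustrates that serialized-handling idea with a single-threaded inbox; it is only a sketch under that assumption, not Spark's RpcEnv or dispatcher, and ToyEndpoint together with its message strings is invented for this example.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// A toy endpoint whose handlers all run on one executor thread. Because no
// two handlers ever run concurrently, the mutable Option field below can be
// read and written without explicit locking, which is the guarantee the
// review thread attributes to a ThreadSafeRpcEndpoint.
class ToyEndpoint {
  private val inbox = Executors.newSingleThreadExecutor()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private var heartbeatTask: Option[ScheduledFuture[_]] = None

  // Callers from any thread enqueue messages; handling happens on `inbox`.
  def send(msg: String): Unit = inbox.execute(() => handle(msg))

  private def handle(msg: String): Unit = msg match {
    case "registered" =>
      // Idempotent scheduling, as in the Worker change above.
      if (heartbeatTask.isEmpty) {
        heartbeatTask = Some(scheduler.scheduleAtFixedRate(
          () => println("heartbeat"), 0L, 1L, TimeUnit.SECONDS))
      }
    case "stop" =>
      heartbeatTask.foreach(_.cancel(true))
      heartbeatTask = None
      scheduler.shutdownNow()
      inbox.shutdown()
    case _ => () // ignore anything else in this sketch
  }
}
```
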
42 changes: 40 additions & 2 deletions core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.worker
 
 import java.io.{File, IOException}
+import java.util.concurrent.{ScheduledFuture => JScheduledFuture}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.function.Supplier
 
@@ -37,7 +38,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.TestUtils.{createTempJsonFile, createTempScriptWithExpectedOutput}
 import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
-import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
+import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, RegisteredWorker, WorkDirCleanup}
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.SHUFFLE_SERVICE_DB_BACKEND
@@ -46,7 +47,7 @@ import org.apache.spark.network.shuffledb.DBBackend
 import org.apache.spark.resource.{ResourceAllocation, ResourceInformation}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs.{WORKER_FPGA_ID, WORKER_GPU_ID}
-import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.util.Utils
 
 class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
@@ -405,4 +406,41 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
     }.getMessage
     assert(m.contains("Whitespace is not allowed"))
   }
+
+  test("SPARK-54312: heartbeat task and workdir cleanup task should only be scheduled once " +
+    "across multiple registrations") {
+    val worker = spy(makeWorker())
+    val masterWebUiUrl = "https://1.2.3.4:8080"
+    val masterAddress = RpcAddress("1.2.3.4", 1234)
+    val masterRef = mock(classOf[RpcEndpointRef])
+    when(masterRef.address).thenReturn(masterAddress)
+
+    def getHeartbeatTask(worker: Worker): Option[JScheduledFuture[_]] = {
+      val _heartbeatTask =
+        PrivateMethod[Option[JScheduledFuture[_]]](Symbol("heartbeatTask"))
+      worker.invokePrivate(_heartbeatTask())
+    }
+
+    def getWorkDirCleanupTask(worker: Worker): Option[JScheduledFuture[_]] = {
+      val _workDirCleanupTask =
+        PrivateMethod[Option[JScheduledFuture[_]]](Symbol("workDirCleanupTask"))
+      worker.invokePrivate(_workDirCleanupTask())
+    }
+
+    // Tasks should not be scheduled yet before registration
+    assert(getHeartbeatTask(worker).isEmpty && getWorkDirCleanupTask(worker).isEmpty)
+
+    val msg = RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate = false)
+    // Simulate first registration - this should schedule both tasks
+    worker.receive(msg)
+    val heartbeatTask = getHeartbeatTask(worker)
+    val workDirCleanupTask = getWorkDirCleanupTask(worker)
+    assert(heartbeatTask.isDefined && workDirCleanupTask.isDefined)
+
+    // Simulate disconnection and re-registration
+    worker.receive(msg)
+    // After re-registration, the task references should be the same (not rescheduled)
+    assert(getHeartbeatTask(worker) == heartbeatTask)
+    assert(getWorkDirCleanupTask(worker) == workDirCleanupTask)
+  }
 }
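
A brief note on the testing technique used above: heartbeatTask and workDirCleanupTask are private vars on Worker, and ScalaTest's PrivateMethodTester can read them because Scala compiles a (non-local) private var to private accessor methods. The snippet below is a small self-contained example of the same pattern; Counter, increment, and count are made up for illustration and are not part of Spark.

```scala
import org.scalatest.PrivateMethodTester
import org.scalatest.funsuite.AnyFunSuite

class PrivateAccessExampleSuite extends AnyFunSuite with PrivateMethodTester {

  // A private var compiles to a private `count` accessor method,
  // which PrivateMethod can invoke reflectively.
  class Counter {
    private var count: Int = 0
    def increment(): Unit = { count += 1 }
  }

  test("read a private var through its generated accessor") {
    val counter = new Counter
    counter.increment()
    val getCount = PrivateMethod[Int](Symbol("count"))
    assert(counter.invokePrivate(getCount()) === 1)
  }
}
```
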