Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ class IntervalTestInfoSender(
}

override fun stopSendingTests() {
sendTests(collectTests())
messageSender.shutdown()
scheduledThreadPool.shutdown()
if (!scheduledThreadPool.awaitTermination(1, TimeUnit.SECONDS)) {
logger.error("Failed to send some tests prior to shutdown")
scheduledThreadPool.shutdownNow();
}
sendTests(collectTests())
messageSender.shutdown()
logger.info { "Test sending job is stopped." }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.epam.drill.agent.transport

import mu.KotlinLogging
import com.epam.drill.agent.common.transport.AgentMessage
import com.epam.drill.agent.common.transport.AgentMessageDestination
import com.epam.drill.agent.common.transport.AgentMessageSender
import kotlinx.serialization.KSerializer
Expand Down Expand Up @@ -52,13 +51,9 @@ open class QueuedAgentMessageSender(
}
}

override fun <T>send(destination: AgentMessageDestination, message: T, serializer: KSerializer<T>) {
override fun <T> send(destination: AgentMessageDestination, message: T, serializer: KSerializer<T>) {
val mappedDestination = destinationMapper.map(destination)
val serializedMessage = messageSerializer.serialize(message, serializer)
if (!isRunning.get()) {
handleUnsent(mappedDestination, serializedMessage, "sender is not running")
return
}
if (!messageQueue.offer(Pair(mappedDestination, serializedMessage))) {
handleUnsent(mappedDestination, serializedMessage, "queue capacity limit reached")
return
Expand All @@ -69,10 +64,10 @@ open class QueuedAgentMessageSender(
}

override fun shutdown() {
isRunning.set(false)
if (!isRunning.compareAndSet(true, false)) return
executor.shutdown()
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
executor.shutdownNow()
}
} catch (e: InterruptedException) {
Expand Down Expand Up @@ -144,6 +139,7 @@ open class QueuedAgentMessageSender(
tryToSend(destination, message) || handleUnsent(destination, message, reason)
}
} while (message != null)
logger.info { "Finished unloading a message queue." }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ class IntervalCoverageSender(
}

override fun stopSendingCoverage() {
sendProbes(collectReleasedProbes())
sendProbes(collectUnreleasedProbes())
sender.shutdown()
scheduledThreadPool.shutdown()
if (!scheduledThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
if (!scheduledThreadPool.awaitTermination(1, TimeUnit.SECONDS)) {
logger.error("Failed to send some coverage data prior to shutdown")
scheduledThreadPool.shutdownNow();
}
sendProbes(collectReleasedProbes())
sendProbes(collectUnreleasedProbes())
sender.shutdown()
logger.info { "Coverage sending job is stopped." }
}

Expand Down
Loading