From 71f9cdb40af0ebbb9c8a5db811ff59e43dadae60 Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Mon, 14 Apr 2025 22:20:13 +0200 Subject: [PATCH 01/20] Added logging --- .../org/wagham/kabotapi/components/InstanceInactivityManager.kt | 2 +- .../org/wagham/kabotapi/components/socket/CommandComponent.kt | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 019f1ad..2708f32 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -51,7 +51,7 @@ class InstanceInactivityManager( private fun listenForZombies() = managerScope.launch { delay(instanceTtl) - doInfinity("0 0 * * * *") { + doInfinity("0 30 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { it.name !in excludedInstances diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt index 8d057b3..48a85c9 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt @@ -25,6 +25,7 @@ class CommandComponent( override fun handlePacket(packet: String) { val parts = packet.split('|') + logger.info("Received packet: $parts") packetChannel.trySend( ParsedPacket( parts[0].toLong(), From 5e7645d5a11013915215f90c61c7faecd50548c1 Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Mon, 14 Apr 2025 22:32:33 +0200 Subject: [PATCH 02/20] updated cron --- .../org/wagham/kabotapi/components/InstanceInactivityManager.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 2708f32..a87c5dc 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -51,7 +51,7 @@ class InstanceInactivityManager( private fun listenForZombies() = managerScope.launch { delay(instanceTtl) - doInfinity("0 30 * * * *") { + doInfinity("0 50 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { it.name !in excludedInstances From 1f7312e1ac759626ab5039e5e32247766e6309f2 Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Mon, 14 Apr 2025 22:50:51 +0200 Subject: [PATCH 03/20] Removed wait --- .../wagham/kabotapi/components/InstanceInactivityManager.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index a87c5dc..424601c 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -50,8 +50,8 @@ class InstanceInactivityManager( } private fun listenForZombies() = managerScope.launch { - delay(instanceTtl) - doInfinity("0 50 * * * *") { +// delay(instanceTtl) + doInfinity("0 0 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { it.name !in excludedInstances From 6ee83f8342a99cf7eb777f92559479aa0b4f0bec Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Mon, 14 Apr 2025 23:04:09 +0200 Subject: [PATCH 04/20] Added trim --- .../wagham/kabotapi/components/InstanceInactivityManager.kt | 2 +- .../org/wagham/kabotapi/components/socket/CommandComponent.kt | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 424601c..20bd0fc 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -51,7 +51,7 @@ class InstanceInactivityManager( private fun listenForZombies() = managerScope.launch { // delay(instanceTtl) - doInfinity("0 0 * * * *") { + doInfinity("0 15 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { it.name !in excludedInstances diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt index 48a85c9..1bcc23c 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt @@ -24,8 +24,7 @@ class CommandComponent( private val socketMutex = Mutex() override fun handlePacket(packet: String) { - val parts = packet.split('|') - logger.info("Received packet: $parts") + val parts = packet.trim('\n').split('|') packetChannel.trySend( ParsedPacket( parts[0].toLong(), From 5018c33f4898acda19d42d8e16d4668f01af002f Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Mon, 14 Apr 2025 23:19:42 +0200 Subject: [PATCH 05/20] Added limit on split --- .../org/wagham/kabotapi/components/socket/CommandComponent.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt index 1bcc23c..4864b0d 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt @@ -24,7 +24,7 @@ class CommandComponent( private val socketMutex = Mutex() override fun handlePacket(packet: String) { - val parts = packet.trim('\n').split('|') + val parts = packet.trim('\n').split('|', limit = 4) packetChannel.trySend( ParsedPacket( parts[0].toLong(), From c83cd88870564ade375048d1b896a59dd24d22fd Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Mon, 14 Apr 2025 23:22:23 +0200 Subject: [PATCH 06/20] Updated crontab --- .../org/wagham/kabotapi/components/InstanceInactivityManager.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 20bd0fc..6e577c7 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -51,7 +51,7 @@ class InstanceInactivityManager( private fun listenForZombies() = managerScope.launch { // delay(instanceTtl) - doInfinity("0 15 * * * *") { + doInfinity("0 30 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { it.name !in excludedInstances From 2f4f291e393d1e14c451fc40b1761104347c3f76 Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Tue, 15 Apr 2025 10:37:41 +0200 Subject: [PATCH 07/20] Updated logging --- .../kabotapi/components/InstanceInactivityManager.kt | 2 +- .../kabotapi/components/socket/AbstractUdpListener.kt | 2 +- .../kabotapi/components/socket/CommandComponent.kt | 9 +++++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 6e577c7..424601c 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -51,7 +51,7 @@ class InstanceInactivityManager( private fun listenForZombies() = managerScope.launch { // delay(instanceTtl) - doInfinity("0 30 * * * *") { + doInfinity("0 0 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { it.name !in excludedInstances diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractUdpListener.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractUdpListener.kt index a9036d3..c0dda25 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractUdpListener.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractUdpListener.kt @@ -8,7 +8,7 @@ import io.ktor.util.logging.* abstract class AbstractUdpListener( listenPort: Int, protected val logger: Logger, - private val enableLogging: Boolean + protected val enableLogging: Boolean ) { private val receiveSocket = DatagramSocket(listenPort) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt index 4864b0d..dfe78ca 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt @@ -1,6 +1,7 @@ package org.wagham.kabotapi.components.socket import io.ktor.util.logging.* +import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -63,6 +64,14 @@ class CommandComponent( } } } + }.onFailure { + if (enableLogging) { + if(it is TimeoutCancellationException) { + logger.error("Timed out waiting for command $command") + } else { + logger.error("Error while waiting for command $command", it) + } + } }.getOrDefault(false) } while (hasNext) } From 211cb8537543dd3baf773c611d9a3d4faadf7c9d Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Tue, 15 Apr 2025 11:16:30 +0200 Subject: [PATCH 08/20] Updated process info and socket timeout --- .../kabotapi/components/InstanceInactivityManager.kt | 2 +- .../kabotapi/components/socket/CommandComponent.kt | 2 +- .../org/wagham/kabotapi/entities/pm2/Pm2ProcessInfo.kt | 10 ++++++++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 424601c..27b7b54 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -54,7 +54,7 @@ class InstanceInactivityManager( doInfinity("0 0 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { - it.name !in excludedInstances + it.isActive && it.name !in excludedInstances }.forEach { if ((System.currentTimeMillis() - it.pm2Env.uptime).milliseconds > 1.hours) { val lastActivity = instanceActivity.getIfPresent(it.name) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt index dfe78ca..fcf6eea 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt @@ -49,7 +49,7 @@ class CommandComponent( val responseJob = launch { do { val hasNext = runCatching { - withTimeout(500.milliseconds) { + withTimeout(750.milliseconds) { val next = packetChannel.peek() when { next.ts < command.ts -> { diff --git a/src/main/kotlin/org/wagham/kabotapi/entities/pm2/Pm2ProcessInfo.kt b/src/main/kotlin/org/wagham/kabotapi/entities/pm2/Pm2ProcessInfo.kt index 8ab89aa..fdda80c 100644 --- a/src/main/kotlin/org/wagham/kabotapi/entities/pm2/Pm2ProcessInfo.kt +++ b/src/main/kotlin/org/wagham/kabotapi/entities/pm2/Pm2ProcessInfo.kt @@ -10,12 +10,18 @@ data class Pm2ProcessInfo( val name: String, @SerialName("pm2_env") val pm2Env: Pm2Env, val monit: Pm2Monit -) +) { + + val isActive: Boolean + get() = pm2Env.status == "online" + +} @Serializable data class Pm2Env( @SerialName("unstable_restarts") val unstableRestarts: Int, - @SerialName("pm_uptime") val uptime: Long + @SerialName("pm_uptime") val uptime: Long, + val status: String ) @Serializable From f9fca6836eb7595b96765d587536e5221e52f22c Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Tue, 15 Apr 2025 12:28:18 +0200 Subject: [PATCH 09/20] Updated socket timeout --- .../org/wagham/kabotapi/components/socket/CommandComponent.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt index fcf6eea..8957150 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt @@ -12,7 +12,7 @@ import org.wagham.kabotapi.data.PeekableChannel import java.net.DatagramPacket import java.net.DatagramSocket import java.net.InetAddress -import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds class CommandComponent( private val sendPort: Int, @@ -49,7 +49,7 @@ class CommandComponent( val responseJob = launch { do { val hasNext = runCatching { - withTimeout(750.milliseconds) { + withTimeout(1.seconds) { val next = packetChannel.peek() when { next.ts < command.ts -> { From d61242bc5c259b626047ec96bb49d60063addee4 Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Mon, 21 Apr 2025 11:17:41 +0200 Subject: [PATCH 10/20] Use TCP socket instead of UDP for commands --- .../components/InstanceInactivityManager.kt | 2 +- .../components/socket/AbstractTcpListener.kt | 41 +++++++++++++++++++ .../components/socket/CommandComponent.kt | 14 +++---- 3 files changed, 49 insertions(+), 8 deletions(-) create mode 100644 src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractTcpListener.kt diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 27b7b54..53948bc 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -51,7 +51,7 @@ class InstanceInactivityManager( private fun listenForZombies() = managerScope.launch { // delay(instanceTtl) - doInfinity("0 0 * * * *") { + doInfinity("0 25 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { it.isActive && it.name !in excludedInstances diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractTcpListener.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractTcpListener.kt new file mode 100644 index 0000000..ff04346 --- /dev/null +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractTcpListener.kt @@ -0,0 +1,41 @@ +package org.wagham.kabotapi.components.socket + +import kotlin.concurrent.thread +import io.ktor.util.logging.* +import java.io.BufferedReader +import java.io.InputStreamReader +import java.net.ServerSocket + +abstract class AbstractTcpListener( + listenPort: Int, + protected val logger: Logger, + protected val enableLogging: Boolean +) { + + private val receiveSocket = ServerSocket(listenPort) + + protected abstract fun handlePacket(packet: String) + + fun startListening() { + logger.info("Starting ${this::class.simpleName}") + thread(start = true, isDaemon = true) { + try { + while (true) { + val clientSocket = receiveSocket.accept() + clientSocket.use { socket -> + val reader = BufferedReader(InputStreamReader(socket.getInputStream())) + var line: String? + while (reader.readLine().also { line = it } != null) { + if (enableLogging) { + logger.info("Received: $line") + } + handlePacket(line!!) + } + } + } + } catch (e: Exception) { + logger.error("Error in server loop", e) + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt index 8957150..db90dc8 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt @@ -9,16 +9,15 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withTimeout import org.wagham.kabotapi.data.PeekableChannel -import java.net.DatagramPacket -import java.net.DatagramSocket import java.net.InetAddress +import java.net.Socket import kotlin.time.Duration.Companion.seconds class CommandComponent( private val sendPort: Int, receivePort: Int, enableLogging: Boolean, -): AbstractUdpListener(receivePort, KtorSimpleLogger("CommandComponent"), enableLogging) { +): AbstractTcpListener(receivePort, KtorSimpleLogger("CommandComponent"), enableLogging) { private val packetChannel = PeekableChannel(capacity = 1000, onBufferOverflow = BufferOverflow.DROP_OLDEST) private val address = InetAddress.getByName("127.0.0.1") @@ -36,10 +35,11 @@ class CommandComponent( ) } - private fun sendCommand(datagram: ByteArray) { - DatagramSocket().use { socket -> - val packet = DatagramPacket(datagram, datagram.size, address, sendPort) - socket.send(packet) + private fun sendCommand(data: ByteArray) { + Socket(address, sendPort).use { socket -> + val output = socket.getOutputStream() + output.write(data) + output.flush() } } From 8621da4d9b6c5c9a474e87d32468424011999c7a Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Mon, 21 Apr 2025 11:36:07 +0200 Subject: [PATCH 11/20] Revert "Use TCP socket instead of UDP for commands" This reverts commit d61242bc5c259b626047ec96bb49d60063addee4. --- .../components/InstanceInactivityManager.kt | 2 +- .../components/socket/AbstractTcpListener.kt | 41 ------------------- .../components/socket/CommandComponent.kt | 14 +++---- 3 files changed, 8 insertions(+), 49 deletions(-) delete mode 100644 src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractTcpListener.kt diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 53948bc..27b7b54 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -51,7 +51,7 @@ class InstanceInactivityManager( private fun listenForZombies() = managerScope.launch { // delay(instanceTtl) - doInfinity("0 25 * * * *") { + doInfinity("0 0 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { it.isActive && it.name !in excludedInstances diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractTcpListener.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractTcpListener.kt deleted file mode 100644 index ff04346..0000000 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractTcpListener.kt +++ /dev/null @@ -1,41 +0,0 @@ -package org.wagham.kabotapi.components.socket - -import kotlin.concurrent.thread -import io.ktor.util.logging.* -import java.io.BufferedReader -import java.io.InputStreamReader -import java.net.ServerSocket - -abstract class AbstractTcpListener( - listenPort: Int, - protected val logger: Logger, - protected val enableLogging: Boolean -) { - - private val receiveSocket = ServerSocket(listenPort) - - protected abstract fun handlePacket(packet: String) - - fun startListening() { - logger.info("Starting ${this::class.simpleName}") - thread(start = true, isDaemon = true) { - try { - while (true) { - val clientSocket = receiveSocket.accept() - clientSocket.use { socket -> - val reader = BufferedReader(InputStreamReader(socket.getInputStream())) - var line: String? - while (reader.readLine().also { line = it } != null) { - if (enableLogging) { - logger.info("Received: $line") - } - handlePacket(line!!) - } - } - } - } catch (e: Exception) { - logger.error("Error in server loop", e) - } - } - } -} \ No newline at end of file diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt index db90dc8..8957150 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/CommandComponent.kt @@ -9,15 +9,16 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withTimeout import org.wagham.kabotapi.data.PeekableChannel +import java.net.DatagramPacket +import java.net.DatagramSocket import java.net.InetAddress -import java.net.Socket import kotlin.time.Duration.Companion.seconds class CommandComponent( private val sendPort: Int, receivePort: Int, enableLogging: Boolean, -): AbstractTcpListener(receivePort, KtorSimpleLogger("CommandComponent"), enableLogging) { +): AbstractUdpListener(receivePort, KtorSimpleLogger("CommandComponent"), enableLogging) { private val packetChannel = PeekableChannel(capacity = 1000, onBufferOverflow = BufferOverflow.DROP_OLDEST) private val address = InetAddress.getByName("127.0.0.1") @@ -35,11 +36,10 @@ class CommandComponent( ) } - private fun sendCommand(data: ByteArray) { - Socket(address, sendPort).use { socket -> - val output = socket.getOutputStream() - output.write(data) - output.flush() + private fun sendCommand(datagram: ByteArray) { + DatagramSocket().use { socket -> + val packet = DatagramPacket(datagram, datagram.size, address, sendPort) + socket.send(packet) } } From f8af8ea14d3c2e7dd39f8a3e4f01598d60933574 Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Mon, 21 Apr 2025 12:11:22 +0200 Subject: [PATCH 12/20] Added retry for zombie inactivities --- .../kabotapi/components/InstanceInactivityManager.kt | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 27b7b54..2578418 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -50,7 +50,7 @@ class InstanceInactivityManager( } private fun listenForZombies() = managerScope.launch { -// delay(instanceTtl) + delay(instanceTtl) doInfinity("0 0 * * * *") { try { commandComponent.sendSocketCommand(Pm2ListCommand()).filter { @@ -59,7 +59,13 @@ class InstanceInactivityManager( if ((System.currentTimeMillis() - it.pm2Env.uptime).milliseconds > 1.hours) { val lastActivity = instanceActivity.getIfPresent(it.name) if (lastActivity == null) { - commandComponent.sendSocketCommand(Pm2StopCommand(it.name)) + var retries = 5 + do { + retries = runCatching { + commandComponent.sendSocketCommand(Pm2StopCommand(it.name)) + 0 + }.getOrDefault(retries - 1) + } while(retries > 0) } } } From 0b43eaee14754007f14f0aec592ce1edd85f0a0b Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Tue, 22 Apr 2025 21:47:59 +0200 Subject: [PATCH 13/20] Added foundry endpoints --- gradle/libs.versions.toml | 19 +++- kabot-db-connector | 2 +- .../wagham/kabotapi/KabotApiApplication.kt | 2 + .../components/InstanceConfigManager.kt | 40 +++++++- .../components/InstanceInactivityManager.kt | 6 +- .../wagham/kabotapi/components/JWTManager.kt | 7 +- .../kabotapi/configuration/HttpConfig.kt | 2 +- .../kabotapi/configuration/KoinConfig.kt | 11 +++ .../kabotapi/configuration/RouteConfig.kt | 4 + .../configuration/ThrottlingConfig.kt | 24 +++++ .../kabotapi/controllers/FoundryController.kt | 68 ++++++++++++++ .../kabotapi/controllers/GuildController.kt | 1 - .../kabotapi/entities/config/FoundryConfig.kt | 2 + .../kabotapi/entities/dto/InstanceInfoDto.kt | 15 +++ .../entities/foundry/FoundryOptions.kt | 3 +- .../org/wagham/kabotapi/logic/FoundryLogic.kt | 30 ++++++ .../kabotapi/logic/impl/FoundryLogicImpl.kt | 91 +++++++++++++++++++ 17 files changed, 312 insertions(+), 15 deletions(-) create mode 100644 src/main/kotlin/org/wagham/kabotapi/configuration/ThrottlingConfig.kt create mode 100644 src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt create mode 100644 src/main/kotlin/org/wagham/kabotapi/entities/dto/InstanceInfoDto.kt create mode 100644 src/main/kotlin/org/wagham/kabotapi/logic/FoundryLogic.kt create mode 100644 src/main/kotlin/org/wagham/kabotapi/logic/impl/FoundryLogicImpl.kt diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d4a6e75..6568c17 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -2,7 +2,7 @@ ktor = "3.1.2" logback = "1.4.14" kfs = "1.3.0" -koin = "4.0.4" +koin = "4.1.0-Beta7" caffeine = "3.1.0" tike = "2.9.1" jbcrypt = "0.4" @@ -22,13 +22,15 @@ ktor-server-netty = { module = "io.ktor:ktor-server-netty-jvm", version.ref = "k ktor-server-auth = { module = "io.ktor:ktor-server-auth", version.ref = "ktor" } ktor-server-auth-jwt = { module = "io.ktor:ktor-server-auth-jwt", version.ref = "ktor" } ktor-server-status-pages = { module = "io.ktor:ktor-server-status-pages", version.ref = "ktor" } +ktor-server-sse = { module = "io.ktor:ktor-server-sse", version.ref = "ktor" } +ktor-server-rate-limit = { module = "io.ktor:ktor-server-rate-limit", version.ref = "ktor"} ktor-serialization-kotlinx = { module = "io.ktor:ktor-serialization-kotlinx-json", version.ref = "ktor" } ktor-serialization-jackson = { module = "io.ktor:ktor-serialization-jackson", version.ref = "ktor" } ktor-client-cio = { module = "io.ktor:ktor-client-cio-jvm", version.ref = "ktor" } ktor-client-core = { module = "io.ktor:ktor-client-core-jvm", version.ref = "ktor" } ktor-client-content-negotiation = { module = "io.ktor:ktor-client-content-negotiation-jvm", version.ref = "ktor" } kfswatch = { module = "io.github.irgaly.kfswatch:kfswatch", version.ref = "kfs" } -koin-ktor = { module = "io.insert-koin:koin-ktor", version.ref = "koin" } +koin-ktor = { module = "io.insert-koin:koin-ktor3", version.ref = "koin" } koin-logger = { module = "io.insert-koin:koin-logger-slf4j", version.ref = "koin" } logback = { module = "ch.qos.logback:logback-classic", version.ref = "logback" } jbcrypt = { module = "org.mindrot:jbcrypt", version.ref = "jbcrypt" } @@ -36,7 +38,18 @@ krontab = { module = "dev.inmo:krontab", version.ref = "krontab" } jupyter = { module = "org.junit.jupiter:junit-jupiter", version.ref = "jupyter" } [bundles] -ktor-server = ["ktor-server-core", "ktor-server-cors", "ktor-server-content-negotiations", "ktor-server-logging", "ktor-server-netty", "ktor-server-auth", "ktor-server-auth-jwt", "ktor-server-status-pages"] +ktor-server = [ + "ktor-server-core", + "ktor-server-cors", + "ktor-server-content-negotiations", + "ktor-server-logging", + "ktor-server-netty", + "ktor-server-auth", + "ktor-server-auth-jwt", + "ktor-server-status-pages", + "ktor-server-sse", + "ktor-server-rate-limit" +] ktor-serialization = ["ktor-serialization-kotlinx", "ktor-serialization-jackson"] ktor-client = ["ktor-client-cio", "ktor-client-core", "ktor-client-content-negotiation"] koin = ["koin-ktor", "koin-logger"] diff --git a/kabot-db-connector b/kabot-db-connector index ee17c71..651d046 160000 --- a/kabot-db-connector +++ b/kabot-db-connector @@ -1 +1 @@ -Subproject commit ee17c718d1f583bbe73dda444bab81fb8ebf7c02 +Subproject commit 651d046d13e15e7119a532f14764203116bde509 diff --git a/src/main/kotlin/org/wagham/kabotapi/KabotApiApplication.kt b/src/main/kotlin/org/wagham/kabotapi/KabotApiApplication.kt index 59f75ef..3df9ba0 100644 --- a/src/main/kotlin/org/wagham/kabotapi/KabotApiApplication.kt +++ b/src/main/kotlin/org/wagham/kabotapi/KabotApiApplication.kt @@ -5,6 +5,7 @@ import org.wagham.kabotapi.configuration.configureExceptions import org.wagham.kabotapi.configuration.configureHTTP import org.wagham.kabotapi.configuration.configureKoin import org.wagham.kabotapi.configuration.configureRouting +import org.wagham.kabotapi.configuration.configureThrottling fun main(args: Array) { io.ktor.server.netty.EngineMain.main(args) @@ -15,5 +16,6 @@ fun Application.module() { configureHTTP() configureKoin() configureExceptions() + configureThrottling() configureRouting() } \ No newline at end of file diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceConfigManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceConfigManager.kt index dbae559..03d4985 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceConfigManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceConfigManager.kt @@ -1,5 +1,6 @@ package org.wagham.kabotapi.components +import dev.inmo.krontab.doInfinity import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import io.github.irgaly.kfswatch.KfsDirectoryWatcher @@ -9,6 +10,7 @@ import kotlinx.coroutines.launch import kotlinx.serialization.json.Json import org.wagham.kabotapi.entities.foundry.FoundryOptions import java.io.File +import java.nio.file.Files class InstanceConfigManager( private val baseFolder: String @@ -22,6 +24,8 @@ class InstanceConfigManager( private val scope = CoroutineScope(Dispatchers.IO) private val watcher = KfsDirectoryWatcher(scope) private val instancesByUrl = mutableMapOf() + private val urlById = mutableMapOf() + private val sizeByInstanceId = mutableMapOf() fun startWatching() { logger.info("Starting instance manager") @@ -51,9 +55,38 @@ class InstanceConfigManager( } } } + startWatchingInstanceSize() } - fun getInfoByUrl(url: String): InstanceInfo? = instancesByUrl[url] + fun getConfigByUrl(url: String): InstanceInfo? = instancesByUrl[url] + + fun getConfigById(id: String): InstanceInfo? = urlById[id]?.let { + getConfigByUrl(it) + } + + fun getFolderSizeById(id: String): Long? = sizeByInstanceId[id] + + private fun getFolderSize(folder: File): Long = folder.walkTopDown().filter { + it.isFile && !Files.isSymbolicLink(it.toPath()) + }.sumOf { it.length() } + + private fun computeInstancesSize() { +// File(baseFolder).listFiles().filter { +// it.isDirectory +// }.onEach { +// val size = getFolderSize(it) +// sizeByInstanceId[it.name] = size +// } + } + + private fun startWatchingInstanceSize() { + computeInstancesSize() + scope.launch { + doInfinity("0 0 * * * *") { + computeInstancesSize() + } + } + } private fun updateInstancesWith(optionsFile: File) { val options = Json.decodeFromString(optionsFile.readText()) @@ -61,7 +94,9 @@ class InstanceConfigManager( id = options.dataPath.split("/").last(), url = options.routePrefix, name = options.masterName ?: "unknown", + domain = options.domain ) + urlById[info.id] = info.url instancesByUrl[info.url] = info.also { logger.info("Updating ${info.url} with $it") } @@ -70,7 +105,8 @@ class InstanceConfigManager( data class InstanceInfo( val id: String, val url: String, - val name: String + val name: String, + val domain: String? ) } \ No newline at end of file diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt index 2578418..cb27dc2 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceInactivityManager.kt @@ -27,7 +27,7 @@ class InstanceInactivityManager( private val enableLogging: Boolean, ) { - private val urlExtractingRegex = Regex(".* \"https://fnd\\.kaironbot\\.net/([^/]+).*") + private val urlExtractingRegex = Regex(".* \"https://fnd\\.[^/]+/([^/]+).*") private val managerScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private val logger = KtorSimpleLogger(this.javaClass.simpleName) private val instanceActivity = Caffeine.newBuilder() @@ -95,11 +95,11 @@ class InstanceInactivityManager( } } - + fun getLastActivityOf(instanceId: String): Long? = instanceActivity.getIfPresent(instanceId) private fun getInstanceFromLog(log: String): InstanceConfigManager.InstanceInfo? = urlExtractingRegex.find(log)?.groupValues?.get(1)?.let { - instanceConfigManager.getInfoByUrl(it) + instanceConfigManager.getConfigByUrl(it) }?.takeIf { it.id !in excludedInstances } diff --git a/src/main/kotlin/org/wagham/kabotapi/components/JWTManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/JWTManager.kt index b776044..2df614d 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/JWTManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/JWTManager.kt @@ -5,7 +5,6 @@ import com.auth0.jwt.JWTVerifier import com.auth0.jwt.algorithms.Algorithm import com.auth0.jwt.interfaces.Payload import io.ktor.server.auth.jwt.* -import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import org.wagham.kabotapi.entities.config.JWTConfig import org.wagham.kabotapi.entities.security.JWTClaims @@ -52,14 +51,16 @@ class JWTManager( .sign(Algorithm.HMAC256(config.authSecret)) /** - * @return a [JWTVerifier] for the authentication jwt. + * a [JWTVerifier] for the authentication jwt. */ - fun authJWTVerifier(): JWTVerifier = JWT + val authJWTVerifier: JWTVerifier = JWT .require(Algorithm.HMAC256(config.authSecret)) .withAudience(config.audience) .withIssuer(config.issuer) .build() + fun decodeAndGetClaims(token: String): JWTClaims = authJWTVerifier.verify(token).toJWTClaims() + private fun Payload.isAuthJwtValid() = getClaim(USER_ID).asString().isNotBlank() && getClaim(GUILD_ID).asString().isNotBlank() diff --git a/src/main/kotlin/org/wagham/kabotapi/configuration/HttpConfig.kt b/src/main/kotlin/org/wagham/kabotapi/configuration/HttpConfig.kt index f204cbc..cba531f 100644 --- a/src/main/kotlin/org/wagham/kabotapi/configuration/HttpConfig.kt +++ b/src/main/kotlin/org/wagham/kabotapi/configuration/HttpConfig.kt @@ -34,7 +34,7 @@ fun Application.configureHTTP() { install(Authentication) { jwt(AUTH_CTX) { realm = jwtManager.config.realm - verifier(jwtManager.authJWTVerifier()) + verifier(jwtManager.authJWTVerifier) validate { credential -> jwtManager.authCredentialToPrincipal(credential) diff --git a/src/main/kotlin/org/wagham/kabotapi/configuration/KoinConfig.kt b/src/main/kotlin/org/wagham/kabotapi/configuration/KoinConfig.kt index 2ab3d67..4355bd7 100644 --- a/src/main/kotlin/org/wagham/kabotapi/configuration/KoinConfig.kt +++ b/src/main/kotlin/org/wagham/kabotapi/configuration/KoinConfig.kt @@ -20,6 +20,7 @@ import org.wagham.kabotapi.entities.config.MongoConfig import org.wagham.kabotapi.entities.config.SocketConfig import org.wagham.kabotapi.logic.CharacterLogic import org.wagham.kabotapi.logic.DiscordLogic +import org.wagham.kabotapi.logic.FoundryLogic import org.wagham.kabotapi.logic.ItemLogic import org.wagham.kabotapi.logic.LabelLogic import org.wagham.kabotapi.logic.PlayerLogic @@ -27,6 +28,7 @@ import org.wagham.kabotapi.logic.SessionLogic import org.wagham.kabotapi.logic.UtilitiesLogic import org.wagham.kabotapi.logic.impl.CharacterLogicImpl import org.wagham.kabotapi.logic.impl.DiscordLogicImpl +import org.wagham.kabotapi.logic.impl.FoundryLogicImpl import org.wagham.kabotapi.logic.impl.ItemLogicImpl import org.wagham.kabotapi.logic.impl.LabelLogicImpl import org.wagham.kabotapi.logic.impl.PlayerLogicImpl @@ -64,6 +66,15 @@ fun applicationModules( ) } + single { + FoundryLogicImpl( + defaultDomain = foundryConfig.defaultDomain, + instanceConfigManager = get(), + instanceInactivityManager = get(), + commandComponent = get(), + excludedInstances = foundryConfig.excludedInstances + ) + } single { DiscordLogicImpl(get(), discordConfig)} single { CharacterLogicImpl(get(), get()) } single { LabelLogicImpl(get()) } diff --git a/src/main/kotlin/org/wagham/kabotapi/configuration/RouteConfig.kt b/src/main/kotlin/org/wagham/kabotapi/configuration/RouteConfig.kt index af4cf8b..c5be98b 100644 --- a/src/main/kotlin/org/wagham/kabotapi/configuration/RouteConfig.kt +++ b/src/main/kotlin/org/wagham/kabotapi/configuration/RouteConfig.kt @@ -4,6 +4,7 @@ import io.ktor.serialization.kotlinx.json.* import io.ktor.server.application.* import io.ktor.server.plugins.contentnegotiation.* import io.ktor.server.routing.* +import io.ktor.server.sse.SSE import kotlinx.serialization.json.Json import kotlinx.serialization.modules.SerializersModule import org.wagham.kabotapi.controllers.* @@ -18,9 +19,12 @@ fun Application.configureRouting() { } }) } + install(SSE) + routing { authController() characterController() + foundryController() guildController() itemController() labelController() diff --git a/src/main/kotlin/org/wagham/kabotapi/configuration/ThrottlingConfig.kt b/src/main/kotlin/org/wagham/kabotapi/configuration/ThrottlingConfig.kt new file mode 100644 index 0000000..766fcb3 --- /dev/null +++ b/src/main/kotlin/org/wagham/kabotapi/configuration/ThrottlingConfig.kt @@ -0,0 +1,24 @@ +package org.wagham.kabotapi.configuration + +import io.ktor.server.application.Application +import io.ktor.server.application.install +import io.ktor.server.plugins.ratelimit.RateLimit +import io.ktor.server.plugins.ratelimit.RateLimitName +import io.ktor.server.request.path +import kotlin.time.Duration.Companion.seconds + +const val INACTIVE_RATE_LIMIT = "inactivity" + +fun Application.configureThrottling() { + val inactivityRegex = Regex(".*/foundry/inactive/([^/]+)") + install(RateLimit) { + register(RateLimitName(INACTIVE_RATE_LIMIT)) { + rateLimiter(limit = 10, refillPeriod = 60.seconds) + requestKey { applicationCall -> + val instanceOrNull = inactivityRegex.find(applicationCall.request.path()) + ?.groupValues?.getOrNull(1) + instanceOrNull ?: "UNKNOWN" + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt new file mode 100644 index 0000000..4ab9cb2 --- /dev/null +++ b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt @@ -0,0 +1,68 @@ +package org.wagham.kabotapi.controllers + +import io.ktor.server.plugins.ratelimit.RateLimitName +import io.ktor.server.plugins.ratelimit.rateLimit +import io.ktor.server.response.respond +import io.ktor.server.response.respondRedirect +import io.ktor.server.routing.Routing +import io.ktor.server.routing.get +import io.ktor.server.routing.route +import io.ktor.server.sse.sse +import korlibs.time.seconds +import kotlinx.coroutines.delay +import kotlinx.serialization.json.Json +import org.koin.ktor.ext.inject +import org.wagham.db.enums.NyxRoles +import org.wagham.kabotapi.components.JWTManager +import org.wagham.kabotapi.configuration.INACTIVE_RATE_LIMIT +import org.wagham.kabotapi.exceptions.JWTException +import org.wagham.kabotapi.logic.FoundryLogic +import org.wagham.kabotapi.utils.authenticatedPost +import kotlin.getValue + +fun Routing.foundryController() = route("/foundry") { + val foundryLogic by inject() + val jwtManager by inject() + + rateLimit(RateLimitName(INACTIVE_RATE_LIMIT)) { + get("/inactive/{instanceUrl}") { + val instanceUrl = checkNotNull(call.parameters["instanceUrl"]) { + "Instance URL must not be null" + } + delay(1.seconds) + call.respondRedirect(foundryLogic.startInstance(instanceUrl), permanent = false) + } + } + + rateLimit(RateLimitName(INACTIVE_RATE_LIMIT)) { + get("/inactive/{instanceUrl}/{other}") { + val instanceUrl = checkNotNull(call.parameters["instanceUrl"]) { + "Instance URL must not be null" + } + delay(1.seconds) + call.respondRedirect(foundryLogic.startInstance(instanceUrl), permanent = false) + } + } + + authenticatedPost("/{instanceId}", roles = setOf(NyxRoles.MANAGE_FOUNDRY)) { + val instanceId = checkNotNull(call.parameters["instanceId"]) { + "Instance ID must not be null" + } + foundryLogic.stopInstance(instanceId) + call.respond("ok") + } + + sse("/info") { + val jwt = call.parameters["jwt"] ?: throw JWTException("Missing JWT parameter") + val claims = jwtManager.decodeAndGetClaims(jwt) + if(!claims.roles.contains(NyxRoles.MANAGE_FOUNDRY)) { + throw IllegalAccessException("You are not authorized to access this endpoint") + } + foundryLogic.subscribeToInfoChannel().collect { + send( + data = Json.encodeToString(it), + event = "foundry-info" + ) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/wagham/kabotapi/controllers/GuildController.kt b/src/main/kotlin/org/wagham/kabotapi/controllers/GuildController.kt index 8386cb1..9f9ffa0 100644 --- a/src/main/kotlin/org/wagham/kabotapi/controllers/GuildController.kt +++ b/src/main/kotlin/org/wagham/kabotapi/controllers/GuildController.kt @@ -1,6 +1,5 @@ package org.wagham.kabotapi.controllers -import io.ktor.server.application.* import io.ktor.server.response.* import io.ktor.server.routing.* import org.koin.ktor.ext.inject diff --git a/src/main/kotlin/org/wagham/kabotapi/entities/config/FoundryConfig.kt b/src/main/kotlin/org/wagham/kabotapi/entities/config/FoundryConfig.kt index 458da8f..54261de 100644 --- a/src/main/kotlin/org/wagham/kabotapi/entities/config/FoundryConfig.kt +++ b/src/main/kotlin/org/wagham/kabotapi/entities/config/FoundryConfig.kt @@ -9,10 +9,12 @@ data class FoundryConfig( val instanceTtl: Duration, val excludedInstances: Set, val enableLogging: Boolean, + val defaultDomain: String ) { companion object { fun fromConfig(config: ApplicationConfig) = FoundryConfig( instanceFolder = config.property("foundry.instanceFolder").getString(), + defaultDomain = config.property("foundry.defaultDomain").getString(), enableLogging = config.property("foundry.enableLogging").getString().toBoolean(), instanceTtl = config.property("foundry.instanceTtlInMinutes").getString().toInt().minutes, excludedInstances = config.property("foundry.excludedInstances").getString() diff --git a/src/main/kotlin/org/wagham/kabotapi/entities/dto/InstanceInfoDto.kt b/src/main/kotlin/org/wagham/kabotapi/entities/dto/InstanceInfoDto.kt new file mode 100644 index 0000000..2584af7 --- /dev/null +++ b/src/main/kotlin/org/wagham/kabotapi/entities/dto/InstanceInfoDto.kt @@ -0,0 +1,15 @@ +package org.wagham.kabotapi.entities.dto + +import kotlinx.serialization.Serializable + +@Serializable +data class InstanceInfoDto( + val id: String, + val url: String, + val status: String, + val masterName: String, + val cpu: Double, + val memory: Long, + val uptime: Long, + val diskSize: Long, +) diff --git a/src/main/kotlin/org/wagham/kabotapi/entities/foundry/FoundryOptions.kt b/src/main/kotlin/org/wagham/kabotapi/entities/foundry/FoundryOptions.kt index bbcf58c..ea31229 100644 --- a/src/main/kotlin/org/wagham/kabotapi/entities/foundry/FoundryOptions.kt +++ b/src/main/kotlin/org/wagham/kabotapi/entities/foundry/FoundryOptions.kt @@ -29,5 +29,6 @@ data class FoundryOptions( val hotReload: Boolean = false, val protocol: String? = null, val telemetry: Boolean = false, - val masterName: String? = null + val masterName: String? = null, + val domain: String? = null, ) \ No newline at end of file diff --git a/src/main/kotlin/org/wagham/kabotapi/logic/FoundryLogic.kt b/src/main/kotlin/org/wagham/kabotapi/logic/FoundryLogic.kt new file mode 100644 index 0000000..5097640 --- /dev/null +++ b/src/main/kotlin/org/wagham/kabotapi/logic/FoundryLogic.kt @@ -0,0 +1,30 @@ +package org.wagham.kabotapi.logic + +import kotlinx.coroutines.flow.SharedFlow +import org.wagham.kabotapi.entities.dto.InstanceInfoDto +import org.wagham.kabotapi.exceptions.NotFoundException + +interface FoundryLogic { + + /** + * Send a start signal for the Foundry instance with the specified [instanceUrl], if it's not already active. + * + * @param instanceUrl the url of the instance to active, without the domain. + * @return the url of the instance activated, with the full domain. + * @throws NotFoundException if no instance exists with that URL. + * @throws IllegalAccessException if the instance cannot be started. + */ + suspend fun startInstance(instanceUrl: String): String + + /** + * Stops a running foundry instance, given its id. + * + * @param instanceId the id of the foundry instance. + */ + suspend fun stopInstance(instanceId: String) + + /** + * @return a [SharedFlow] that publishes the current status of all the foundry instances. + */ + fun subscribeToInfoChannel(): SharedFlow> +} \ No newline at end of file diff --git a/src/main/kotlin/org/wagham/kabotapi/logic/impl/FoundryLogicImpl.kt b/src/main/kotlin/org/wagham/kabotapi/logic/impl/FoundryLogicImpl.kt new file mode 100644 index 0000000..ea0fb2e --- /dev/null +++ b/src/main/kotlin/org/wagham/kabotapi/logic/impl/FoundryLogicImpl.kt @@ -0,0 +1,91 @@ +package org.wagham.kabotapi.logic.impl + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.wagham.kabotapi.components.InstanceConfigManager +import org.wagham.kabotapi.components.InstanceInactivityManager +import org.wagham.kabotapi.components.socket.CommandComponent +import org.wagham.kabotapi.components.socket.Pm2ListCommand +import org.wagham.kabotapi.components.socket.Pm2StartCommand +import org.wagham.kabotapi.entities.dto.InstanceInfoDto +import org.wagham.kabotapi.exceptions.NotFoundException +import org.wagham.kabotapi.logic.FoundryLogic +import java.util.concurrent.atomic.AtomicReference +import kotlin.time.Duration.Companion.minutes + +class FoundryLogicImpl( + private val defaultDomain: String, + private val instanceConfigManager: InstanceConfigManager, + private val instanceInactivityManager: InstanceInactivityManager, + private val commandComponent: CommandComponent, + private val excludedInstances: Set, +) : FoundryLogic { + + private val logicScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) + private val publishingJob = AtomicReference(null) + private val infoChannel = MutableStateFlow>(emptyMap()) + + override fun subscribeToInfoChannel(): SharedFlow> { + val currentJob = publishingJob.get() + if (currentJob == null || currentJob.isCompleted) { + val newJob = logicScope.launch(start = CoroutineStart.LAZY) { + do { + val newState = commandComponent + .sendSocketCommand(Pm2ListCommand()) + .associateTo(HashMap()) { info -> + val config = instanceConfigManager.getConfigById(info.name) + val infoDto = InstanceInfoDto( + id = info.name, + url = config?.fullUrl ?: defaultDomain, + masterName = config?.name ?: "unknown", + status = info.pm2Env.status, + cpu = info.monit.cpu, + memory = info.monit.memory, + uptime = info.pm2Env.uptime, + diskSize = instanceConfigManager.getFolderSizeById(info.name) ?: 0 + ) + info.name to infoDto + } + infoChannel.emit(newState) + delay(1.minutes) + } while (isActive && infoChannel.subscriptionCount.value > 0) + } + if (publishingJob.compareAndSet(currentJob, newJob)) { + newJob.start() + } else { + newJob.cancel() + } + } + return infoChannel.asSharedFlow() + } + + override suspend fun startInstance(instanceUrl: String): String { + val instanceInfo = instanceConfigManager.getConfigByUrl(instanceUrl) + ?: throw NotFoundException("Instance not found: $instanceUrl") + if (instanceInfo.id in excludedInstances) { + throw IllegalAccessException("You cannot start this instance") + } + if (instanceInactivityManager.getLastActivityOf(instanceInfo.id) == null) { + commandComponent.sendSocketCommand(Pm2StartCommand(instanceInfo.id)) + } + return instanceInfo.fullUrl + } + + override suspend fun stopInstance(instanceId: String) { + if (instanceInactivityManager.getLastActivityOf(instanceId) != null) { + commandComponent.sendSocketCommand(Pm2StartCommand(instanceId)) + } + } + + private val InstanceConfigManager.InstanceInfo.fullUrl: String + get() = "${domain ?: defaultDomain}/${url}" +} \ No newline at end of file From eddf7692dbb9b8e03420423703a7626cffb70592 Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Thu, 24 Apr 2025 21:41:44 +0200 Subject: [PATCH 14/20] increased wait before redirect --- .../org/wagham/kabotapi/controllers/FoundryController.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt index 4ab9cb2..1339848 100644 --- a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt +++ b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt @@ -29,7 +29,7 @@ fun Routing.foundryController() = route("/foundry") { val instanceUrl = checkNotNull(call.parameters["instanceUrl"]) { "Instance URL must not be null" } - delay(1.seconds) + delay(3.seconds) call.respondRedirect(foundryLogic.startInstance(instanceUrl), permanent = false) } } @@ -39,7 +39,7 @@ fun Routing.foundryController() = route("/foundry") { val instanceUrl = checkNotNull(call.parameters["instanceUrl"]) { "Instance URL must not be null" } - delay(1.seconds) + delay(3.seconds) call.respondRedirect(foundryLogic.startInstance(instanceUrl), permanent = false) } } From 48ecc1b847be29f57ff7be522e6b46a6db49601f Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Thu, 24 Apr 2025 22:11:08 +0200 Subject: [PATCH 15/20] Updated message receiving in socket --- .../components/socket/AbstractUdpListener.kt | 29 +++++++++++-------- .../kabotapi/controllers/FoundryController.kt | 4 +-- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractUdpListener.kt b/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractUdpListener.kt index c0dda25..ea6368d 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractUdpListener.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/socket/AbstractUdpListener.kt @@ -25,19 +25,24 @@ abstract class AbstractUdpListener( try { val packet = DatagramPacket(rcvBuffer, rcvBuffer.size) socket.receive(packet) - val idx = packet.data.indexOf('\n'.code.toByte()) - val received = if (idx == -1) null else buffer + String(packet.data, 0, idx) - buffer = - if (idx == -1) buffer + String(packet.data, 0, packet.length) - else String(packet.data, idx + 1, packet.length - idx - 1) - if (enableLogging) { - logger.info("Buffer: $buffer") - } - if (received != null) { - if (enableLogging) { - logger.info("Received: $received") + var data = packet.data.sliceArray(0 until packet.length) + while (data.isNotEmpty()) { + val idx = data.indexOf('\n'.code.toByte()) + + if (idx == -1) { + buffer += String(data, 0, data.size) + break + } else { + val line = buffer + String(data, 0, idx) + data = data.sliceArray(idx + 1 until data.size) + buffer = "" + if (line.isNotEmpty()) { + if (enableLogging) { + logger.info("Received: $line") + } + handlePacket(line) + } } - handlePacket(received) } } catch (e: Exception) { logger.error("Cannot receive packet", e) diff --git a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt index 1339848..d40fcf8 100644 --- a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt +++ b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt @@ -29,7 +29,7 @@ fun Routing.foundryController() = route("/foundry") { val instanceUrl = checkNotNull(call.parameters["instanceUrl"]) { "Instance URL must not be null" } - delay(3.seconds) + delay(10.seconds) call.respondRedirect(foundryLogic.startInstance(instanceUrl), permanent = false) } } @@ -39,7 +39,7 @@ fun Routing.foundryController() = route("/foundry") { val instanceUrl = checkNotNull(call.parameters["instanceUrl"]) { "Instance URL must not be null" } - delay(3.seconds) + delay(10.seconds) call.respondRedirect(foundryLogic.startInstance(instanceUrl), permanent = false) } } From 9253436c993ca3ed4eab161bfb23f28532843e71 Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Thu, 24 Apr 2025 22:38:54 +0200 Subject: [PATCH 16/20] Instance start should happen BEFORE waiting for redirect --- .../wagham/kabotapi/controllers/FoundryController.kt | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt index d40fcf8..f41bf44 100644 --- a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt +++ b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt @@ -29,8 +29,9 @@ fun Routing.foundryController() = route("/foundry") { val instanceUrl = checkNotNull(call.parameters["instanceUrl"]) { "Instance URL must not be null" } - delay(10.seconds) - call.respondRedirect(foundryLogic.startInstance(instanceUrl), permanent = false) + val redirectUrl = foundryLogic.startInstance(instanceUrl) + delay(2.seconds) + call.respondRedirect(redirectUrl, permanent = false) } } @@ -39,8 +40,9 @@ fun Routing.foundryController() = route("/foundry") { val instanceUrl = checkNotNull(call.parameters["instanceUrl"]) { "Instance URL must not be null" } - delay(10.seconds) - call.respondRedirect(foundryLogic.startInstance(instanceUrl), permanent = false) + val redirectUrl = foundryLogic.startInstance(instanceUrl) + delay(2.seconds) + call.respondRedirect(redirectUrl, permanent = false) } } From ecd89e35bab54de3b8c046d5ffe39f54d4b15105 Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Thu, 24 Apr 2025 22:48:35 +0200 Subject: [PATCH 17/20] Updated wait before redirect --- .../org/wagham/kabotapi/controllers/FoundryController.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt index f41bf44..444d319 100644 --- a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt +++ b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt @@ -30,7 +30,7 @@ fun Routing.foundryController() = route("/foundry") { "Instance URL must not be null" } val redirectUrl = foundryLogic.startInstance(instanceUrl) - delay(2.seconds) + delay(10.seconds) call.respondRedirect(redirectUrl, permanent = false) } } @@ -41,7 +41,7 @@ fun Routing.foundryController() = route("/foundry") { "Instance URL must not be null" } val redirectUrl = foundryLogic.startInstance(instanceUrl) - delay(2.seconds) + delay(10.seconds) call.respondRedirect(redirectUrl, permanent = false) } } From f12096103264ed6fe04c2cbc900004b134b431ae Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Thu, 24 Apr 2025 23:06:32 +0200 Subject: [PATCH 18/20] Updated delay before redirect --- .../org/wagham/kabotapi/controllers/FoundryController.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt index 444d319..bd02406 100644 --- a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt +++ b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt @@ -30,7 +30,7 @@ fun Routing.foundryController() = route("/foundry") { "Instance URL must not be null" } val redirectUrl = foundryLogic.startInstance(instanceUrl) - delay(10.seconds) + delay(2.5.seconds) call.respondRedirect(redirectUrl, permanent = false) } } @@ -41,7 +41,7 @@ fun Routing.foundryController() = route("/foundry") { "Instance URL must not be null" } val redirectUrl = foundryLogic.startInstance(instanceUrl) - delay(10.seconds) + delay(2.5.seconds) call.respondRedirect(redirectUrl, permanent = false) } } From fcf1223cb6d0909202f90a54cc9946f261fcaadf Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Thu, 1 May 2025 10:31:09 +0200 Subject: [PATCH 19/20] Small fix to redirect --- .../kabotapi/controllers/FoundryController.kt | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt index bd02406..1c30840 100644 --- a/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt +++ b/src/main/kotlin/org/wagham/kabotapi/controllers/FoundryController.kt @@ -3,13 +3,10 @@ package org.wagham.kabotapi.controllers import io.ktor.server.plugins.ratelimit.RateLimitName import io.ktor.server.plugins.ratelimit.rateLimit import io.ktor.server.response.respond -import io.ktor.server.response.respondRedirect import io.ktor.server.routing.Routing import io.ktor.server.routing.get import io.ktor.server.routing.route import io.ktor.server.sse.sse -import korlibs.time.seconds -import kotlinx.coroutines.delay import kotlinx.serialization.json.Json import org.koin.ktor.ext.inject import org.wagham.db.enums.NyxRoles @@ -30,19 +27,7 @@ fun Routing.foundryController() = route("/foundry") { "Instance URL must not be null" } val redirectUrl = foundryLogic.startInstance(instanceUrl) - delay(2.5.seconds) - call.respondRedirect(redirectUrl, permanent = false) - } - } - - rateLimit(RateLimitName(INACTIVE_RATE_LIMIT)) { - get("/inactive/{instanceUrl}/{other}") { - val instanceUrl = checkNotNull(call.parameters["instanceUrl"]) { - "Instance URL must not be null" - } - val redirectUrl = foundryLogic.startInstance(instanceUrl) - delay(2.5.seconds) - call.respondRedirect(redirectUrl, permanent = false) + call.respond(redirectUrl) } } From 0e34e96460879ddc3a2d626d4a23af0f3d3800cf Mon Sep 17 00:00:00 2001 From: Vincenzo Pierro Date: Thu, 1 May 2025 11:30:41 +0200 Subject: [PATCH 20/20] Unblocked size computation --- .../kabotapi/components/InstanceConfigManager.kt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/org/wagham/kabotapi/components/InstanceConfigManager.kt b/src/main/kotlin/org/wagham/kabotapi/components/InstanceConfigManager.kt index 03d4985..b97be6f 100644 --- a/src/main/kotlin/org/wagham/kabotapi/components/InstanceConfigManager.kt +++ b/src/main/kotlin/org/wagham/kabotapi/components/InstanceConfigManager.kt @@ -71,12 +71,12 @@ class InstanceConfigManager( }.sumOf { it.length() } private fun computeInstancesSize() { -// File(baseFolder).listFiles().filter { -// it.isDirectory -// }.onEach { -// val size = getFolderSize(it) -// sizeByInstanceId[it.name] = size -// } + File(baseFolder).listFiles().filter { + it.isDirectory + }.onEach { + val size = getFolderSize(it) + sizeByInstanceId[it.name] = size + } } private fun startWatchingInstanceSize() {