From 35d220eaa37b4ab2ece2585fb276ff9c4bcfc88a Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 6 May 2026 20:57:32 +0200 Subject: [PATCH 1/2] perf: speed up recommendation cache misses --- .../dev/typetype/server/ServiceRegistry.kt | 3 -- .../services/HomeRecommendationBuilder.kt | 4 +-- .../HomeRecommendationCandidateService.kt | 14 ++++++++ .../services/HomeRecommendationPoolBuilder.kt | 2 +- .../services/HomeRecommendationPoolCache.kt | 2 +- .../services/HomeRecommendationPoolMode.kt | 1 + .../HomeRecommendationPoolResolver.kt | 16 +++------ ...eRecommendationPoolResolverDependencies.kt | 1 - .../HomeRecommendationSignalContextService.kt | 25 -------------- .../HomeRecommendationUserSignalService.kt | 33 ++++++++++++++----- .../services/SubscriptionFeedService.kt | 2 +- .../services/SubscriptionShortsFeedService.kt | 19 ++++++++--- .../HomeRecommendationCandidateServiceTest.kt | 10 +++--- .../server/HomeRecommendationTestFixtures.kt | 2 -- 14 files changed, 68 insertions(+), 66 deletions(-) delete mode 100644 src/main/kotlin/dev/typetype/server/services/HomeRecommendationSignalContextService.kt diff --git a/src/main/kotlin/dev/typetype/server/ServiceRegistry.kt b/src/main/kotlin/dev/typetype/server/ServiceRegistry.kt index d75ec8a..d2fd3b9 100644 --- a/src/main/kotlin/dev/typetype/server/ServiceRegistry.kt +++ b/src/main/kotlin/dev/typetype/server/ServiceRegistry.kt @@ -31,7 +31,6 @@ import dev.typetype.server.services.PipePipeSuggestionService import dev.typetype.server.services.PipePipeTrendingService import dev.typetype.server.services.PlaylistService import dev.typetype.server.services.ProgressService -import dev.typetype.server.services.HomeRecommendationSignalContextService import dev.typetype.server.services.SearchHistoryService import dev.typetype.server.services.SettingsService import dev.typetype.server.services.HomeRecommendationPoolResolverDependencies @@ -83,7 +82,6 @@ internal class ServiceRegistry(cache: DragonflyService, subtitleServiceUrl: Stri val searchHistoryService = SearchHistoryService() val blockedService = BlockedService() val bugReportService = BugReportService() - val recommendationSignalContextService = HomeRecommendationSignalContextService(subscriptionsService, historyService, favoritesService) val youtubeTakeoutImportService = YoutubeTakeoutFactory.create(subscriptionsService, playlistService, historyService, favoritesService, watchLaterService) val recommendationPoolResolverDependencies = HomeRecommendationPoolResolverDependencies( subscriptionsService = subscriptionsService, @@ -93,7 +91,6 @@ internal class ServiceRegistry(cache: DragonflyService, subtitleServiceUrl: Stri favoritesService = favoritesService, watchLaterService = watchLaterService, blockedService = blockedService, - signalContextService = recommendationSignalContextService, streamService = streamService, cache = cache, ) diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationBuilder.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationBuilder.kt index 3185e8d..5537eac 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationBuilder.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationBuilder.kt @@ -10,7 +10,6 @@ class HomeRecommendationBuilder( private val favoritesService: FavoritesService, private val watchLaterService: WatchLaterService, private val blockedService: BlockedService, - private val signalContextService: HomeRecommendationSignalContextService, private val streamService: StreamService, ) { suspend fun build( @@ -26,13 +25,12 @@ class HomeRecommendationBuilder( watchLaterService = watchLaterService, blockedService = blockedService, ) - val profile = signalService.loadProfile(userId = userId) + val (profile, signalContext) = signalService.load(userId = userId) val candidates = HomeRecommendationCandidateService( subscriptionFeedService = subscriptionFeedService, subscriptionShortsFeedService = subscriptionShortsFeedService, streamService = streamService, ) - val signalContext = signalContextService.load(userId) val candidatePool = candidates.fetchCandidates( userId = userId, serviceId = serviceId, diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationCandidateService.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationCandidateService.kt index e826b79..48aa462 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationCandidateService.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationCandidateService.kt @@ -18,6 +18,17 @@ class HomeRecommendationCandidateService( mode: HomeRecommendationPoolMode, signalContext: HomeRecommendationSignalContext = HomeRecommendationSignalContext(), ): HomeRecommendationCandidatePool { + if (mode == HomeRecommendationPoolMode.FAST_SHORTS) { + val subscriptions = subscriptionShortsFeedService.getCachedFeed( + userId = userId, + page = 0, + limit = HomeRecommendationCandidateLimits.FAST_SUBSCRIPTION_PAGE_SIZE, + ) + ?.videos + .orEmpty() + .map { HomeRecommendationTaggedVideo(it, HomeRecommendationSourceTag.SUBSCRIPTION) } + return HomeRecommendationCandidatePool(subscriptions = subscriptions, discovery = emptyList()) + } if (mode == HomeRecommendationPoolMode.SHORTS) { if (serviceId != YOUTUBE_SERVICE_ID) { return HomeRecommendationCandidatePool(subscriptions = emptyList(), discovery = emptyList()) @@ -26,6 +37,9 @@ class HomeRecommendationCandidateService( } val subscriptions = fetchSubscriptionCandidates(userId, mode) .map { HomeRecommendationTaggedVideo(it, HomeRecommendationSourceTag.SUBSCRIPTION) } + if (mode == HomeRecommendationPoolMode.FAST) { + return HomeRecommendationCandidatePool(subscriptions = subscriptions, discovery = emptyList()) + } val subscriptionSeeds = subscriptions.map { it.video.url } val relatedFromSubscriptions = relatedCandidateService.fetch( seedUrls = subscriptionSeeds, diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolBuilder.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolBuilder.kt index 629bd21..d2550f1 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolBuilder.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolBuilder.kt @@ -11,7 +11,7 @@ class HomeRecommendationPoolBuilder { context: HomeRecommendationSessionContext, mode: HomeRecommendationPoolMode = HomeRecommendationPoolMode.FULL, ): HomeRecommendationPool { - val shortsMode = mode == HomeRecommendationPoolMode.SHORTS + val shortsMode = mode == HomeRecommendationPoolMode.SHORTS || mode == HomeRecommendationPoolMode.FAST_SHORTS val subscriptionsScored = scoreAndFilter( candidates = subscriptionCandidates, profile = profile, diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolCache.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolCache.kt index b520f80..bdcdd27 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolCache.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolCache.kt @@ -24,7 +24,7 @@ class HomeRecommendationPoolCache(private val cache: dev.typetype.server.cache.C } companion object { - private const val CACHE_TTL_SECONDS = 300L + private const val CACHE_TTL_SECONDS = 900L private const val CACHE_VERSION = 8 } } diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolMode.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolMode.kt index d73d2ad..61dd755 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolMode.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolMode.kt @@ -3,5 +3,6 @@ package dev.typetype.server.services enum class HomeRecommendationPoolMode { FULL, FAST, + FAST_SHORTS, SHORTS, } diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolver.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolver.kt index 085e9fd..7e4ded5 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolver.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolver.kt @@ -7,7 +7,6 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.launch -import kotlinx.coroutines.withTimeoutOrNull class HomeRecommendationPoolResolver( private val dependencies: HomeRecommendationPoolResolverDependencies, @@ -31,13 +30,12 @@ class HomeRecommendationPoolResolver( val cached = poolCache.read(key) if (cached != null) return cached val fullBuild = fullBuild(key, userId, serviceId, mode, context) - val quickFull = withTimeoutOrNull(FULL_BUILD_BUDGET_MS) { fullBuild.await() } - if (quickFull != null) { - poolCache.write(key, quickFull) - return quickFull - } schedulePersistence(key, fullBuild) - val fastMode = if (mode == HomeRecommendationPoolMode.SHORTS) HomeRecommendationPoolMode.SHORTS else HomeRecommendationPoolMode.FAST + val fastMode = if (mode == HomeRecommendationPoolMode.SHORTS) { + HomeRecommendationPoolMode.FAST_SHORTS + } else { + HomeRecommendationPoolMode.FAST + } return buildPool(userId, serviceId, fastMode, context) } @@ -76,7 +74,6 @@ class HomeRecommendationPoolResolver( watchLaterService = dependencies.watchLaterService, blockedService = dependencies.blockedService, streamService = dependencies.streamService, - signalContextService = dependencies.signalContextService, ).build( userId = userId, serviceId = serviceId, @@ -92,7 +89,4 @@ class HomeRecommendationPoolResolver( } } - companion object { - private const val FULL_BUILD_BUDGET_MS = 1_500L - } } diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolverDependencies.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolverDependencies.kt index 1b8649a..e256e9f 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolverDependencies.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolverDependencies.kt @@ -10,7 +10,6 @@ data class HomeRecommendationPoolResolverDependencies( val favoritesService: FavoritesService, val watchLaterService: WatchLaterService, val blockedService: BlockedService, - val signalContextService: HomeRecommendationSignalContextService, val streamService: StreamService = HomeRecommendationNoopStreamService, val cache: CacheService, ) diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationSignalContextService.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationSignalContextService.kt deleted file mode 100644 index 3edcbdb..0000000 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationSignalContextService.kt +++ /dev/null @@ -1,25 +0,0 @@ -package dev.typetype.server.services - -class HomeRecommendationSignalContextService( - private val subscriptionsService: SubscriptionsService, - private val historyService: HistoryService, - private val favoritesService: FavoritesService = FavoritesService(), -) { - suspend fun load(userId: String): HomeRecommendationSignalContext { - val subscriptions = subscriptionsService.getAll(userId) - val history = historyService.search( - userId = userId, - q = null, - from = null, - to = null, - limit = 60, - offset = 0, - ).first - val favorites = favoritesService.getAll(userId) - return HomeRecommendationSignalContext( - userSubscriptions = subscriptions.map { it.channelUrl }, - historyItems = history.map { it.url }, - favoriteUrls = favorites.map { it.videoUrl }, - ) - } -} diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationUserSignalService.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationUserSignalService.kt index e34126f..8ba1ef9 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationUserSignalService.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationUserSignalService.kt @@ -1,5 +1,8 @@ package dev.typetype.server.services +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope + class HomeRecommendationUserSignalService( private val subscriptionsService: SubscriptionsService, private val historyService: HistoryService, @@ -7,18 +10,26 @@ class HomeRecommendationUserSignalService( private val watchLaterService: WatchLaterService, private val blockedService: BlockedService, ) { - suspend fun loadProfile(userId: String): HomeRecommendationProfile { - val subscriptions = subscriptionsService.getAll(userId) - val favorites = favoritesService.getAll(userId) - val watchLater = watchLaterService.getAll(userId) - val historyItems = historyService.search(userId = userId, q = null, from = null, to = null, limit = 240, offset = 0).first - val blockedVideos = blockedService.getVideos(userId).map { it.url }.toSet() - val blockedChannels = blockedService.getChannels(userId).map { it.url }.toSet() + suspend fun load(userId: String): Pair = coroutineScope { + val subscriptionsDeferred = async { subscriptionsService.getAll(userId) } + val favoritesDeferred = async { favoritesService.getAll(userId) } + val watchLaterDeferred = async { watchLaterService.getAll(userId) } + val historyDeferred = async { + historyService.search(userId = userId, q = null, from = null, to = null, limit = 240, offset = 0).first + } + val blockedVideosDeferred = async { blockedService.getVideos(userId).map { it.url }.toSet() } + val blockedChannelsDeferred = async { blockedService.getChannels(userId).map { it.url }.toSet() } + val subscriptions = subscriptionsDeferred.await() + val favorites = favoritesDeferred.await() + val watchLater = watchLaterDeferred.await() + val historyItems = historyDeferred.await() + val blockedVideos = blockedVideosDeferred.await() + val blockedChannels = blockedChannelsDeferred.await() val seenUrls = historyItems.map { it.url }.toSet() val favoriteUrls = favorites.map { it.videoUrl }.toSet() val watchLaterUrls = watchLater.map { it.url }.toSet() val subscriptionChannels = subscriptions.map { it.channelUrl }.toSet() - return HomeRecommendationProfile( + val profile = HomeRecommendationProfile( seenUrls = seenUrls, blockedVideos = blockedVideos, blockedChannels = blockedChannels, @@ -32,5 +43,11 @@ class HomeRecommendationUserSignalService( themeQueries = emptyList(), personalizationEnabled = false, ) + val signalContext = HomeRecommendationSignalContext( + userSubscriptions = subscriptions.map { it.channelUrl }, + historyItems = historyItems.take(60).map { it.url }, + favoriteUrls = favorites.map { it.videoUrl }, + ) + profile to signalContext } } diff --git a/src/main/kotlin/dev/typetype/server/services/SubscriptionFeedService.kt b/src/main/kotlin/dev/typetype/server/services/SubscriptionFeedService.kt index a97ee61..a4c1f6e 100644 --- a/src/main/kotlin/dev/typetype/server/services/SubscriptionFeedService.kt +++ b/src/main/kotlin/dev/typetype/server/services/SubscriptionFeedService.kt @@ -80,7 +80,7 @@ class SubscriptionFeedService( Base64.getEncoder().encodeToString("""{"page":$page}""".toByteArray()) companion object { - private const val FEED_TTL_SECONDS = 300L + private const val FEED_TTL_SECONDS = 900L private const val MAX_CONCURRENT_FETCHES = 20 private const val CHANNEL_TIMEOUT_MS = 15_000L } diff --git a/src/main/kotlin/dev/typetype/server/services/SubscriptionShortsFeedService.kt b/src/main/kotlin/dev/typetype/server/services/SubscriptionShortsFeedService.kt index 089af05..a8a2c72 100644 --- a/src/main/kotlin/dev/typetype/server/services/SubscriptionShortsFeedService.kt +++ b/src/main/kotlin/dev/typetype/server/services/SubscriptionShortsFeedService.kt @@ -27,13 +27,21 @@ class SubscriptionShortsFeedService( val next = if (to < all.size) (page + 1).toString() else null return SubscriptionFeedResponse(videos = all.subList(from, to), nextpage = next) } - + suspend fun getCachedFeed(userId: String, page: Int, limit: Int): SubscriptionFeedResponse? { + val raw = runCatching { cache.get(SubscriptionFeedCacheKeys.shorts(userId)) }.getOrNull() ?: return null + val all = runCatching { CacheJson.decodeFromString(ListSerializer(VideoItem.serializer()), raw) }.getOrNull() + ?: return null + val from = page * limit + if (from >= all.size) return SubscriptionFeedResponse(videos = emptyList(), nextpage = null) + val to = minOf(from + limit, all.size) + val next = if (to < all.size) (page + 1).toString() else null + return SubscriptionFeedResponse(videos = all.subList(from, to), nextpage = next) + } suspend fun getBlendedFeed(userId: String, serviceId: Int, page: Int, limit: Int): SubscriptionFeedResponse { val sourcePage = page val subs = getFeed(userId, sourcePage, limit).videos return blendService.build(subs = subs, serviceId = serviceId, page = page, limit = limit) } - private suspend fun cachedAll(userId: String): List { val key = SubscriptionFeedCacheKeys.shorts(userId) runCatching { cache.get(key) }.getOrNull()?.let { raw -> @@ -42,7 +50,6 @@ class SubscriptionShortsFeedService( } return fetchAndCache(userId, key) } - private suspend fun fetchAndCache(userId: String, key: String): List { val subs = subscriptionsService.getAll(userId) val videos = coroutineScope { @@ -59,10 +66,9 @@ class SubscriptionShortsFeedService( .map { it.toShortCanonicalUrl() } .sortedByDescending { if (it.uploaded == -1L) Long.MIN_VALUE else it.uploaded } .let { diversifyByUploader(it) } - runCatching { cache.set(key, CacheJson.encodeToString(ListSerializer(VideoItem.serializer()), dedup), 300L) } + runCatching { cache.set(key, CacheJson.encodeToString(ListSerializer(VideoItem.serializer()), dedup), FEED_TTL_SECONDS) } return dedup } - private fun diversifyByUploader(videos: List): List { val grouped = videos.groupBy { uploaderKey(it) } val queues = grouped.values @@ -106,4 +112,7 @@ class SubscriptionShortsFeedService( return if (channelUrl.endsWith('/')) "${channelUrl}shorts" else "$channelUrl/shorts" } + companion object { + private const val FEED_TTL_SECONDS = 900L + } } diff --git a/src/test/kotlin/dev/typetype/server/HomeRecommendationCandidateServiceTest.kt b/src/test/kotlin/dev/typetype/server/HomeRecommendationCandidateServiceTest.kt index 8b69c4c..ccf3240 100644 --- a/src/test/kotlin/dev/typetype/server/HomeRecommendationCandidateServiceTest.kt +++ b/src/test/kotlin/dev/typetype/server/HomeRecommendationCandidateServiceTest.kt @@ -30,18 +30,18 @@ class HomeRecommendationCandidateServiceTest { @BeforeEach fun setup() { coEvery { subscriptionFeedService.getCachedFeed(any(), any(), any()) } returns SubscriptionFeedResponse(emptyList(), null) + coEvery { subscriptionFeedService.getFeed(any(), any(), any()) } returns SubscriptionFeedResponse(emptyList(), null) coEvery { subscriptionShortsFeedService.getBlendedFeed(any(), any(), any(), any()) } returns SubscriptionFeedResponse(emptyList(), null) coEvery { streamService.getStreamInfo(any()) } returns ExtractionResult.Failure("none") } @Test - fun `fast mode builds discovery from subscription related streams`() = runTest { + fun `fast mode stays cache-only without related streams`() = runTest { val seed = video("s1", "seed") - val related = video("r1", "related") coEvery { subscriptionFeedService.getCachedFeed(any(), any(), any()) } returns SubscriptionFeedResponse(listOf(seed), null) - coEvery { streamService.getStreamInfo(seed.url) } returns ExtractionResult.Success(stream(seed.url, listOf(related))) val pool = service.fetchCandidates("u", 0, profile(), HomeRecommendationPoolMode.FAST) - assertTrue(pool.discovery.any { it.video.id == "r1" && it.source == HomeRecommendationSourceTag.DISCOVERY_THEME }) + assertTrue(pool.subscriptions.any { it.video.id == "s1" && it.source == HomeRecommendationSourceTag.SUBSCRIPTION }) + assertTrue(pool.discovery.isEmpty()) } @Test @@ -50,7 +50,7 @@ class HomeRecommendationCandidateServiceTest { val related = video("rf1", "related") coEvery { streamService.getStreamInfo(favoriteSeed.url) } returns ExtractionResult.Success(stream(favoriteSeed.url, listOf(related))) val signalContext = HomeRecommendationSignalContext(favoriteUrls = listOf(favoriteSeed.url)) - val pool = service.fetchCandidates("u", 0, profile(), HomeRecommendationPoolMode.FAST, signalContext) + val pool = service.fetchCandidates("u", 0, profile(), HomeRecommendationPoolMode.FULL, signalContext) assertTrue(pool.discovery.any { it.video.id == "rf1" && it.source == HomeRecommendationSourceTag.DISCOVERY_EXPLORATION }) } diff --git a/src/test/kotlin/dev/typetype/server/HomeRecommendationTestFixtures.kt b/src/test/kotlin/dev/typetype/server/HomeRecommendationTestFixtures.kt index 49ce9ff..8e6e799 100644 --- a/src/test/kotlin/dev/typetype/server/HomeRecommendationTestFixtures.kt +++ b/src/test/kotlin/dev/typetype/server/HomeRecommendationTestFixtures.kt @@ -11,7 +11,6 @@ import dev.typetype.server.services.HomeRecommendationPoolResolver import dev.typetype.server.services.HomeRecommendationPoolResolverDependencies import dev.typetype.server.services.HomeRecommendationSessionContext import dev.typetype.server.services.HomeRecommendationSessionIntent -import dev.typetype.server.services.HomeRecommendationSignalContextService import dev.typetype.server.services.SubscriptionFeedService import dev.typetype.server.services.SubscriptionShortsBlendService import dev.typetype.server.services.SubscriptionShortsFeedService @@ -37,7 +36,6 @@ fun homeResolverDependencies( favoritesService = FavoritesService(), watchLaterService = WatchLaterService(), blockedService = BlockedService(), - signalContextService = HomeRecommendationSignalContextService(subscriptions, HistoryService()), cache = cache, ) From 9982f78d50fe956fb60c30b5c0282636406801df Mon Sep 17 00:00:00 2001 From: Priveetee Date: Thu, 7 May 2026 05:02:14 +0200 Subject: [PATCH 2/2] perf: warm recommendation caches proactively --- .../kotlin/dev/typetype/server/Application.kt | 2 +- .../server/HomeRecommendationServices.kt | 5 +- .../dev/typetype/server/ServiceRegistry.kt | 1 + .../dev/typetype/server/routes/AuthRoutes.kt | 17 ++-- .../server/routes/SubscriptionsRoutes.kt | 14 ++- .../typetype/server/routes/UserDataRoutes.kt | 2 +- .../HomeRecommendationCandidateLimits.kt | 8 +- .../services/HomeRecommendationPoolCache.kt | 20 ++++- .../HomeRecommendationPoolResolver.kt | 2 + .../services/HomeRecommendationWarmup.kt | 11 +++ .../HomeRecommendationWarmupService.kt | 86 +++++++++++++++++++ .../services/SubscriptionFeedService.kt | 2 +- .../services/SubscriptionShortsFeedService.kt | 2 +- 13 files changed, 152 insertions(+), 20 deletions(-) create mode 100644 src/main/kotlin/dev/typetype/server/services/HomeRecommendationWarmup.kt create mode 100644 src/main/kotlin/dev/typetype/server/services/HomeRecommendationWarmupService.kt diff --git a/src/main/kotlin/dev/typetype/server/Application.kt b/src/main/kotlin/dev/typetype/server/Application.kt index 53eaf63..dbd8d7e 100644 --- a/src/main/kotlin/dev/typetype/server/Application.kt +++ b/src/main/kotlin/dev/typetype/server/Application.kt @@ -99,7 +99,7 @@ fun Application.module() { storyboardProxyRoutes(svc.proxyService) } downloaderGatewayRoutes(downloaderGatewayService) - authRoutes(authService, passwordResetService, profileService, adminSettingsService) + authRoutes(authService, passwordResetService, profileService, adminSettingsService, svc.homeRecommendationWarmupService) adminRoutes(authService, userAdminService, passwordResetService, adminSettingsService) adminSessionRoutes(authService, activeSessionService) sessionActivityRoutes(authService, activeSessionService) diff --git a/src/main/kotlin/dev/typetype/server/HomeRecommendationServices.kt b/src/main/kotlin/dev/typetype/server/HomeRecommendationServices.kt index 9046ee1..305ccdb 100644 --- a/src/main/kotlin/dev/typetype/server/HomeRecommendationServices.kt +++ b/src/main/kotlin/dev/typetype/server/HomeRecommendationServices.kt @@ -4,9 +4,11 @@ import dev.typetype.server.cache.DragonflyService import dev.typetype.server.services.HomeRecommendationPoolResolver import dev.typetype.server.services.HomeRecommendationPoolResolverDependencies import dev.typetype.server.services.HomeRecommendationService +import dev.typetype.server.services.HomeRecommendationWarmupService data class HomeRecommendationServices( val recommendationService: HomeRecommendationService, + val warmupService: HomeRecommendationWarmupService, ) fun createHomeRecommendationServices( @@ -17,5 +19,6 @@ fun createHomeRecommendationServices( val recommendationService = HomeRecommendationService( poolResolver = HomeRecommendationPoolResolver(resolverDeps), ) - return HomeRecommendationServices(recommendationService) + val warmupService = HomeRecommendationWarmupService(recommendationService, cache) + return HomeRecommendationServices(recommendationService, warmupService) } diff --git a/src/main/kotlin/dev/typetype/server/ServiceRegistry.kt b/src/main/kotlin/dev/typetype/server/ServiceRegistry.kt index d2fd3b9..6100a64 100644 --- a/src/main/kotlin/dev/typetype/server/ServiceRegistry.kt +++ b/src/main/kotlin/dev/typetype/server/ServiceRegistry.kt @@ -96,4 +96,5 @@ internal class ServiceRegistry(cache: DragonflyService, subtitleServiceUrl: Stri ) private val homeRecommendationServices = createHomeRecommendationServices(cache, recommendationPoolResolverDependencies) val homeRecommendationService = homeRecommendationServices.recommendationService + val homeRecommendationWarmupService = homeRecommendationServices.warmupService } diff --git a/src/main/kotlin/dev/typetype/server/routes/AuthRoutes.kt b/src/main/kotlin/dev/typetype/server/routes/AuthRoutes.kt index 43f4562..1fbf2f7 100644 --- a/src/main/kotlin/dev/typetype/server/routes/AuthRoutes.kt +++ b/src/main/kotlin/dev/typetype/server/routes/AuthRoutes.kt @@ -5,6 +5,8 @@ import dev.typetype.server.models.UserProfileItem import dev.typetype.server.services.AdminSettingsService import dev.typetype.server.services.AuthCookieHelpers import dev.typetype.server.services.AuthService +import dev.typetype.server.services.HomeRecommendationWarmup +import dev.typetype.server.services.NoopHomeRecommendationWarmup import dev.typetype.server.services.PasswordResetService import dev.typetype.server.services.ProfileService import io.ktor.http.HttpStatusCode @@ -15,7 +17,7 @@ import io.ktor.server.routing.Route import io.ktor.server.routing.get import io.ktor.server.routing.post -fun Route.authRoutes(authService: AuthService, passwordResetService: PasswordResetService, profileService: ProfileService, adminSettingsService: AdminSettingsService) { +fun Route.authRoutes(authService: AuthService, passwordResetService: PasswordResetService, profileService: ProfileService, adminSettingsService: AdminSettingsService, warmupService: HomeRecommendationWarmup = NoopHomeRecommendationWarmup) { post("/auth/register") { val req = call.receive() if (req.email.isBlank() || req.password.isBlank() || req.name.isBlank()) { @@ -29,13 +31,13 @@ fun Route.authRoutes(authService: AuthService, passwordResetService: PasswordRes } try { val token = authService.register(req.email, req.password, req.name) + token.accessToken.warm(authService, warmupService) AuthCookieHelpers.setRefreshCookie(call.response, token.refreshToken) call.respond(SessionResponse(token.accessToken)) } catch (e: Exception) { call.respond(HttpStatusCode.BadRequest, ErrorResponse("Registration failed")) } } - post("/auth/login") { val req = call.receive() val identifier = req.identifier?.trim().orEmpty().ifBlank { req.email?.trim().orEmpty() } @@ -45,9 +47,9 @@ fun Route.authRoutes(authService: AuthService, passwordResetService: PasswordRes return@post } AuthCookieHelpers.setRefreshCookie(call.response, token.refreshToken) + token.accessToken.warm(authService, warmupService) call.respond(SessionResponse(token.accessToken)) } - post("/auth/refresh") { val req = runCatching { call.receive() }.getOrNull() val refreshToken = AuthCookieHelpers.extractRefreshToken(call) ?: req?.token @@ -57,9 +59,9 @@ fun Route.authRoutes(authService: AuthService, passwordResetService: PasswordRes return@post } AuthCookieHelpers.setRefreshCookie(call.response, newToken.refreshToken) + newToken.accessToken.warm(authService, warmupService) call.respond(SessionResponse(newToken.accessToken)) } - post("/auth/logout") { val refreshToken = AuthCookieHelpers.extractRefreshToken(call) authService.logout(refreshToken) @@ -95,7 +97,9 @@ fun Route.authRoutes(authService: AuthService, passwordResetService: PasswordRes } post("/auth/guest") { - call.respond(AuthResponse(authService.guestLogin())) + val token = authService.guestLogin() + token.warm(authService, warmupService) + call.respond(AuthResponse(token)) } post("/auth/reset-password") { @@ -111,3 +115,6 @@ fun Route.authRoutes(authService: AuthService, passwordResetService: PasswordRes if (ok) call.respond(HttpStatusCode.NoContent) else call.respond(HttpStatusCode.BadRequest, ErrorResponse("Invalid or expired reset token")) } } + +private fun String.warm(authService: AuthService, warmupService: HomeRecommendationWarmup): Unit = + authService.verify(this)?.let(warmupService::markActive) ?: Unit diff --git a/src/main/kotlin/dev/typetype/server/routes/SubscriptionsRoutes.kt b/src/main/kotlin/dev/typetype/server/routes/SubscriptionsRoutes.kt index 9d9c6f6..3d7b40b 100644 --- a/src/main/kotlin/dev/typetype/server/routes/SubscriptionsRoutes.kt +++ b/src/main/kotlin/dev/typetype/server/routes/SubscriptionsRoutes.kt @@ -3,6 +3,8 @@ package dev.typetype.server.routes import dev.typetype.server.models.ErrorResponse import dev.typetype.server.models.SubscriptionItem import dev.typetype.server.services.AuthService +import dev.typetype.server.services.HomeRecommendationWarmup +import dev.typetype.server.services.NoopHomeRecommendationWarmup import dev.typetype.server.services.SubscriptionsService import io.ktor.http.HttpStatusCode import io.ktor.server.application.ApplicationCall @@ -16,7 +18,7 @@ import io.ktor.server.routing.post import java.net.URLDecoder import java.nio.charset.StandardCharsets -fun Route.subscriptionsRoutes(subscriptionsService: SubscriptionsService, authService: AuthService) { +fun Route.subscriptionsRoutes(subscriptionsService: SubscriptionsService, authService: AuthService, warmupService: HomeRecommendationWarmup = NoopHomeRecommendationWarmup) { get("/subscriptions") { call.withJwtAuth(authService) { userId -> call.respond(subscriptionsService.getAll(userId)) } } @@ -25,31 +27,35 @@ fun Route.subscriptionsRoutes(subscriptionsService: SubscriptionsService, authSe val item = runCatching { call.receive() }.getOrElse { return@withJwtAuth call.respond(HttpStatusCode.BadRequest, ErrorResponse("Invalid request body")) } - call.respond(HttpStatusCode.Created, subscriptionsService.add(userId, item)) + val subscription = subscriptionsService.add(userId, item) + warmupService.invalidateAndWarm(userId) + call.respond(HttpStatusCode.Created, subscription) } } delete("/subscriptions") { call.withJwtAuth(authService) { userId -> val channelUrl = call.request.queryParameters["url"]?.takeIf { it.isNotBlank() } ?: return@withJwtAuth call.respond(HttpStatusCode.BadRequest, ErrorResponse("Missing channelUrl")) - call.respondDeleteResult(subscriptionsService, userId, channelUrl) + call.respondDeleteResult(subscriptionsService, warmupService, userId, channelUrl) } } delete("/subscriptions/{channelUrl...}") { call.withJwtAuth(authService) { userId -> val channelUrl = call.extractDeleteChannelUrl() ?: return@withJwtAuth call.respond(HttpStatusCode.BadRequest, ErrorResponse("Missing channelUrl")) - call.respondDeleteResult(subscriptionsService, userId, channelUrl) + call.respondDeleteResult(subscriptionsService, warmupService, userId, channelUrl) } } } private suspend fun ApplicationCall.respondDeleteResult( subscriptionsService: SubscriptionsService, + warmupService: HomeRecommendationWarmup, userId: String, channelUrl: String, ) { val deleted = subscriptionsService.delete(userId, channelUrl) + if (deleted) warmupService.invalidateAndWarm(userId) if (deleted) respond(HttpStatusCode.NoContent) else respond(HttpStatusCode.NotFound, ErrorResponse("Not found")) } diff --git a/src/main/kotlin/dev/typetype/server/routes/UserDataRoutes.kt b/src/main/kotlin/dev/typetype/server/routes/UserDataRoutes.kt index a4fdacb..88943c4 100644 --- a/src/main/kotlin/dev/typetype/server/routes/UserDataRoutes.kt +++ b/src/main/kotlin/dev/typetype/server/routes/UserDataRoutes.kt @@ -17,7 +17,7 @@ internal fun Route.userDataRoutes( restoreService: PipePipeBackupImporterService, ) { historyRoutes(svc.historyService, authService) - subscriptionsRoutes(svc.subscriptionsService, authService) + subscriptionsRoutes(svc.subscriptionsService, authService, svc.homeRecommendationWarmupService) subscriptionFeedRoutes(svc.subscriptionFeedService, authService) subscriptionShortsFeedRoutes(svc.subscriptionShortsFeedService, authService) playlistRoutes(svc.playlistService, authService) diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationCandidateLimits.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationCandidateLimits.kt index ffbd905..3f4ee9e 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationCandidateLimits.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationCandidateLimits.kt @@ -2,10 +2,10 @@ package dev.typetype.server.services object HomeRecommendationCandidateLimits { const val FAST_SUBSCRIPTION_PAGE_SIZE = 60 - const val SUBSCRIPTION_SEED_LIMIT = 20 - const val FAVORITE_SEED_LIMIT = 20 - const val RELATED_PER_SEED_LIMIT = 18 - const val RELATED_DISCOVERY_CAP = 80 + const val SUBSCRIPTION_SEED_LIMIT = 8 + const val FAVORITE_SEED_LIMIT = 6 + const val RELATED_PER_SEED_LIMIT = 10 + const val RELATED_DISCOVERY_CAP = 48 const val FAST_THEME_QUERY_LIMIT = 2 const val FULL_THEME_QUERY_LIMIT = 6 const val SIGNAL_QUERY_LIMIT = 4 diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolCache.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolCache.kt index bdcdd27..229de66 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolCache.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolCache.kt @@ -12,8 +12,21 @@ class HomeRecommendationPoolCache(private val cache: dev.typetype.server.cache.C return runCatching { CacheJson.decodeFromString(raw) }.getOrNull() } + suspend fun readStale(key: String): HomeRecommendationPool? { + val raw = runCatching { cache.get(staleKey(key)) }.getOrNull() ?: return null + return runCatching { CacheJson.decodeFromString(raw) }.getOrNull() + } + suspend fun write(key: String, pool: HomeRecommendationPool) { - runCatching { cache.set(key, CacheJson.encodeToString(pool), CACHE_TTL_SECONDS) } + val raw = CacheJson.encodeToString(pool) + runCatching { cache.set(key, raw, CACHE_TTL_SECONDS) } + runCatching { cache.set(staleKey(key), raw, STALE_TTL_SECONDS) } + } + + suspend fun delete(userId: String, serviceId: Int, mode: HomeRecommendationPoolMode) { + val key = key(userId, serviceId, mode, personalizationEnabled = false) + runCatching { cache.delete(key) } + runCatching { cache.delete(staleKey(key)) } } fun key(userId: String, serviceId: Int, mode: HomeRecommendationPoolMode, personalizationEnabled: Boolean): String { @@ -23,8 +36,11 @@ class HomeRecommendationPoolCache(private val cache: dev.typetype.server.cache.C return "recommendations:home:$hex" } + private fun staleKey(key: String): String = "$key:stale" + companion object { - private const val CACHE_TTL_SECONDS = 900L + private const val CACHE_TTL_SECONDS = 3_600L + private const val STALE_TTL_SECONDS = 86_400L private const val CACHE_VERSION = 8 } } diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolver.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolver.kt index 7e4ded5..90984e7 100644 --- a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolver.kt +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationPoolResolver.kt @@ -31,6 +31,8 @@ class HomeRecommendationPoolResolver( if (cached != null) return cached val fullBuild = fullBuild(key, userId, serviceId, mode, context) schedulePersistence(key, fullBuild) + val stale = poolCache.readStale(key) + if (stale != null) return stale val fastMode = if (mode == HomeRecommendationPoolMode.SHORTS) { HomeRecommendationPoolMode.FAST_SHORTS } else { diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationWarmup.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationWarmup.kt new file mode 100644 index 0000000..72e93e1 --- /dev/null +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationWarmup.kt @@ -0,0 +1,11 @@ +package dev.typetype.server.services + +interface HomeRecommendationWarmup { + fun markActive(userId: String) + fun invalidateAndWarm(userId: String) +} + +object NoopHomeRecommendationWarmup : HomeRecommendationWarmup { + override fun markActive(userId: String) = Unit + override fun invalidateAndWarm(userId: String) = Unit +} diff --git a/src/main/kotlin/dev/typetype/server/services/HomeRecommendationWarmupService.kt b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationWarmupService.kt new file mode 100644 index 0000000..44ddc94 --- /dev/null +++ b/src/main/kotlin/dev/typetype/server/services/HomeRecommendationWarmupService.kt @@ -0,0 +1,86 @@ +package dev.typetype.server.services + +import dev.typetype.server.cache.CacheService +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import java.util.concurrent.ConcurrentHashMap + +class HomeRecommendationWarmupService( + private val recommendationService: HomeRecommendationService, + private val cache: CacheService, +) : HomeRecommendationWarmup { + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + private val activeUsers = ConcurrentHashMap() + private val warmupStartedAt = ConcurrentHashMap() + private val poolCache = HomeRecommendationPoolCache(cache) + + init { + scope.launch { refreshLoop() } + } + + override fun markActive(userId: String) { + activeUsers[userId] = System.currentTimeMillis() + schedule(userId) + } + + override fun invalidateAndWarm(userId: String) { + activeUsers[userId] = System.currentTimeMillis() + scope.launch { + invalidate(userId) + schedule(userId, force = true) + } + } + + private fun schedule(userId: String, force: Boolean = false) { + val now = System.currentTimeMillis() + val previous = warmupStartedAt[userId] + if (!force && previous != null && now - previous < WARMUP_THROTTLE_MS) return + warmupStartedAt[userId] = now + scope.launch { warm(userId) } + } + + private suspend fun warm(userId: String) { + val context = context() + runCatching { + recommendationService.getHome(userId, YOUTUBE_SERVICE_ID, WARMUP_LIMIT, HomeRecommendationCursor(), context) + } + runCatching { + recommendationService.getShorts(userId, YOUTUBE_SERVICE_ID, WARMUP_LIMIT, HomeRecommendationCursor(), context) + } + } + + private suspend fun invalidate(userId: String) { + cache.delete(SubscriptionFeedCacheKeys.feed(userId)) + cache.delete(SubscriptionFeedCacheKeys.shorts(userId)) + poolCache.delete(userId, YOUTUBE_SERVICE_ID, HomeRecommendationPoolMode.FULL) + poolCache.delete(userId, YOUTUBE_SERVICE_ID, HomeRecommendationPoolMode.SHORTS) + } + + private suspend fun refreshLoop() { + while (scope.isActive) { + delay(REFRESH_INTERVAL_MS) + val now = System.currentTimeMillis() + activeUsers.entries.removeIf { now - it.value > ACTIVE_TTL_MS } + activeUsers.keys.forEach { schedule(it) } + } + } + + private fun context(): HomeRecommendationContext = HomeRecommendationContext( + serviceId = YOUTUBE_SERVICE_ID, + sessionContext = HomeRecommendationSessionContext( + intent = HomeRecommendationSessionIntent.AUTO, + deviceClass = HomeRecommendationDeviceClass.UNKNOWN, + ), + ) + + companion object { + private const val WARMUP_LIMIT = 20 + private const val WARMUP_THROTTLE_MS = 10 * 60 * 1000L + private const val REFRESH_INTERVAL_MS = 10 * 60 * 1000L + private const val ACTIVE_TTL_MS = 60 * 60 * 1000L + } +} diff --git a/src/main/kotlin/dev/typetype/server/services/SubscriptionFeedService.kt b/src/main/kotlin/dev/typetype/server/services/SubscriptionFeedService.kt index a4c1f6e..5a462f6 100644 --- a/src/main/kotlin/dev/typetype/server/services/SubscriptionFeedService.kt +++ b/src/main/kotlin/dev/typetype/server/services/SubscriptionFeedService.kt @@ -80,7 +80,7 @@ class SubscriptionFeedService( Base64.getEncoder().encodeToString("""{"page":$page}""".toByteArray()) companion object { - private const val FEED_TTL_SECONDS = 900L + private const val FEED_TTL_SECONDS = 3_600L private const val MAX_CONCURRENT_FETCHES = 20 private const val CHANNEL_TIMEOUT_MS = 15_000L } diff --git a/src/main/kotlin/dev/typetype/server/services/SubscriptionShortsFeedService.kt b/src/main/kotlin/dev/typetype/server/services/SubscriptionShortsFeedService.kt index a8a2c72..b881e4d 100644 --- a/src/main/kotlin/dev/typetype/server/services/SubscriptionShortsFeedService.kt +++ b/src/main/kotlin/dev/typetype/server/services/SubscriptionShortsFeedService.kt @@ -113,6 +113,6 @@ class SubscriptionShortsFeedService( } companion object { - private const val FEED_TTL_SECONDS = 900L + private const val FEED_TTL_SECONDS = 3_600L } }