diff --git a/src/integrationTest/java/no/fintlabs/config/KafkaTestConfig.kt b/src/integrationTest/java/no/fintlabs/config/KafkaTestConfig.kt new file mode 100644 index 00000000..4ddd270c --- /dev/null +++ b/src/integrationTest/java/no/fintlabs/config/KafkaTestConfig.kt @@ -0,0 +1,9 @@ +package no.fintlabs.config + +import no.fintlabs.utils.EntityProducer +import org.springframework.boot.test.context.TestConfiguration +import org.springframework.context.annotation.Import + +@TestConfiguration +@Import(EntityProducer::class) +class KafkaTestConfig diff --git a/src/integrationTest/java/no/fintlabs/consumer/integration/SyncTimestampIT.kt b/src/integrationTest/java/no/fintlabs/consumer/integration/SyncTimestampIT.kt new file mode 100644 index 00000000..45adf819 --- /dev/null +++ b/src/integrationTest/java/no/fintlabs/consumer/integration/SyncTimestampIT.kt @@ -0,0 +1,200 @@ +package no.fintlabs.consumer.integration + +import com.fasterxml.jackson.databind.ObjectMapper +import no.fintlabs.adapter.models.sync.SyncType +import no.fintlabs.cache.CacheService +import no.fintlabs.config.KafkaTestConfig +import no.fintlabs.consumer.admin.CacheEntry +import no.fintlabs.consumer.resource.dto.LastUpdatedResponse +import no.fintlabs.utils.EntityProducer +import no.novari.fint.model.felles.kompleksedatatyper.Identifikator +import no.novari.fint.model.resource.utdanning.vurdering.ElevfravarResource +import org.awaitility.kotlin.await +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.web.server.LocalServerPort +import org.springframework.context.annotation.Import +import org.springframework.core.ParameterizedTypeReference +import org.springframework.kafka.test.context.EmbeddedKafka +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.web.reactive.server.WebTestClient +import org.springframework.test.web.reactive.server.expectBody +import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.UUID +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +@EmbeddedKafka( + partitions = 1, + topics = ["fintlabs-no.fint-core.entity.utdanning-vurdering"], +) +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = [ + "fint.security.enabled=false", + ], +) +@ActiveProfiles("utdanning-vurdering") +@Import(KafkaTestConfig::class) +class SyncTimestampIT { + @LocalServerPort + private var port: Int = 0 + + private val client by lazy { + WebTestClient.bindToServer().baseUrl("http://localhost:$port").build() + } + + private val resourceName = "elevfravar" + + @Autowired + lateinit var entityProducer: EntityProducer + + @Autowired + lateinit var objectMapper: ObjectMapper + + @Autowired + lateinit var cacheService: CacheService + + @AfterEach + fun tearDown() { + cacheService.getCache(resourceName).evictExpired(Long.MAX_VALUE) + } + + @Test + @DirtiesContext(methodMode = DirtiesContext.MethodMode.BEFORE_METHOD) + fun `before any full sync, lastFullSync is epoch in last-updated response`() { + val lastUpdated = getLastUpdated() + assertEquals( + 0L, + lastUpdated.lastCompletedFullSync.time, + "lastCompletedFullSync should be epoch before any full sync", + ) + } + + @Test + @DirtiesContext(methodMode = DirtiesContext.MethodMode.BEFORE_METHOD) + fun `before any full sync, admin cache status shows epoch for lastFullSync`() { + val cacheStatus = getCacheStatus() + val fagEntry = cacheStatus[resourceName] ?: error("No fag entry in cache status") + assertEquals( + 0L, + fagEntry.lastCompletedFullSync.time, + "lastCompletedFullSync should be epoch before any full sync", + ) + } + + @Test + fun `after full sync completes, last-updated response contains sync timestamp`() { + val syncTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli() + sendFullSync(timestamp = syncTimestamp) + + await.atMost(Duration.ofSeconds(10)).untilAsserted { + val lastUpdated = getLastUpdated() + assertTrue( + lastUpdated.lastCompletedFullSync.time > 0, + "lastCompletedFullSync should be non-epoch after full sync", + ) + assertEquals( + syncTimestamp, + lastUpdated.lastCompletedFullSync.time, + "lastCompletedFullSync should match the timestamp of the completed sync", + ) + } + } + + @Test + fun `after full sync completes, admin cache status contains sync timestamp`() { + val syncTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli() + sendFullSync(timestamp = syncTimestamp) + + await.atMost(Duration.ofSeconds(10)).untilAsserted { + val fagEntry = getCacheStatus()[resourceName] ?: error("No fag entry in cache status") + assertTrue( + fagEntry.lastCompletedFullSync.time > 0, + "lastCompletedFullSync should be non-epoch after full sync", + ) + assertEquals( + syncTimestamp, + fagEntry.lastCompletedFullSync.time, + "lastCompletedFullSync in admin cache status should match the completed sync timestamp", + ) + } + } + + @Test + fun `second full sync updates lastFullSync to the newer timestamp`() { + val firstTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli() + sendFullSync(timestamp = firstTimestamp) + + await.atMost(Duration.ofSeconds(10)).untilAsserted { + assertEquals(firstTimestamp, getLastUpdated().lastCompletedFullSync.time) + } + + val corrId = UUID.randomUUID().toString() + val secondTimestamp = firstTimestamp + 5_000 + sendFullSync(timestamp = secondTimestamp, syncCorrId = corrId, syncTotalSize = 2) + sendFullSync(timestamp = secondTimestamp, syncCorrId = corrId, syncTotalSize = 2) + + await.atMost(Duration.ofSeconds(10)).untilAsserted { + assertEquals( + secondTimestamp, + getLastUpdated().lastCompletedFullSync.time, + "lastCompletedFullSync should be updated to the second sync's timestamp", + ) + } + } + + private fun getLastUpdated(): LastUpdatedResponse = + client + .get() + .uri("/utdanning/vurdering/elevfravar/last-updated") + .exchange() + .expectStatus() + .isOk + .expectBody() + .returnResult() + .responseBody!! + + private fun getCacheStatus(): Map = + client + .get() + .uri("/utdanning/vurdering/admin/cache/status") + .exchange() + .expectStatus() + .isOk + .expectBody(object : ParameterizedTypeReference>() {}) + .returnResult() + .responseBody!! + + private fun sendFullSync( + timestamp: Long = Instant.now().toEpochMilli(), + syncTotalSize: Long = 1, + syncCorrId: String = UUID.randomUUID().toString(), + ) { + val resourceId = UUID.randomUUID().toString() + val resource = + ElevfravarResource().apply { + systemId = + Identifikator().apply { + identifikatorverdi = resourceId + } + } + + entityProducer + .publish( + resourceName, + resource, + resourceId, + SyncType.FULL, + syncCorrId, + syncTotalSize, + timestamp, + ).get(10, TimeUnit.SECONDS) + } +} diff --git a/src/integrationTest/java/no/fintlabs/utils/EntityProducer.kt b/src/integrationTest/java/no/fintlabs/utils/EntityProducer.kt new file mode 100644 index 00000000..276411fa --- /dev/null +++ b/src/integrationTest/java/no/fintlabs/utils/EntityProducer.kt @@ -0,0 +1,77 @@ +package no.fintlabs.utils + +import no.fintlabs.adapter.models.sync.SyncType +import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.kafka.KafkaConstants.LAST_MODIFIED +import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME +import no.fintlabs.consumer.kafka.KafkaConstants.SYNC_CORRELATION_ID +import no.fintlabs.consumer.kafka.KafkaConstants.SYNC_TOTAL_SIZE +import no.fintlabs.consumer.kafka.KafkaConstants.SYNC_TYPE +import no.fintlabs.consumer.kafka.entity.ENTITY_KEY_DELIMITER +import no.novari.kafka.producing.ParameterizedProducerRecord +import no.novari.kafka.producing.ParameterizedTemplateFactory +import no.novari.kafka.topic.name.EntityTopicNameParameters +import no.novari.kafka.topic.name.TopicNamePrefixParameters +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.header.internals.RecordHeaders +import org.springframework.boot.test.context.TestComponent +import org.springframework.kafka.support.SendResult +import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture + +@TestComponent +class EntityProducer( + parameterizedTemplateFactory: ParameterizedTemplateFactory, + private val consumerConfig: ConsumerConfiguration, +) { + private val producer = parameterizedTemplateFactory.createTemplate(Any::class.java) + + fun publish( + resourceName: String, + resource: Any?, + resourceId: String, + syncType: SyncType, + syncCorrId: String, + syncTotalSize: Long, + timestamp: Long = System.currentTimeMillis(), + domainName: String = consumerConfig.domain, + packageName: String = consumerConfig.packageName, + ): CompletableFuture> = + producer.send( + ParameterizedProducerRecord + .builder() + .key("$resourceName$ENTITY_KEY_DELIMITER$resourceId") + .headers(createSyncHeaders(resourceName, syncType, syncCorrId, syncTotalSize, timestamp)) + .topicNameParameters(createEntityTopicNameParameters(domainName, packageName)) + .value(resource) + .build(), + ) + + private fun createSyncHeaders( + resourceName: String, + syncType: SyncType, + syncCorrId: String, + syncTotalSize: Long, + timestamp: Long, + ) = RecordHeaders().apply { + add(RecordHeader(RESOURCE_NAME, resourceName.toByteArray())) + add(SYNC_TYPE, byteArrayOf(syncType.ordinal.toByte())) + add(SYNC_CORRELATION_ID, syncCorrId.toByteArray()) + add(SYNC_TOTAL_SIZE, ByteBuffer.allocate(Long.SIZE_BYTES).putLong(syncTotalSize).array()) + add(LAST_MODIFIED, ByteBuffer.allocate(8).putLong(timestamp).array()) + } + + private fun createEntityTopicNameParameters( + domainName: String, + packageName: String, + ) = EntityTopicNameParameters + .builder() + .topicNamePrefixParameters( + TopicNamePrefixParameters + .stepBuilder() + .orgId(consumerConfig.orgId.asTopicSegment) + .domainContextApplicationDefault() + .build(), + ).resourceName("$domainName-$packageName") + .build() +} diff --git a/src/main/java/no/fintlabs/cache/CacheService.kt b/src/main/java/no/fintlabs/cache/CacheService.kt index 2912477f..aac9210f 100644 --- a/src/main/java/no/fintlabs/cache/CacheService.kt +++ b/src/main/java/no/fintlabs/cache/CacheService.kt @@ -1,13 +1,17 @@ package no.fintlabs.cache +import no.fintlabs.consumer.resource.context.ResourceContext import no.novari.fint.model.resource.FintResource import org.springframework.stereotype.Service import java.util.concurrent.ConcurrentHashMap @Service -class CacheService { +class CacheService( + resourceContext: ResourceContext, +) { + /** Eagerly initializes a [FintCache] for each resource name configured in [ResourceContext]. */ private val resourceCaches: MutableMap> = - ConcurrentHashMap>() + resourceContext.resourceNames.associateWithTo(ConcurrentHashMap()) { FintCache() } fun getCachedResourceNames(): Set = resourceCaches.keys diff --git a/src/main/java/no/fintlabs/consumer/admin/AdminController.kt b/src/main/java/no/fintlabs/consumer/admin/AdminController.kt index b2eb970c..fafe9737 100644 --- a/src/main/java/no/fintlabs/consumer/admin/AdminController.kt +++ b/src/main/java/no/fintlabs/consumer/admin/AdminController.kt @@ -3,6 +3,7 @@ package no.fintlabs.consumer.admin import no.fintlabs.cache.CacheService import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.EndpointsConstants +import no.fintlabs.consumer.kafka.sync.SyncTimestampStore import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.PathVariable @@ -17,6 +18,7 @@ import java.util.Date class AdminController( private val cacheService: CacheService, private val configuration: ConsumerConfiguration, + private val syncTimestampStore: SyncTimestampStore, ) { @GetMapping("/health") fun healthChecks(): ResponseEntity<*>? = null // TODO: Implement when status service is working! @@ -50,7 +52,8 @@ class AdminController( fun cacheStatus(): Map = cacheService.getCachedResourceNames().associateWith { resourceName -> val cache = cacheService.getCache(resourceName) - CacheEntry(Date(cache.lastUpdated), cache.size) + val lastFullSync = syncTimestampStore.getLastFullSync(resourceName)?.let { Date.from(it) } ?: Date(0) + CacheEntry(Date(cache.lastUpdated), lastFullSync, cache.size) } @PostMapping("/cache/rebuild", "/cache/rebuild/{model}") diff --git a/src/main/java/no/fintlabs/consumer/admin/CacheEntry.java b/src/main/java/no/fintlabs/consumer/admin/CacheEntry.java index 16e6d5c5..c68548c8 100644 --- a/src/main/java/no/fintlabs/consumer/admin/CacheEntry.java +++ b/src/main/java/no/fintlabs/consumer/admin/CacheEntry.java @@ -2,6 +2,5 @@ import java.util.Date; -public record CacheEntry(Date lastUpdated, Integer size) { +public record CacheEntry(Date lastUpdated, Date lastCompletedFullSync, Integer size) { } - diff --git a/src/main/java/no/fintlabs/consumer/kafka/sync/SyncTimestampStore.kt b/src/main/java/no/fintlabs/consumer/kafka/sync/SyncTimestampStore.kt new file mode 100644 index 00000000..346e321d --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/kafka/sync/SyncTimestampStore.kt @@ -0,0 +1,27 @@ +package no.fintlabs.consumer.kafka.sync + +import org.springframework.stereotype.Component +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap + +/** + * Tracks the most recent successfully completed full sync timestamp for each resource. + * + * Example: + * ``` + * store.recordFullSync("employee", 1710758400000L) + * store.getLastFullSync("employee") // -> 2024-03-18T08:00:00Z + * store.getLastFullSync("unknown") // -> null + * ``` + */ +@Component +class SyncTimestampStore { + private val lastFullSync: ConcurrentHashMap = ConcurrentHashMap() + + fun recordFullSync( + resourceName: String, + timestamp: Long, + ) = lastFullSync.put(resourceName, Instant.ofEpochMilli(timestamp)) + + fun getLastFullSync(resourceName: String): Instant? = lastFullSync[resourceName] +} diff --git a/src/main/java/no/fintlabs/consumer/kafka/sync/SyncTrackerService.kt b/src/main/java/no/fintlabs/consumer/kafka/sync/SyncTrackerService.kt index 3ad8b1d8..2c413059 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/sync/SyncTrackerService.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/sync/SyncTrackerService.kt @@ -37,6 +37,7 @@ import java.time.Duration class SyncTrackerService( private val evictionService: CacheEvictionService, private val syncStatusProducer: SyncStatusProducer, + private val syncTimestampStore: SyncTimestampStore, private val meterRegistry: MeterRegistry, caffeineCacheProperties: CaffeineCacheProperties, ) { @@ -144,11 +145,13 @@ class SyncTrackerService( resourceName, newSyncState.processedCount, ) + timed(resourceName, syncType, "sync.full.evictExpired") { evictionService.evictExpired(resourceName, newSyncState.startTimestamp) } timed(resourceName, syncType, "sync.full.removeTracking") { fullSyncPerResourceName.remove(resourceName) + syncTimestampStore.recordFullSync(resourceName, newSyncState.startTimestamp) } timed(resourceName, syncType, "sync.status.publish.completed") { syncStatusProducer.publish(SyncStatus(correlationId, newSyncState.syncType, "Completed")) diff --git a/src/main/java/no/fintlabs/consumer/resource/ResourceController.kt b/src/main/java/no/fintlabs/consumer/resource/ResourceController.kt index 586ad5d9..455c0b76 100644 --- a/src/main/java/no/fintlabs/consumer/resource/ResourceController.kt +++ b/src/main/java/no/fintlabs/consumer/resource/ResourceController.kt @@ -80,9 +80,9 @@ class ResourceController( fun getLastUpdated( @PathVariable resource: String, ): ResponseEntity = - resourceService.getLastUpdated(resource).let { - ResponseEntity.ok(LastUpdatedResponse(it)) - } + resourceService + .getLastUpdated(resource) + .let { lastUpdatedResponse -> ResponseEntity.ok(lastUpdatedResponse) } @GetMapping(EndpointsConstants.CACHE_SIZE) fun getResourceCacheSize( diff --git a/src/main/java/no/fintlabs/consumer/resource/ResourceService.kt b/src/main/java/no/fintlabs/consumer/resource/ResourceService.kt index f77f2514..55cd7b48 100644 --- a/src/main/java/no/fintlabs/consumer/resource/ResourceService.kt +++ b/src/main/java/no/fintlabs/consumer/resource/ResourceService.kt @@ -1,15 +1,19 @@ package no.fintlabs.consumer.resource import no.fintlabs.cache.CacheService +import no.fintlabs.consumer.kafka.sync.SyncTimestampStore import no.fintlabs.consumer.links.LinkService +import no.fintlabs.consumer.resource.dto.LastUpdatedResponse import no.fintlabs.model.resource.FintResources import no.novari.fint.model.resource.FintResource import org.springframework.stereotype.Service +import java.util.Date @Service class ResourceService( private val linkService: LinkService, private val cacheService: CacheService, + private val syncTimestampStore: SyncTimestampStore, ) { fun getResources( resourceName: String, @@ -29,7 +33,11 @@ class ResourceService( idValue: String, ): FintResource? = cacheService.getCache(resourceName).getByIdField(idField, idValue) - fun getLastUpdated(resourceName: String): Long = cacheService.getCache(resourceName).lastUpdated + fun getLastUpdated(resourceName: String): LastUpdatedResponse { + val lastUpdated = cacheService.getCache(resourceName).lastUpdated + val lastFullSync = syncTimestampStore.getLastFullSync(resourceName) + return LastUpdatedResponse(lastUpdated, lastFullSync?.let { Date.from(it) } ?: Date(0)) + } fun getCacheSize(resourceName: String): Int = cacheService.getCache(resourceName).size } diff --git a/src/main/java/no/fintlabs/consumer/resource/dto/LastUpdatedResponse.kt b/src/main/java/no/fintlabs/consumer/resource/dto/LastUpdatedResponse.kt index 1c0290fa..3574885f 100644 --- a/src/main/java/no/fintlabs/consumer/resource/dto/LastUpdatedResponse.kt +++ b/src/main/java/no/fintlabs/consumer/resource/dto/LastUpdatedResponse.kt @@ -1,5 +1,8 @@ package no.fintlabs.consumer.resource.dto +import java.util.Date + data class LastUpdatedResponse( val lastUpdated: Long, + val lastCompletedFullSync: Date, ) diff --git a/src/test/java/no/fintlabs/cache/CacheEvictionServiceTest.kt b/src/test/java/no/fintlabs/cache/CacheEvictionServiceTest.kt index 8280b0cc..b8cafb52 100644 --- a/src/test/java/no/fintlabs/cache/CacheEvictionServiceTest.kt +++ b/src/test/java/no/fintlabs/cache/CacheEvictionServiceTest.kt @@ -9,6 +9,7 @@ import io.mockk.verify import no.fintlabs.autorelation.RelationEventService import no.fintlabs.consumer.config.ConsumerConfiguration import no.fintlabs.consumer.config.OrgId +import no.fintlabs.consumer.resource.context.ResourceContext import no.novari.fint.model.resource.FintResource import no.novari.fint.model.resource.utdanning.vurdering.ElevfravarResource import org.junit.jupiter.api.AfterEach @@ -25,10 +26,12 @@ class CacheEvictionServiceTest { private lateinit var relationEventService: RelationEventService private lateinit var consumerConfiguration: ConsumerConfiguration private lateinit var cacheEvictionService: CacheEvictionService + private lateinit var resourceContext: ResourceContext @BeforeEach fun setUp() { - cacheService = CacheService() + resourceContext = mockk(relaxed = true) + cacheService = CacheService(resourceContext) relationEventService = mockk(relaxed = true) consumerConfiguration = mockk { diff --git a/src/test/java/no/fintlabs/consumer/kafka/sync/SyncTrackerServiceTest.kt b/src/test/java/no/fintlabs/consumer/kafka/sync/SyncTrackerServiceTest.kt index b91f471a..5cd363c3 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/sync/SyncTrackerServiceTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/sync/SyncTrackerServiceTest.kt @@ -33,6 +33,7 @@ class SyncTrackerServiceTest { private lateinit var evictionService: CacheEvictionService private lateinit var syncStatusProducer: SyncStatusProducer private lateinit var syncTracker: SyncTrackerService + private lateinit var syncTimestampStore: SyncTimestampStore private val meterRegistry = SimpleMeterRegistry() private val cacheProperties: CaffeineCacheProperties = CaffeineCacheProperties() private val resourceName = "elevfravar" @@ -41,7 +42,9 @@ class SyncTrackerServiceTest { fun setUp() { evictionService = mockk(relaxed = true) syncStatusProducer = mockk(relaxed = true) - syncTracker = SyncTrackerService(evictionService, syncStatusProducer, meterRegistry, cacheProperties) + syncTimestampStore = mockk(relaxed = true) + syncTracker = + SyncTrackerService(evictionService, syncStatusProducer, syncTimestampStore, meterRegistry, cacheProperties) } @Test diff --git a/src/test/java/no/fintlabs/consumer/resource/ResourceServiceTest.kt b/src/test/java/no/fintlabs/consumer/resource/ResourceServiceTest.kt index c7108241..eb6da89b 100644 --- a/src/test/java/no/fintlabs/consumer/resource/ResourceServiceTest.kt +++ b/src/test/java/no/fintlabs/consumer/resource/ResourceServiceTest.kt @@ -4,6 +4,7 @@ import io.mockk.every import io.mockk.mockk import no.fintlabs.cache.CacheService import no.fintlabs.cache.FintCache +import no.fintlabs.consumer.kafka.sync.SyncTimestampStore import no.fintlabs.consumer.links.LinkService import no.fintlabs.model.resource.FintResources import no.novari.fint.model.resource.FintResource @@ -13,7 +14,8 @@ import org.junit.jupiter.api.Test class ResourceServiceTest { private val linkService = mockk() private val cacheService = mockk() - private val resourceService = ResourceService(linkService, cacheService) + private val syncTimestampStore = mockk() + private val resourceService = ResourceService(linkService, cacheService, syncTimestampStore) @Test fun `getResources fetches from cache and transforms through linkService`() {