Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package no.fintlabs.config

import no.fintlabs.utils.EntityProducer
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.context.annotation.Import

@TestConfiguration
@Import(EntityProducer::class)
class KafkaTestConfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package no.fintlabs.consumer.integration

import com.fasterxml.jackson.databind.ObjectMapper
import no.fintlabs.adapter.models.sync.SyncType
import no.fintlabs.cache.CacheService
import no.fintlabs.config.KafkaTestConfig
import no.fintlabs.consumer.admin.CacheEntry
import no.fintlabs.consumer.resource.dto.LastUpdatedResponse
import no.fintlabs.utils.EntityProducer
import no.novari.fint.model.felles.kompleksedatatyper.Identifikator
import no.novari.fint.model.resource.utdanning.vurdering.ElevfravarResource
import org.awaitility.kotlin.await
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.web.server.LocalServerPort
import org.springframework.context.annotation.Import
import org.springframework.core.ParameterizedTypeReference
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ActiveProfiles
import org.springframework.test.web.reactive.server.WebTestClient
import org.springframework.test.web.reactive.server.expectBody
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertTrue

@EmbeddedKafka(
partitions = 1,
topics = ["fintlabs-no.fint-core.entity.utdanning-vurdering"],
)
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = [
"fint.security.enabled=false",
],
)
@ActiveProfiles("utdanning-vurdering")
@Import(KafkaTestConfig::class)
class SyncTimestampIT {
@LocalServerPort
private var port: Int = 0

private val client by lazy {
WebTestClient.bindToServer().baseUrl("http://localhost:$port").build()
}

private val resourceName = "elevfravar"

@Autowired
lateinit var entityProducer: EntityProducer

@Autowired
lateinit var objectMapper: ObjectMapper

@Autowired
lateinit var cacheService: CacheService

@AfterEach
fun tearDown() {
cacheService.getCache(resourceName).evictExpired(Long.MAX_VALUE)
}

@Test
@DirtiesContext(methodMode = DirtiesContext.MethodMode.BEFORE_METHOD)
fun `before any full sync, lastFullSync is epoch in last-updated response`() {
val lastUpdated = getLastUpdated()
assertEquals(
0L,
lastUpdated.lastCompletedFullSync.time,
"lastCompletedFullSync should be epoch before any full sync",
)
}

@Test
@DirtiesContext(methodMode = DirtiesContext.MethodMode.BEFORE_METHOD)
fun `before any full sync, admin cache status shows epoch for lastFullSync`() {
val cacheStatus = getCacheStatus()
val fagEntry = cacheStatus[resourceName] ?: error("No fag entry in cache status")
assertEquals(
0L,
fagEntry.lastCompletedFullSync.time,
"lastCompletedFullSync should be epoch before any full sync",
)
}

@Test
fun `after full sync completes, last-updated response contains sync timestamp`() {
val syncTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli()
sendFullSync(timestamp = syncTimestamp)

await.atMost(Duration.ofSeconds(10)).untilAsserted {
val lastUpdated = getLastUpdated()
assertTrue(
lastUpdated.lastCompletedFullSync.time > 0,
"lastCompletedFullSync should be non-epoch after full sync",
)
assertEquals(
syncTimestamp,
lastUpdated.lastCompletedFullSync.time,
"lastCompletedFullSync should match the timestamp of the completed sync",
)
}
}

@Test
fun `after full sync completes, admin cache status contains sync timestamp`() {
val syncTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli()
sendFullSync(timestamp = syncTimestamp)

await.atMost(Duration.ofSeconds(10)).untilAsserted {
val fagEntry = getCacheStatus()[resourceName] ?: error("No fag entry in cache status")
assertTrue(
fagEntry.lastCompletedFullSync.time > 0,
"lastCompletedFullSync should be non-epoch after full sync",
)
assertEquals(
syncTimestamp,
fagEntry.lastCompletedFullSync.time,
"lastCompletedFullSync in admin cache status should match the completed sync timestamp",
)
}
}

@Test
fun `second full sync updates lastFullSync to the newer timestamp`() {
val firstTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli()
sendFullSync(timestamp = firstTimestamp)

await.atMost(Duration.ofSeconds(10)).untilAsserted {
assertEquals(firstTimestamp, getLastUpdated().lastCompletedFullSync.time)
}

val corrId = UUID.randomUUID().toString()
val secondTimestamp = firstTimestamp + 5_000
sendFullSync(timestamp = secondTimestamp, syncCorrId = corrId, syncTotalSize = 2)
sendFullSync(timestamp = secondTimestamp, syncCorrId = corrId, syncTotalSize = 2)

await.atMost(Duration.ofSeconds(10)).untilAsserted {
assertEquals(
secondTimestamp,
getLastUpdated().lastCompletedFullSync.time,
"lastCompletedFullSync should be updated to the second sync's timestamp",
)
}
}

private fun getLastUpdated(): LastUpdatedResponse =
client
.get()
.uri("/utdanning/vurdering/elevfravar/last-updated")
.exchange()
.expectStatus()
.isOk
.expectBody<LastUpdatedResponse>()
.returnResult()
.responseBody!!

private fun getCacheStatus(): Map<String, CacheEntry> =
client
.get()
.uri("/utdanning/vurdering/admin/cache/status")
.exchange()
.expectStatus()
.isOk
.expectBody(object : ParameterizedTypeReference<Map<String, CacheEntry>>() {})
.returnResult()
.responseBody!!

private fun sendFullSync(
timestamp: Long = Instant.now().toEpochMilli(),
syncTotalSize: Long = 1,
syncCorrId: String = UUID.randomUUID().toString(),
) {
val resourceId = UUID.randomUUID().toString()
val resource =
ElevfravarResource().apply {
systemId =
Identifikator().apply {
identifikatorverdi = resourceId
}
}

entityProducer
.publish(
resourceName,
resource,
resourceId,
SyncType.FULL,
syncCorrId,
syncTotalSize,
timestamp,
).get(10, TimeUnit.SECONDS)
}
}
77 changes: 77 additions & 0 deletions src/integrationTest/java/no/fintlabs/utils/EntityProducer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package no.fintlabs.utils

import no.fintlabs.adapter.models.sync.SyncType
import no.fintlabs.consumer.config.ConsumerConfiguration
import no.fintlabs.consumer.kafka.KafkaConstants.LAST_MODIFIED
import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME
import no.fintlabs.consumer.kafka.KafkaConstants.SYNC_CORRELATION_ID
import no.fintlabs.consumer.kafka.KafkaConstants.SYNC_TOTAL_SIZE
import no.fintlabs.consumer.kafka.KafkaConstants.SYNC_TYPE
import no.fintlabs.consumer.kafka.entity.ENTITY_KEY_DELIMITER
import no.novari.kafka.producing.ParameterizedProducerRecord
import no.novari.kafka.producing.ParameterizedTemplateFactory
import no.novari.kafka.topic.name.EntityTopicNameParameters
import no.novari.kafka.topic.name.TopicNamePrefixParameters
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.springframework.boot.test.context.TestComponent
import org.springframework.kafka.support.SendResult
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture

@TestComponent
class EntityProducer(
parameterizedTemplateFactory: ParameterizedTemplateFactory,
private val consumerConfig: ConsumerConfiguration,
) {
private val producer = parameterizedTemplateFactory.createTemplate(Any::class.java)

fun publish(
resourceName: String,
resource: Any?,
resourceId: String,
syncType: SyncType,
syncCorrId: String,
syncTotalSize: Long,
timestamp: Long = System.currentTimeMillis(),
domainName: String = consumerConfig.domain,
packageName: String = consumerConfig.packageName,
): CompletableFuture<SendResult<String, in Any?>> =
producer.send(
ParameterizedProducerRecord
.builder<Any>()
.key("$resourceName$ENTITY_KEY_DELIMITER$resourceId")
.headers(createSyncHeaders(resourceName, syncType, syncCorrId, syncTotalSize, timestamp))
.topicNameParameters(createEntityTopicNameParameters(domainName, packageName))
.value(resource)
.build(),
)

private fun createSyncHeaders(
resourceName: String,
syncType: SyncType,
syncCorrId: String,
syncTotalSize: Long,
timestamp: Long,
) = RecordHeaders().apply {
add(RecordHeader(RESOURCE_NAME, resourceName.toByteArray()))
add(SYNC_TYPE, byteArrayOf(syncType.ordinal.toByte()))
add(SYNC_CORRELATION_ID, syncCorrId.toByteArray())
add(SYNC_TOTAL_SIZE, ByteBuffer.allocate(Long.SIZE_BYTES).putLong(syncTotalSize).array())
add(LAST_MODIFIED, ByteBuffer.allocate(8).putLong(timestamp).array())
}

private fun createEntityTopicNameParameters(
domainName: String,
packageName: String,
) = EntityTopicNameParameters
.builder()
.topicNamePrefixParameters(
TopicNamePrefixParameters
.stepBuilder()
.orgId(consumerConfig.orgId.asTopicSegment)
.domainContextApplicationDefault()
.build(),
).resourceName("$domainName-$packageName")
.build()
}
8 changes: 6 additions & 2 deletions src/main/java/no/fintlabs/cache/CacheService.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package no.fintlabs.cache

import no.fintlabs.consumer.resource.context.ResourceContext
import no.novari.fint.model.resource.FintResource
import org.springframework.stereotype.Service
import java.util.concurrent.ConcurrentHashMap

@Service
class CacheService {
class CacheService(
resourceContext: ResourceContext,
) {
/** Eagerly initializes a [FintCache] for each resource name configured in [ResourceContext]. */
private val resourceCaches: MutableMap<String, FintCache<FintResource>> =
ConcurrentHashMap<String, FintCache<FintResource>>()
resourceContext.resourceNames.associateWithTo(ConcurrentHashMap()) { FintCache() }

fun getCachedResourceNames(): Set<String> = resourceCaches.keys

Expand Down
5 changes: 4 additions & 1 deletion src/main/java/no/fintlabs/consumer/admin/AdminController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package no.fintlabs.consumer.admin
import no.fintlabs.cache.CacheService
import no.fintlabs.consumer.config.ConsumerConfiguration
import no.fintlabs.consumer.config.EndpointsConstants
import no.fintlabs.consumer.kafka.sync.SyncTimestampStore
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
Expand All @@ -17,6 +18,7 @@ import java.util.Date
class AdminController(
private val cacheService: CacheService,
private val configuration: ConsumerConfiguration,
private val syncTimestampStore: SyncTimestampStore,
) {
@GetMapping("/health")
fun healthChecks(): ResponseEntity<*>? = null // TODO: Implement when status service is working!
Expand Down Expand Up @@ -50,7 +52,8 @@ class AdminController(
fun cacheStatus(): Map<String, CacheEntry> =
cacheService.getCachedResourceNames().associateWith { resourceName ->
val cache = cacheService.getCache(resourceName)
CacheEntry(Date(cache.lastUpdated), cache.size)
val lastFullSync = syncTimestampStore.getLastFullSync(resourceName)?.let { Date.from(it) } ?: Date(0)
CacheEntry(Date(cache.lastUpdated), lastFullSync, cache.size)
}

@PostMapping("/cache/rebuild", "/cache/rebuild/{model}")
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/no/fintlabs/consumer/admin/CacheEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@

import java.util.Date;

public record CacheEntry(Date lastUpdated, Integer size) {
public record CacheEntry(Date lastUpdated, Date lastCompletedFullSync, Integer size) {
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package no.fintlabs.consumer.kafka.sync

import org.springframework.stereotype.Component
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap

/**
* Tracks the most recent successfully completed full sync timestamp for each resource.
*
* Example:
* ```
* store.recordFullSync("employee", 1710758400000L)
* store.getLastFullSync("employee") // -> 2024-03-18T08:00:00Z
* store.getLastFullSync("unknown") // -> null
* ```
*/
@Component
class SyncTimestampStore {
private val lastFullSync: ConcurrentHashMap<String, Instant> = ConcurrentHashMap()

fun recordFullSync(
resourceName: String,
timestamp: Long,
) = lastFullSync.put(resourceName, Instant.ofEpochMilli(timestamp))

fun getLastFullSync(resourceName: String): Instant? = lastFullSync[resourceName]
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import java.time.Duration
class SyncTrackerService(
private val evictionService: CacheEvictionService,
private val syncStatusProducer: SyncStatusProducer,
private val syncTimestampStore: SyncTimestampStore,
private val meterRegistry: MeterRegistry,
caffeineCacheProperties: CaffeineCacheProperties,
) {
Expand Down Expand Up @@ -144,11 +145,13 @@ class SyncTrackerService(
resourceName,
newSyncState.processedCount,
)

timed(resourceName, syncType, "sync.full.evictExpired") {
evictionService.evictExpired(resourceName, newSyncState.startTimestamp)
}
timed(resourceName, syncType, "sync.full.removeTracking") {
fullSyncPerResourceName.remove(resourceName)
syncTimestampStore.recordFullSync(resourceName, newSyncState.startTimestamp)
}
timed(resourceName, syncType, "sync.status.publish.completed") {
syncStatusProducer.publish(SyncStatus(correlationId, newSyncState.syncType, "Completed"))
Expand Down
Loading
Loading