diff --git a/docs/kafka-health-checks.md b/docs/kafka-health-checks.md new file mode 100644 index 00000000..a0379a8f --- /dev/null +++ b/docs/kafka-health-checks.md @@ -0,0 +1,268 @@ +# Kafka Health Checks + +Dette dokumentet beskriver hvordan health-checkene i `fint-core-consumer` fungerer etter innføringen av Kafka-basert readiness og liveness. + +## Oversikt + +Applikasjonen bruker tre forskjellige typer health/probes i Kubernetes: + +- `startupProbe`: brukes bare helt tidlig for å verifisere at JVM og Spring Boot faktisk starter. +- `readinessProbe`: brukes for å avgjøre om poden trygt kan motta trafikk. +- `livenessProbe`: brukes for å avgjøre om poden fortsatt lever, eller om Kubernetes skal restarte den. + +Disse probe-typene har forskjellig ansvar og skal ikke blandes: + +- `startupProbe` skal ikke vite noe om hvor langt Kafka-consumerne har kommet i bootstrap. +- `readinessProbe` skal blokkere trafikk til initial bootstrap er ferdig. +- `livenessProbe` skal ikke feile bare fordi applikasjonen ligger litt etter i konsumering; den skal feile hvis Kafka-consumerne i praksis har sluttet å fungere over tid. + +## Hvordan Consumer bruker dem + +Consumer eksponerer actuator-endepunktene: + +- `/actuator/health/readiness` +- `/actuator/health/liveness` + +Readiness er koblet til en initial Kafka-bootstrap-tracker. Liveness er koblet til en separat Kafka-runtime-monitor. + +## Readiness + +### Hensikt + +Readiness skal beskytte trafikk mot en pod som ennå ikke har bygd opp lokal cache ved oppstart. + +### Hvordan den virker + +Ved oppstart settes readiness til `REFUSING_TRAFFIC`. + +To listeners er definert som blokkerende for initial bootstrap: + +- `entity` +- `relation-update` + +For hver assigned partition hentes "startup end offset" fra Kafka i det assignment skjer. Deretter spores prosesserte offsets mens records behandles. + +Listeneren regnes som ferdig når alle dens assigned partitions har konsumert seg opp til offseten som gjaldt ved oppstartstidspunktet. + +Applikasjonen regnes som `ready` når alle blokkerende listeners er ferdige. + +### Viktig nyanse + +Dette er en `initial-only` readiness. + +Det betyr: + +- Readiness blokkerer ved oppstart. +- Readiness går til `UP` når initial bootstrap er ferdig. +- Readiness går ikke ned igjen senere bare fordi det kommer flere meldinger, full sync, eller midlertidig lag. + +Dette er bevisst. Etter at poden er sluppet i trafikk, skal vanlig Kafka-lag ikke stoppe lesetrafikk. + +### Hva får readiness til å feile + +Readiness blir `OUT_OF_SERVICE` hvis minst ett av disse forholdene gjelder under oppstart: + +- `entity` har ikke konsumert alle sine assigned partitions opp til startup-end-offset. +- `relation-update` har ikke konsumert alle sine assigned partitions opp til startup-end-offset. +- Kafka end-offset kan ikke hentes for assigned partitions. + +### Hva får ikke readiness til å feile + +Følgende forhold feiler ikke readiness etter at bootstrap er ferdig: + +- En ny full sync kommer inn og skaper lag. +- Det produseres mange meldinger mens appen kjører. +- Consumeren ligger midlertidig etter på topicet. +- Topicet er stille i lang tid. + +## Liveness + +### Hensikt + +Liveness skal oppdage at Kafka-consumerne i praksis har sluttet å fungere, og gi Kubernetes grunnlag for å restarte poden. + +### Hvordan den virker + +Liveness monitorerer runtime-status for registrerte Kafka-listeners. + +Den ser ikke på vanlig lag eller antall uprosesserte meldinger. I stedet ser den på Kafka-runtime-signaler: + +- `ConsumerStartedEvent` +- `ListenerContainerIdleEvent` +- `NonResponsiveConsumerEvent` +- `ConsumerFailedToStartEvent` +- `ConsumerStoppedEvent` + +Det brukes en grace-periode, default `15m`, for å unngå falske positive ved korte forstyrrelser. + +### Hva som holder liveness frisk + +Liveness holdes `UP` hvis en listener viser tegn til normal drift, for eksempel: + +- appen prosesserer records +- listeneren sender idle-events fordi topicet er stille +- consumer-containeren starter normalt + +Det betyr at stille topics ikke i seg selv gjør poden unhealthy. + +### Hva får liveness til å feile + +Liveness blir `DOWN` hvis en registrert listener har en runtime-feil som varer lenger enn konfigurert grace-periode. + +Eksempler: + +- `NonResponsiveConsumerEvent` og tilstanden varer lenger enn grace-perioden +- `ConsumerFailedToStartEvent` +- `ConsumerStoppedEvent` med annen grunn enn `NORMAL` + +Typiske scenarioer dette er ment å fange: + +- nettverksbrudd mellom pod og Kafka +- Kafka svarer ikke over tid +- autentiseringsfeil mot Kafka +- consumer-container stopper på grunn av feil + +### Hva får ikke liveness til å feile + +Følgende forhold skal ikke alene feile liveness: + +- vanlig Kafka-lag +- full sync som gjør at consumeren henger litt etter +- ingen nye meldinger på topicet i flere timer +- normal rebalance mellom pods + +En vanlig rebalance håndteres av partition assignment/revocation, ikke som en fatal liveness-feil. + +## Startup Probe + +### Hensikt + +`startupProbe` bør bare brukes som en enkel oppstartssperre mens JVM og Spring Boot kommer opp. + +Den bør ikke inneholde Kafka-bootstrap-logikk. Grunnen er at `startupProbe` bare brukes i den tidlige oppstartsfasen, mens readiness kan uttrykke "ikke klar enda" på en mer presis måte. + +### Anbefaling + +Bruk `startupProbe` mot en enkel actuator-health, mens readiness og liveness peker mot de dedikerte probe-endepunktene. + +## Konfigurasjon i Consumer + +Default konfigurasjon i applikasjonen: + +```yaml +fint: + consumer: + health: + kafka: + idle-event-interval: 1m + runtime-grace-period: 15m + monitor-interval-seconds: 30 + no-poll-threshold: 3.0 + +management: + endpoint: + health: + probes: + enabled: true + group: + readiness: + include: readinessState,initialKafkaBootstrap + liveness: + include: livenessState,kafkaRuntime +``` + +### Hva disse Kafka-innstillingene betyr + +- `idle-event-interval`: hvor ofte idle-event sendes mens et topic er stille. +- `runtime-grace-period`: hvor lenge runtime-feil kan vare før liveness går ned. +- `monitor-interval-seconds`: hvor ofte Spring Kafka sjekker consumerens poll-aktivitet. +- `no-poll-threshold`: terskel for når manglende poll anses som "non-responsive". + +## Metrikker + +I tillegg til actuator-health eksponerer applikasjonen nå Micrometer-metrikker for Kafka-bootstrap og Kafka-runtime-health. Disse er nyttige fordi health-endepunktene bare viser nåværende status, mens metrikker gjør det mulig å følge utvikling over tid i Prometheus og Grafana. + +### Bootstrap-metrikker + +- `fint.consumer.kafka.bootstrap.state` + Gauge per listener. `1` betyr at initial bootstrap fortsatt pågår, `0` betyr at den er ferdig. + +- `fint.consumer.kafka.bootstrap.partitions.pending` + Gauge per listener. Antall assigned partitions som ennå ikke har konsumert seg opp til startup-end-offset. + +- `fint.consumer.kafka.bootstrap.completed` + Counter per listener, og også for `listener=all`. Incrementes når bootstrap fullføres. + +- `fint.consumer.kafka.bootstrap.duration` + Timer per listener, og også for `listener=all`. Måler hvor lang tid initial bootstrap faktisk tok. + +- `fint.consumer.kafka.bootstrap.end_offset.lookup.failures` + Counter per listener. Incrementes når applikasjonen ikke klarer å hente startup end offset fra Kafka. + +### Runtime-metrikker + +- `fint.consumer.kafka.runtime.problem` + Counter med tags `listener` og `reason`. Incrementes når runtime-monitoren ser et problem, for eksempel `NON_RESPONSIVE` eller `STOPPED_AUTH`. + +- `fint.consumer.kafka.runtime.unhealthy` + Gauge per listener. `1` betyr at listeneren har vært i problemtilstand lenger enn grace-perioden og dermed gjør liveness `DOWN`. + +- `fint.consumer.kafka.runtime.problem.duration` + Gauge per listener. Viser hvor lenge den nåværende problemtilstanden har vart, i millisekunder. + +### Viktige tags + +Metrikkene er bevisst tagget lavt-kardinalt: + +- `listener` +- `reason` + +Det brukes ikke tags som Kafka-key, partition eller corrId, for å unngå høy kardinalitet og unødvendig støy i metrics-backend. + +## Eksempel i Kubernetes + +Eksempel på probe-oppsett: + +```yaml +startupProbe: + httpGet: + path: /utdanning/vurdering/actuator/health + port: 8080 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 60 + +readinessProbe: + httpGet: + path: /utdanning/vurdering/actuator/health/readiness + port: 8080 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + +livenessProbe: + httpGet: + path: /utdanning/vurdering/actuator/health/liveness + port: 8080 + periodSeconds: 30 + timeoutSeconds: 3 + failureThreshold: 3 +``` + +## Praktiske konsekvenser + +Med dette oppsettet blir flyten typisk slik: + +1. Poden starter. +2. `startupProbe` verifiserer at appen faktisk kommer opp. +3. `readinessProbe` holder poden ute av trafikk mens `entity` og `relation-update` bygger initial cache. +4. Når initial bootstrap er ferdig, blir poden `Ready`. +5. Senere full sync eller vanlig Kafka-lag påvirker ikke readiness. +6. Hvis Kafka-consumerne blir ikke-responsive eller stopper over tid, blir `liveness` `DOWN` og Kubernetes kan restarte poden. + +## Oppsummering + +- `startupProbe` beskytter bare oppstart av prosessen. +- `readinessProbe` beskytter trafikk under initial Kafka-bootstrap. +- `livenessProbe` beskytter mot vedvarende Kafka-runtime-feil. +- Vanlig lag eller stille topics skal ikke gjøre poden unhealthy. diff --git a/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt b/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt index a1c2e15f..9efb31a2 100644 --- a/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt +++ b/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.autorelation.kafka import no.fintlabs.autorelation.RelationEventService import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.entity.extractIdentifier @@ -21,18 +24,22 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class AutoRelationEntityConsumer( private val consumerConfig: ConsumerConfiguration, private val relationEventService: RelationEventService, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(AutoRelationEntityConsumer::class.java) private const val CONSUMER_NAME = "autorelation-entity" } - @Bean + @Bean(name = [KafkaListenerIds.AUTORELATION_ENTITY]) fun buildAutoRelationConsumer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.AUTORELATION_ENTITY) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( Any::class.java, this::consumeRecord, @@ -54,6 +61,7 @@ class AutoRelationEntityConsumer( CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EntityTopicNameParameters .builder() @@ -66,6 +74,7 @@ class AutoRelationEntityConsumer( ).resourceName("${consumerConfig.domain}-${consumerConfig.packageName}") .build(), ).apply { concurrency = consumerConfig.kafka.entityConcurrency } + } fun consumeRecord(consumerRecord: ConsumerRecord) { consumerRecord @@ -77,6 +86,7 @@ class AutoRelationEntityConsumer( resource, ) } + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.AUTORELATION_ENTITY) } private fun ConsumerRecord.getResourceName(): String = diff --git a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt index 886d4b9c..95624519 100644 --- a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt +++ b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt @@ -3,6 +3,10 @@ package no.fintlabs.autorelation.kafka import no.fintlabs.autorelation.AutoRelationService import no.fintlabs.autorelation.model.RelationUpdate import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.KafkaThroughputMetrics import no.novari.kafka.consuming.ErrorHandlerFactory @@ -23,13 +27,16 @@ class RelationUpdateConsumer( private val autoRelationService: AutoRelationService, private val consumerConfig: ConsumerConfiguration, private val kafkaThroughputMetrics: KafkaThroughputMetrics, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(RelationUpdateConsumer::class.java) private const val CONSUMER_NAME = "relation-update" } - @Bean + @Bean(name = [KafkaListenerIds.RELATION_UPDATE]) @ConditionalOnProperty( name = ["fint.consumer.autorelation"], havingValue = "true", @@ -38,8 +45,11 @@ class RelationUpdateConsumer( fun relationUpdateConsumerContainer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.RELATION_UPDATE) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.RELATION_UPDATE) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( RelationUpdate::class.java, this::consumeRecord, @@ -48,14 +58,21 @@ class RelationUpdateConsumer( .groupIdApplicationDefault() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned( + KafkaListenerIds.RELATION_UPDATE, + assignments.keys, + ) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.RELATION_UPDATE, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EntityTopicNamePatternParameters .builder() @@ -68,7 +85,8 @@ class RelationUpdateConsumer( // Makes sure we listen to component patterns such as utdanning-vurdering'-relation-update' ).resource(TopicNamePatternParameterPattern.endingWith("-relation-update")) .build(), - ).apply { consumerConfig.kafka.relationConcurrency } + ).apply { concurrency = consumerConfig.kafka.relationConcurrency } + } fun consumeRecord(consumerRecord: ConsumerRecord) { val startedAt = System.nanoTime() @@ -76,6 +94,8 @@ class RelationUpdateConsumer( if (relationUpdate == null) { kafkaThroughputMetrics.recordRelationUpdateConsumer(null, "ignored_null", System.nanoTime() - startedAt) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) return } @@ -85,6 +105,8 @@ class RelationUpdateConsumer( "ignored_foreign_component", System.nanoTime() - startedAt, ) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) return } @@ -95,12 +117,15 @@ class RelationUpdateConsumer( "processed", System.nanoTime() - startedAt, ) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) } catch (ex: Exception) { kafkaThroughputMetrics.recordRelationUpdateConsumer( relationUpdate.targetEntity.resourceName, "failed", System.nanoTime() - startedAt, ) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) throw ex } } diff --git a/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt new file mode 100644 index 00000000..c283e012 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt @@ -0,0 +1,8 @@ +package no.fintlabs.consumer.health + +data class BootstrapPartitionStatus( + val partition: String, + val endOffset: Long, + val processedOffset: Long?, + val caughtUp: Boolean, +) diff --git a/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt b/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt new file mode 100644 index 00000000..1b352953 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt @@ -0,0 +1,6 @@ +package no.fintlabs.consumer.health + +data class BootstrapReadinessSnapshot( + val ready: Boolean, + val blockingListeners: List, +) diff --git a/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt new file mode 100644 index 00000000..88268de1 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt @@ -0,0 +1,42 @@ +package no.fintlabs.consumer.health + +import jakarta.annotation.PreDestroy +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.OffsetSpec +import org.apache.kafka.common.TopicPartition +import org.springframework.boot.autoconfigure.kafka.KafkaProperties +import org.springframework.stereotype.Component +import java.time.Duration +import java.util.concurrent.TimeUnit + +interface EndOffsetProvider { + fun latestOffsets(partitions: Set): Map +} + +@Component +class KafkaAdminEndOffsetProvider( + kafkaProperties: KafkaProperties, +) : EndOffsetProvider { + private val adminClient = AdminClient.create(kafkaProperties.buildAdminProperties(null)) + + override fun latestOffsets(partitions: Set): Map { + if (partitions.isEmpty()) { + return emptyMap() + } + + return adminClient + .listOffsets(partitions.associateWith { OffsetSpec.latest() }) + .all() + .get(TIMEOUT.toSeconds(), TimeUnit.SECONDS) + .mapValues { (_, result) -> result.offset() } + } + + @PreDestroy + fun close() { + adminClient.close(TIMEOUT) + } + + companion object { + private val TIMEOUT = Duration.ofSeconds(10) + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt new file mode 100644 index 00000000..fbe89c6c --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt @@ -0,0 +1,31 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.actuate.health.Health +import org.springframework.boot.actuate.health.HealthIndicator +import org.springframework.stereotype.Component + +@Component("initialKafkaBootstrap") +class InitialKafkaBootstrapHealthIndicator( + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, +) : HealthIndicator { + override fun health(): Health { + val snapshot = initialKafkaBootstrapTracker.snapshot() + val builder = if (snapshot.ready) Health.up() else Health.outOfService() + + return builder + .withDetail("ready", snapshot.ready) + .withDetail("blockingListeners", snapshot.blockingListeners.size) + .withDetail( + "listeners", + snapshot.blockingListeners.associate { listener -> + listener.listenerId to + mapOf( + "assignmentSeen" to listener.assignmentSeen, + "completed" to listener.completed, + "assignedPartitions" to listener.assignedPartitions, + "caughtUpPartitions" to listener.caughtUpPartitions, + ) + }, + ).build() + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt new file mode 100644 index 00000000..2756b390 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt @@ -0,0 +1,193 @@ +package no.fintlabs.consumer.health + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.slf4j.LoggerFactory +import org.springframework.boot.availability.AvailabilityChangeEvent +import org.springframework.boot.availability.ReadinessState +import org.springframework.context.ApplicationContext +import org.springframework.stereotype.Service +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlin.math.max + +@Service +class InitialKafkaBootstrapTracker( + private val endOffsetProvider: EndOffsetProvider, + private val applicationContext: ApplicationContext, + private val kafkaHealthMetrics: KafkaHealthMetrics, +) { + private val readinessPublished = AtomicReference(null) + private val bootstrapCompleted = AtomicBoolean(false) + private val blockingListeners = ConcurrentHashMap() + + init { + publishReadiness(false) + } + + fun registerBlockingListener(listenerId: String) { + blockingListeners.computeIfAbsent(listenerId) { ListenerBootstrapState() } + kafkaHealthMetrics.registerBootstrapListener(listenerId) + } + + fun onPartitionsAssigned( + listenerId: String, + assignments: Set, + ) { + if (bootstrapCompleted.get() || assignments.isEmpty()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + val endOffsets = + try { + endOffsetProvider.latestOffsets(assignments) + } catch (exception: RuntimeException) { + logger.error( + "Failed to fetch end offsets for listener={} assignments={}", + listenerId, + assignments, + exception, + ) + kafkaHealthMetrics.recordBootstrapEndOffsetLookupFailure(listenerId) + publishReadiness(false) + return + } + + listenerState.assignmentSeen.set(true) + assignments.forEach { topicPartition -> + listenerState.partitions[topicPartition] = PartitionBootstrapState(endOffsets[topicPartition] ?: 0L) + } + + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun onPartitionsRevoked( + listenerId: String, + partitions: Collection, + ) { + if (bootstrapCompleted.get()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + partitions.forEach(listenerState.partitions::remove) + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun onRecordProcessed( + listenerId: String, + record: ConsumerRecord<*, *>, + ) { + if (bootstrapCompleted.get()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + val topicPartition = TopicPartition(record.topic(), record.partition()) + listenerState.partitions.computeIfPresent(topicPartition) { _, state -> + state.withOffset(record.offset()) + } + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun snapshot(): BootstrapReadinessSnapshot { + val listenerStatuses = + blockingListeners.toSortedMap().map { (listenerId, listenerState) -> + ListenerBootstrapStatus( + listenerId = listenerId, + assignmentSeen = listenerState.assignmentSeen.get(), + completed = listenerState.completed.get(), + assignedPartitions = listenerState.partitions.size, + caughtUpPartitions = listenerState.partitions.values.count(PartitionBootstrapState::caughtUp), + partitions = + listenerState.partitions + .toSortedMap(compareBy({ it.topic() }, { it.partition() })) + .map { (topicPartition, partitionState) -> + BootstrapPartitionStatus( + partition = "${topicPartition.topic()}-${topicPartition.partition()}", + endOffset = partitionState.endOffset, + processedOffset = partitionState.processedOffset, + caughtUp = partitionState.caughtUp, + ) + }, + ) + } + + return BootstrapReadinessSnapshot( + ready = bootstrapCompleted.get(), + blockingListeners = listenerStatuses, + ) + } + + private fun maybeCompleteListener( + listenerId: String, + listenerState: ListenerBootstrapState, + ) { + if ( + !listenerState.completed.get() && + listenerState.assignmentSeen.get() && + listenerState.partitions.values.all(PartitionBootstrapState::caughtUp) + ) { + listenerState.completed.set(true) + kafkaHealthMetrics.markBootstrapCompleted(listenerId) + logger.info("Initial Kafka bootstrap completed for listener={}", listenerId) + } + } + + private fun maybeCompleteBootstrap() { + if ( + !bootstrapCompleted.get() && + blockingListeners.isNotEmpty() && + blockingListeners.values.all { it.completed.get() } + ) { + bootstrapCompleted.set(true) + kafkaHealthMetrics.markBootstrapAllCompleted() + publishReadiness(true) + logger.info("Initial Kafka bootstrap completed for all blocking listeners") + } + } + + private fun pendingPartitions(listenerState: ListenerBootstrapState): Int { + return listenerState.partitions.values.count { !it.caughtUp } + } + + private fun publishReadiness(ready: Boolean) { + val changed = readinessPublished.getAndSet(ready) != ready + if (changed) { + AvailabilityChangeEvent.publish( + applicationContext, + if (ready) ReadinessState.ACCEPTING_TRAFFIC else ReadinessState.REFUSING_TRAFFIC, + ) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(InitialKafkaBootstrapTracker::class.java) + } +} + +private class ListenerBootstrapState { + val completed = AtomicBoolean(false) + val assignmentSeen = AtomicBoolean(false) + val partitions = ConcurrentHashMap() +} + +private data class PartitionBootstrapState( + val endOffset: Long, + val processedOffset: Long? = null, +) { + val caughtUp: Boolean + get() = endOffset == 0L || ((processedOffset ?: -1L) + 1) >= endOffset + + fun withOffset(offset: Long): PartitionBootstrapState { + return copy(processedOffset = processedOffset?.let { max(it, offset) } ?: offset) + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt new file mode 100644 index 00000000..d5a1004b --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt @@ -0,0 +1,11 @@ +package no.fintlabs.consumer.health + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.time.Clock + +@Configuration +class KafkaHealthConfiguration { + @Bean + fun kafkaHealthClock(): Clock = Clock.systemUTC() +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt new file mode 100644 index 00000000..210dcac9 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt @@ -0,0 +1,190 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tag +import io.micrometer.core.instrument.Timer +import org.springframework.stereotype.Component +import java.time.Clock +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +@Component +class KafkaHealthMetrics( + private val meterRegistry: MeterRegistry, + private val clock: Clock, + private val kafkaHealthProperties: KafkaHealthProperties, +) { + private val counters = ConcurrentHashMap() + private val timers = ConcurrentHashMap() + private val bootstrapStates = ConcurrentHashMap() + private val runtimeStates = ConcurrentHashMap() + private val bootstrapAllStartNanos = AtomicLong(System.nanoTime()) + private val bootstrapAllRecorded = AtomicBoolean(false) + + fun registerBootstrapListener(listenerId: String) { + bootstrapStates.computeIfAbsent(listenerId) { + BootstrapMetricState(System.nanoTime()).also { state -> + meterRegistry.gauge( + "fint.consumer.kafka.bootstrap.state", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.inProgress.get().toDouble() } + meterRegistry.gauge( + "fint.consumer.kafka.bootstrap.partitions.pending", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.pendingPartitions.get().toDouble() } + } + } + } + + fun updateBootstrapPendingPartitions( + listenerId: String, + pendingPartitions: Int, + ) { + bootstrapStates[listenerId]?.pendingPartitions?.set(pendingPartitions) + } + + fun markBootstrapCompleted(listenerId: String) { + bootstrapStates[listenerId]?.let { state -> + state.pendingPartitions.set(0) + state.inProgress.set(0) + if (state.completed.compareAndSet(false, true)) { + counter( + "fint.consumer.kafka.bootstrap.completed", + listOf(Tag.of("listener", listenerId)), + ).increment() + timer( + "fint.consumer.kafka.bootstrap.duration", + listOf(Tag.of("listener", listenerId)), + ).record(System.nanoTime() - state.startNanos, TimeUnit.NANOSECONDS) + } + } + } + + fun markBootstrapAllCompleted() { + if (bootstrapAllRecorded.compareAndSet(false, true)) { + counter( + "fint.consumer.kafka.bootstrap.completed", + listOf(Tag.of("listener", "all")), + ).increment() + timer( + "fint.consumer.kafka.bootstrap.duration", + listOf(Tag.of("listener", "all")), + ).record(System.nanoTime() - bootstrapAllStartNanos.get(), TimeUnit.NANOSECONDS) + } + } + + fun recordBootstrapEndOffsetLookupFailure(listenerId: String) { + counter( + "fint.consumer.kafka.bootstrap.end_offset.lookup.failures", + listOf(Tag.of("listener", listenerId)), + ).increment() + } + + fun registerRuntimeListener(listenerId: String) { + runtimeStates.computeIfAbsent(listenerId) { + RuntimeMetricState().also { state -> + meterRegistry.gauge( + "fint.consumer.kafka.runtime.unhealthy", + listOf(Tag.of("listener", listenerId)), + state, + ) { + if (it.isUnhealthy( + clock.millis(), + kafkaHealthProperties.runtimeGracePeriod.toMillis(), + ) + ) { + 1.0 + } else { + 0.0 + } + } + meterRegistry.gauge( + "fint.consumer.kafka.runtime.problem.duration", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.problemDuration(clock.millis()).toDouble() } + } + } + } + + fun markRuntimeHealthy(listenerId: String) { + runtimeStates[listenerId]?.markHealthy(clock.millis()) + } + + fun markRuntimeProblem( + listenerId: String, + reason: String, + ) { + runtimeStates[listenerId]?.markProblem(clock.millis()) + counter( + "fint.consumer.kafka.runtime.problem", + listOf(Tag.of("listener", listenerId), Tag.of("reason", reason)), + ).increment() + } + + private fun counter( + name: String, + tags: List, + ): Counter { + return counters.computeIfAbsent(meterKey(name, tags)) { meterRegistry.counter(name, tags) } + } + + private fun timer( + name: String, + tags: List, + ): Timer { + return timers.computeIfAbsent(meterKey(name, tags)) { meterRegistry.timer(name, tags) } + } + + private fun meterKey( + name: String, + tags: List, + ): String { + return "$name|${tags.joinToString("|") { "${it.key}=${it.value}" }}" + } +} + +private class BootstrapMetricState( + val startNanos: Long, +) { + val pendingPartitions = AtomicInteger(0) + val inProgress = AtomicInteger(1) + val completed = AtomicBoolean(false) +} + +private class RuntimeMetricState { + private val problemSince = AtomicLong(0L) + + fun markHealthy(now: Long) { + problemSince.set(0L) + } + + fun markProblem(now: Long) { + problemSince.compareAndSet(0L, now) + } + + fun problemDuration(now: Long): Long { + return problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it } + ?: 0L + } + + fun isUnhealthy( + now: Long, + gracePeriodMs: Long, + ): Boolean { + return problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it >= gracePeriodMs } + ?: false + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt new file mode 100644 index 00000000..9729bb40 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt @@ -0,0 +1,12 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.context.properties.ConfigurationProperties +import java.time.Duration + +@ConfigurationProperties(prefix = "fint.consumer.health.kafka") +data class KafkaHealthProperties( + val idleEventInterval: Duration = Duration.ofMinutes(1), + val runtimeGracePeriod: Duration = Duration.ofMinutes(15), + val monitorIntervalSeconds: Int = 30, + val noPollThreshold: Float = 3.0f, +) diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt b/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt new file mode 100644 index 00000000..eba005e7 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt @@ -0,0 +1,15 @@ +package no.fintlabs.consumer.health + +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer +import org.springframework.stereotype.Component + +@Component +class KafkaListenerContainerHealthConfigurer( + private val kafkaHealthProperties: KafkaHealthProperties, +) { + fun customize(container: ConcurrentMessageListenerContainer) { + container.containerProperties.idleEventInterval = kafkaHealthProperties.idleEventInterval.toMillis() + container.containerProperties.monitorInterval = kafkaHealthProperties.monitorIntervalSeconds + container.containerProperties.noPollThreshold = kafkaHealthProperties.noPollThreshold + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt b/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt new file mode 100644 index 00000000..d4e15e80 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt @@ -0,0 +1,9 @@ +package no.fintlabs.consumer.health + +object KafkaListenerIds { + const val ENTITY = "resourceEntityConsumerFactory" + const val REQUEST_EVENT = "requestFintEventRequestListenerContainer" + const val RESPONSE_EVENT = "responseFintEventContainerListener" + const val RELATION_UPDATE = "relationUpdateConsumerContainer" + const val AUTORELATION_ENTITY = "buildAutoRelationConsumer" +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt b/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt new file mode 100644 index 00000000..f94705ed --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt @@ -0,0 +1,161 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.actuate.health.Health +import org.springframework.boot.actuate.health.HealthIndicator +import org.springframework.context.event.EventListener +import org.springframework.kafka.event.ConsumerFailedToStartEvent +import org.springframework.kafka.event.ConsumerStartedEvent +import org.springframework.kafka.event.ConsumerStoppedEvent +import org.springframework.kafka.event.KafkaEvent +import org.springframework.kafka.event.ListenerContainerIdleEvent +import org.springframework.kafka.event.NonResponsiveConsumerEvent +import org.springframework.kafka.listener.MessageListenerContainer +import org.springframework.stereotype.Component +import java.time.Clock +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference + +@Component("kafkaRuntime") +class KafkaRuntimeHealthMonitor( + private val kafkaHealthProperties: KafkaHealthProperties, + private val clock: Clock, + private val kafkaHealthMetrics: KafkaHealthMetrics, +) : HealthIndicator { + private val trackedListeners = ConcurrentHashMap.newKeySet() + private val listenerStates = ConcurrentHashMap() + + fun registerListener(listenerId: String) { + trackedListeners.add(listenerId) + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) } + kafkaHealthMetrics.registerRuntimeListener(listenerId) + } + + fun onRecordProcessed(listenerId: String) { + if (!trackedListeners.contains(listenerId)) { + return + } + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + + @EventListener + fun onConsumerStarted(event: ConsumerStartedEvent) { + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + } + + @EventListener + fun onListenerContainerIdle(event: ListenerContainerIdleEvent) { + event.listenerId.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + } + + @EventListener + fun onNonResponsiveConsumer(event: NonResponsiveConsumerEvent) { + event.listenerId.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent( + listenerId, + ) { ListenerRuntimeState(now()) } + .markProblem("NON_RESPONSIVE", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "NON_RESPONSIVE") + } + } + + @EventListener + fun onConsumerFailedToStart(event: ConsumerFailedToStartEvent) { + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent( + listenerId, + ) { ListenerRuntimeState(now()) } + .markProblem("FAILED_TO_START", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "FAILED_TO_START") + } + } + + @EventListener + fun onConsumerStopped(event: ConsumerStoppedEvent) { + if (event.reason == ConsumerStoppedEvent.Reason.NORMAL) { + return + } + + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent(listenerId) { ListenerRuntimeState(now()) } + .markProblem("STOPPED_${event.reason.name}", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "STOPPED_${event.reason.name}") + } + } + + override fun health(): Health { + val now = now() + val unhealthyListeners = + trackedListeners + .mapNotNull { listenerId -> + listenerStates[listenerId] + ?.takeIf { it.isUnhealthy(now, kafkaHealthProperties.runtimeGracePeriod.toMillis()) } + ?.let { listenerId to it.snapshot(now) } + }.toMap() + + val builder = if (unhealthyListeners.isEmpty()) Health.up() else Health.down() + + return builder + .withDetail("trackedListeners", trackedListeners.size) + .withDetail("runtimeGracePeriodMs", kafkaHealthProperties.runtimeGracePeriod.toMillis()) + .withDetail("unhealthyListeners", unhealthyListeners) + .build() + } + + private fun listenerIdOf(event: KafkaEvent): String? { + return runCatching { + event.getContainer(MessageListenerContainer::class.java).listenerId + }.getOrNull() + } + + private fun now(): Long = clock.millis() +} + +private class ListenerRuntimeState( + initialHealthyAt: Long, +) { + private val lastHealthyAt = AtomicLong(initialHealthyAt) + private val problemSince = AtomicLong(0L) + private val problem = AtomicReference(null) + + fun markHealthy(now: Long) { + lastHealthyAt.set(now) + problemSince.set(0L) + problem.set(null) + } + + fun markProblem( + reason: String, + now: Long, + ) { + problem.compareAndSet(null, reason) + problemSince.compareAndSet(0L, now) + } + + fun isUnhealthy( + now: Long, + gracePeriodMs: Long, + ): Boolean = + problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it >= gracePeriodMs } + ?: false + + fun snapshot(now: Long): Map = + mapOf( + "problem" to problem.get(), + "problemDurationMs" to (now - problemSince.get()), + "lastHealthyAtMs" to lastHealthyAt.get(), + ) +} diff --git a/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt b/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt new file mode 100644 index 00000000..8aff7fa3 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt @@ -0,0 +1,10 @@ +package no.fintlabs.consumer.health + +data class ListenerBootstrapStatus( + val listenerId: String, + val assignmentSeen: Boolean, + val completed: Boolean, + val assignedPartitions: Int, + val caughtUpPartitions: Int, + val partitions: List, +) diff --git a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt index a4125075..dab154db 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt @@ -1,6 +1,10 @@ package no.fintlabs.consumer.kafka.entity import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.stringValue @@ -21,18 +25,24 @@ class EntityConsumer( private val entityProcessingService: EntityProcessingService, private val consumerConfig: ConsumerConfiguration, private val resourceConverter: ResourceConverter, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(EntityConsumer::class.java) private const val CONSUMER_NAME = "entity" } - @Bean + @Bean(name = [KafkaListenerIds.ENTITY]) fun resourceEntityConsumerFactory( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.ENTITY) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.ENTITY) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( Any::class.java, this::consumeRecord, @@ -41,14 +51,18 @@ class EntityConsumer( .groupIdApplicationDefault() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, assignments.keys) + }.onRevocation { partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.ENTITY, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EntityTopicNameParameters .builder() @@ -61,10 +75,15 @@ class EntityConsumer( ).resourceName("${consumerConfig.domain}-${consumerConfig.packageName}") .build(), ).apply { concurrency = consumerConfig.kafka.entityConcurrency } + } fun consumeRecord(consumerRecord: ConsumerRecord) = createEntityConsumerRecord(consumerRecord) - .let { entityProcessingService.processEntityConsumerRecord(it) } + .also { entityConsumerRecord -> + entityProcessingService.processEntityConsumerRecord(entityConsumerRecord) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.ENTITY, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.ENTITY) + } private fun createEntityConsumerRecord(consumerRecord: ConsumerRecord) = consumerRecord.getResourceName().let { resourceName -> diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt index 7082ea03..f8b9029f 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.ResponseFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.resource.event.EventStatusCache import no.novari.kafka.consuming.ErrorHandlerFactory @@ -19,13 +22,17 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class EventResponseConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { - @Bean + @Bean(name = [KafkaListenerIds.RESPONSE_EVENT]) fun responseFintEventContainerListener( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.RESPONSE_EVENT) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( ResponseFintEvent::class.java, this::consumeRecord, @@ -42,6 +49,7 @@ class EventResponseConsumer( CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EventTopicNameParameters .builder() @@ -54,10 +62,12 @@ class EventResponseConsumer( ).eventName("${consumerConfig.domain}-${consumerConfig.packageName}-response") .build(), ).apply { concurrency = consumerConfig.kafka.responseConcurrency } + } private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Response: {}", consumerRecord.value()) eventStatusCache.trackResponse(consumerRecord.value().corrId, consumerRecord.value()) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RESPONSE_EVENT) } companion object { diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt index 5244c157..5bfb758c 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.RequestFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.resource.event.EventStatusCache import no.novari.kafka.consuming.ErrorHandlerFactory @@ -19,18 +22,22 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class RequestFintEventConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(RequestFintEventConsumer::class.java) private const val CONSUMER_NAME = "request-fint-event" } - @Bean + @Bean(name = [KafkaListenerIds.REQUEST_EVENT]) fun requestFintEventRequestListenerContainer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.REQUEST_EVENT) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( RequestFintEvent::class.java, this::consumeRecord, @@ -47,6 +54,7 @@ class RequestFintEventConsumer( CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EventTopicNameParameters .builder() @@ -59,9 +67,11 @@ class RequestFintEventConsumer( ).eventName("${consumerConfig.domain}-${consumerConfig.packageName}-request") .build(), ).apply { concurrency = consumerConfig.kafka.requestConcurrency } + } private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Request: {}", consumerRecord.key()) eventStatusCache.trackRequest(consumerRecord.value()) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.REQUEST_EVENT) } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 1ab24193..306f36ea 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -5,8 +5,12 @@ fint: exposed-endpoints: - /actuator/prometheus - /actuator/health + - /actuator/health/readiness + - /actuator/health/liveness + relation: base-url: https://api.felleskomponent.no + consumer: pod-url: http://fint-core-consumer-${fint.consumer.domain}-${fint.consumer.package}:8080 base-url: ${fint.relation.base-url} @@ -15,6 +19,12 @@ fint: packageName: ${fint.consumer.package} org-id: ${fint.org-id} coreVersionHeader: 2 + health: + kafka: + idle-event-interval: 1m + runtime-grace-period: 15m + monitor-interval-seconds: 30 + no-poll-threshold: 3.0 novari: kafka: @@ -34,6 +44,16 @@ spring: base-path: ${fint.consumer.domain}/${fint.consumer.package} management: + endpoint: + health: + probes: + enabled: true + group: + readiness: + include: readinessState,initialKafkaBootstrap + liveness: + include: livenessState,kafkaRuntime + endpoints: web: exposure: diff --git a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt new file mode 100644 index 00000000..0c50fff9 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt @@ -0,0 +1,29 @@ +package no.fintlabs.consumer.health + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.springframework.boot.actuate.health.Status + +class InitialKafkaBootstrapHealthIndicatorTest { + private val tracker: InitialKafkaBootstrapTracker = mockk() + + @Test + fun `should report out of service while bootstrap is incomplete`() { + every { tracker.snapshot() } returns BootstrapReadinessSnapshot(false, emptyList()) + + val health = InitialKafkaBootstrapHealthIndicator(tracker).health() + + assertEquals(Status.OUT_OF_SERVICE, health.status) + } + + @Test + fun `should report up when bootstrap is complete`() { + every { tracker.snapshot() } returns BootstrapReadinessSnapshot(true, emptyList()) + + val health = InitialKafkaBootstrapHealthIndicator(tracker).health() + + assertEquals(Status.UP, health.status) + } +} diff --git a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt new file mode 100644 index 00000000..28d0b3b8 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt @@ -0,0 +1,191 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.mockk.every +import io.mockk.mockk +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.context.ApplicationContext +import java.time.Duration +import java.time.Instant + +class InitialKafkaBootstrapTrackerTest { + private val endOffsetProvider: EndOffsetProvider = mockk() + private val applicationContext: ApplicationContext = mockk(relaxed = true) + private val meterRegistry = SimpleMeterRegistry() + private val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + private val kafkaHealthMetrics = KafkaHealthMetrics(meterRegistry, clock, KafkaHealthProperties()) + + private lateinit var tracker: InitialKafkaBootstrapTracker + + @BeforeEach + fun setUp() { + tracker = InitialKafkaBootstrapTracker(endOffsetProvider, applicationContext, kafkaHealthMetrics) + tracker.registerBlockingListener(KafkaListenerIds.ENTITY) + } + + @Test + fun `should stay unready until all assigned partitions catch up`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0, partition1)) } returns + mapOf(partition0 to 2L, partition1 to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 1, 0L, "key", "value")) + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 1L, "key", "value")) + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should become ready immediately when assigned partitions are empty at startup offset`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns + mapOf(partition0 to 0L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should ignore new assignments after initial bootstrap has completed`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns mapOf(partition0 to 1L) + every { endOffsetProvider.latestOffsets(setOf(partition1)) } returns mapOf(partition1 to 2L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + + assertTrue(tracker.snapshot().ready) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition1)) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should wait for both entity and relation update listeners`() { + val entityPartition = TopicPartition("entity-topic", 0) + val relationPartition = TopicPartition("relation-topic", 0) + + tracker.registerBlockingListener(KafkaListenerIds.RELATION_UPDATE) + + every { endOffsetProvider.latestOffsets(setOf(entityPartition)) } returns mapOf(entityPartition to 1L) + every { endOffsetProvider.latestOffsets(setOf(relationPartition)) } returns mapOf(relationPartition to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(entityPartition)) + tracker.onPartitionsAssigned(KafkaListenerIds.RELATION_UPDATE, setOf(relationPartition)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("entity-topic", 0, 0L, "key", "value")) + + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed( + KafkaListenerIds.RELATION_UPDATE, + ConsumerRecord("relation-topic", 0, 0L, "key", "value"), + ) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should publish bootstrap metrics for completion and pending partitions`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0, partition1)) } returns + mapOf(partition0 to 2L, partition1 to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + + assertEquals( + 2.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.partitions.pending") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.state") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + + clock.advance(Duration.ofSeconds(2)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 1, 0L, "key", "value")) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 1L, "key", "value")) + + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.partitions.pending") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.state") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.completed") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.completed") + .tag("listener", "all") + .counter() + .count(), + ) + } + + @Test + fun `should count end offset lookup failures`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } throws RuntimeException("boom") + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.end_offset.lookup.failures") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + } +} diff --git a/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt b/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt new file mode 100644 index 00000000..4b6bd637 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt @@ -0,0 +1,141 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.mockk.mockk +import org.apache.kafka.clients.consumer.Consumer +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.springframework.boot.actuate.health.Status +import org.springframework.kafka.event.NonResponsiveConsumerEvent +import java.time.Duration +import java.time.Instant + +class KafkaRuntimeHealthMonitorTest { + @Test + fun `should stay up during grace period for non responsive consumer`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + clock.advance(Duration.ofMinutes(14)) + + assertEquals(Status.UP, monitor.health().status) + } + + @Test + fun `should go down after grace period for non responsive consumer`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + clock.advance(Duration.ofMinutes(16)) + + assertEquals(Status.DOWN, monitor.health().status) + } + + @Test + fun `should recover after healthy activity resumes`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + clock.advance(Duration.ofMinutes(5)) + + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + clock.advance(Duration.ofMinutes(20)) + + assertEquals(Status.UP, monitor.health().status) + } + + @Test + fun `should publish runtime metrics for problem and unhealthy state`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val meterRegistry = SimpleMeterRegistry() + val properties = KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)) + val monitor = KafkaRuntimeHealthMonitor(properties, clock, KafkaHealthMetrics(meterRegistry, clock, properties)) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.runtime.problem") + .tag("listener", KafkaListenerIds.ENTITY) + .tag("reason", "NON_RESPONSIVE") + .counter() + .count(), + ) + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.runtime.unhealthy") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + + clock.advance(Duration.ofMinutes(16)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.runtime.unhealthy") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + Duration.ofMinutes(16).toMillis().toDouble(), + meterRegistry + .get("fint.consumer.kafka.runtime.problem.duration") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + } +} + +private fun nonResponsiveConsumerEvent(listenerId: String): NonResponsiveConsumerEvent = + NonResponsiveConsumerEvent( + Any(), + Any(), + 1_000L, + listenerId, + emptyList(), + mockk>(relaxed = true), + ) diff --git a/src/test/java/no/fintlabs/consumer/health/MutableClock.kt b/src/test/java/no/fintlabs/consumer/health/MutableClock.kt new file mode 100644 index 00000000..7dd866f5 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/MutableClock.kt @@ -0,0 +1,21 @@ +package no.fintlabs.consumer.health + +import java.time.Clock +import java.time.Duration +import java.time.Instant +import java.time.ZoneId +import java.time.ZoneOffset + +class MutableClock( + private var instant: Instant, +) : Clock() { + override fun getZone(): ZoneId = ZoneOffset.UTC + + override fun withZone(zone: ZoneId): Clock = this + + override fun instant(): Instant = instant + + fun advance(duration: Duration) { + instant = instant.plus(duration) + } +} diff --git a/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt index c54442c2..b335dfa0 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt @@ -8,6 +8,9 @@ import no.fintlabs.autorelation.kafka.RelationUpdateConsumer import no.fintlabs.autorelation.model.RelationUpdate import no.fintlabs.autorelation.model.createEntityDescriptor import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaThroughputMetrics import org.apache.kafka.clients.consumer.ConsumerRecord import org.junit.jupiter.api.BeforeEach @@ -20,6 +23,9 @@ class RelationUpdateConsumerTest { private lateinit var consumerRecord: ConsumerRecord private lateinit var relationUpdate: RelationUpdate private lateinit var kafkaThroughputMetrics: KafkaThroughputMetrics + private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer @BeforeEach fun setUp() { @@ -32,7 +38,18 @@ class RelationUpdateConsumerTest { } kafkaThroughputMetrics = mockk(relaxed = true) - relationUpdateConsumer = RelationUpdateConsumer(autoRelationService, consumerConfig, kafkaThroughputMetrics) + initialKafkaBootstrapTracker = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) + relationUpdateConsumer = + RelationUpdateConsumer( + autoRelationService, + consumerConfig, + kafkaThroughputMetrics, + initialKafkaBootstrapTracker, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test