From 516422efa90bd69967e1411942e054636e71c9c8 Mon Sep 17 00:00:00 2001 From: Jan Ove Kongshaug Date: Mon, 23 Mar 2026 09:55:41 +0100 Subject: [PATCH] feat: Kafka bootstrap and runtime health probes Introduce Kafka-aware readiness and liveness health handling for the consumer. Readiness is now based on initial Kafka bootstrap instead of a fixed startup delay. The application stays unready until the blocking listeners have consumed up to their startup end offsets, and then remains ready for the rest of the pod lifetime. This includes both the entity listener and the relation-update listener. Liveness is now separated from bootstrap and tracks Kafka runtime health for registered listeners. It reacts to Spring Kafka runtime events such as non-responsive consumers, failed starts and stopped consumers, while using a grace period to avoid false positives from short interruptions. Normal lag and quiet topics do not make the pod unhealthy. Also add Micrometer metrics for bootstrap progress and runtime Kafka health, including bootstrap duration, pending partitions, runtime problem counters and unhealthy state gauges. Update actuator health group configuration and add documentation for the new startup/readiness/liveness model, Kafka-specific health behavior, metrics and Kubernetes probe configuration. 
--- docs/kafka-health-checks.md | 268 ++++++++++++++++++ .../kafka/AutoRelationEntityConsumer.kt | 16 +- .../kafka/RelationUpdateConsumer.kt | 37 ++- .../health/BootstrapPartitionStatus.kt | 8 + .../health/BootstrapReadinessSnapshot.kt | 6 + .../consumer/health/EndOffsetProvider.kt | 42 +++ .../InitialKafkaBootstrapHealthIndicator.kt | 31 ++ .../health/InitialKafkaBootstrapTracker.kt | 193 +++++++++++++ .../health/KafkaHealthConfiguration.kt | 11 + .../consumer/health/KafkaHealthMetrics.kt | 190 +++++++++++++ .../consumer/health/KafkaHealthProperties.kt | 12 + .../KafkaListenerContainerHealthConfigurer.kt | 15 + .../consumer/health/KafkaListenerIds.kt | 9 + .../health/KafkaRuntimeHealthMonitor.kt | 161 +++++++++++ .../health/ListenerBootstrapStatus.kt | 10 + .../consumer/kafka/entity/EntityConsumer.kt | 31 +- .../kafka/event/EventResponseConsumer.kt | 16 +- .../kafka/event/RequestFintEventConsumer.kt | 16 +- src/main/resources/application.yaml | 20 ++ ...nitialKafkaBootstrapHealthIndicatorTest.kt | 29 ++ .../InitialKafkaBootstrapTrackerTest.kt | 191 +++++++++++++ .../health/KafkaRuntimeHealthMonitorTest.kt | 141 +++++++++ .../fintlabs/consumer/health/MutableClock.kt | 21 ++ .../entity/RelationUpdateConsumerTest.kt | 19 +- 24 files changed, 1471 insertions(+), 22 deletions(-) create mode 100644 docs/kafka-health-checks.md create mode 100644 src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt create mode 100644 
src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt create mode 100644 src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt create mode 100644 src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt create mode 100644 src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt create mode 100644 src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt create mode 100644 src/test/java/no/fintlabs/consumer/health/MutableClock.kt diff --git a/docs/kafka-health-checks.md b/docs/kafka-health-checks.md new file mode 100644 index 00000000..a0379a8f --- /dev/null +++ b/docs/kafka-health-checks.md @@ -0,0 +1,268 @@ +# Kafka Health Checks + +Dette dokumentet beskriver hvordan health-checkene i `fint-core-consumer` fungerer etter innføringen av Kafka-basert readiness og liveness. + +## Oversikt + +Applikasjonen bruker tre forskjellige typer health/probes i Kubernetes: + +- `startupProbe`: brukes bare helt tidlig for å verifisere at JVM og Spring Boot faktisk starter. +- `readinessProbe`: brukes for å avgjøre om poden trygt kan motta trafikk. +- `livenessProbe`: brukes for å avgjøre om poden fortsatt lever, eller om Kubernetes skal restarte den. + +Disse probe-typene har forskjellig ansvar og skal ikke blandes: + +- `startupProbe` skal ikke vite noe om hvor langt Kafka-consumerne har kommet i bootstrap. +- `readinessProbe` skal blokkere trafikk til initial bootstrap er ferdig. +- `livenessProbe` skal ikke feile bare fordi applikasjonen ligger litt etter i konsumering; den skal feile hvis Kafka-consumerne i praksis har sluttet å fungere over tid. 
+ +## Hvordan Consumer bruker dem + +Consumer eksponerer actuator-endepunktene: + +- `/actuator/health/readiness` +- `/actuator/health/liveness` + +Readiness er koblet til en initial Kafka-bootstrap-tracker. Liveness er koblet til en separat Kafka-runtime-monitor. + +## Readiness + +### Hensikt + +Readiness skal beskytte trafikk mot en pod som ennå ikke har bygd opp lokal cache ved oppstart. + +### Hvordan den virker + +Ved oppstart settes readiness til `REFUSING_TRAFFIC`. + +To listeners er definert som blokkerende for initial bootstrap: + +- `entity` +- `relation-update` + +For hver assigned partition hentes "startup end offset" fra Kafka i det assignment skjer. Deretter spores prosesserte offsets mens records behandles. + +Listeneren regnes som ferdig når alle dens assigned partitions har konsumert seg opp til offseten som gjaldt ved oppstartstidspunktet. + +Applikasjonen regnes som `ready` når alle blokkerende listeners er ferdige. + +### Viktig nyanse + +Dette er en `initial-only` readiness. + +Det betyr: + +- Readiness blokkerer ved oppstart. +- Readiness går til `UP` når initial bootstrap er ferdig. +- Readiness går ikke ned igjen senere bare fordi det kommer flere meldinger, full sync, eller midlertidig lag. + +Dette er bevisst. Etter at poden er sluppet i trafikk, skal vanlig Kafka-lag ikke stoppe lesetrafikk. + +### Hva får readiness til å feile + +Readiness blir `OUT_OF_SERVICE` hvis minst ett av disse forholdene gjelder under oppstart: + +- `entity` har ikke konsumert alle sine assigned partitions opp til startup-end-offset. +- `relation-update` har ikke konsumert alle sine assigned partitions opp til startup-end-offset. +- Kafka end-offset kan ikke hentes for assigned partitions. + +### Hva får ikke readiness til å feile + +Følgende forhold feiler ikke readiness etter at bootstrap er ferdig: + +- En ny full sync kommer inn og skaper lag. +- Det produseres mange meldinger mens appen kjører. +- Consumeren ligger midlertidig etter på topicet. 
+- Topicet er stille i lang tid. + +## Liveness + +### Hensikt + +Liveness skal oppdage at Kafka-consumerne i praksis har sluttet å fungere, og gi Kubernetes grunnlag for å restarte poden. + +### Hvordan den virker + +Liveness monitorerer runtime-status for registrerte Kafka-listeners. + +Den ser ikke på vanlig lag eller antall uprosesserte meldinger. I stedet ser den på Kafka-runtime-signaler: + +- `ConsumerStartedEvent` +- `ListenerContainerIdleEvent` +- `NonResponsiveConsumerEvent` +- `ConsumerFailedToStartEvent` +- `ConsumerStoppedEvent` + +Det brukes en grace-periode, default `15m`, for å unngå falske positive ved korte forstyrrelser. + +### Hva som holder liveness frisk + +Liveness holdes `UP` hvis en listener viser tegn til normal drift, for eksempel: + +- appen prosesserer records +- listeneren sender idle-events fordi topicet er stille +- consumer-containeren starter normalt + +Det betyr at stille topics ikke i seg selv gjør poden unhealthy. + +### Hva får liveness til å feile + +Liveness blir `DOWN` hvis en registrert listener har en runtime-feil som varer lenger enn konfigurert grace-periode. + +Eksempler: + +- `NonResponsiveConsumerEvent` og tilstanden varer lenger enn grace-perioden +- `ConsumerFailedToStartEvent` +- `ConsumerStoppedEvent` med annen grunn enn `NORMAL` + +Typiske scenarioer dette er ment å fange: + +- nettverksbrudd mellom pod og Kafka +- Kafka svarer ikke over tid +- autentiseringsfeil mot Kafka +- consumer-container stopper på grunn av feil + +### Hva får ikke liveness til å feile + +Følgende forhold skal ikke alene feile liveness: + +- vanlig Kafka-lag +- full sync som gjør at consumeren henger litt etter +- ingen nye meldinger på topicet i flere timer +- normal rebalance mellom pods + +En vanlig rebalance håndteres av partition assignment/revocation, ikke som en fatal liveness-feil. + +## Startup Probe + +### Hensikt + +`startupProbe` bør bare brukes som en enkel oppstartssperre mens JVM og Spring Boot kommer opp. 
+ +Den bør ikke inneholde Kafka-bootstrap-logikk. Grunnen er at `startupProbe` bare brukes i den tidlige oppstartsfasen, mens readiness kan uttrykke "ikke klar enda" på en mer presis måte. + +### Anbefaling + +Bruk `startupProbe` mot en enkel actuator-health, mens readiness og liveness peker mot de dedikerte probe-endepunktene. + +## Konfigurasjon i Consumer + +Default konfigurasjon i applikasjonen: + +```yaml +fint: + consumer: + health: + kafka: + idle-event-interval: 1m + runtime-grace-period: 15m + monitor-interval-seconds: 30 + no-poll-threshold: 3.0 + +management: + endpoint: + health: + probes: + enabled: true + group: + readiness: + include: readinessState,initialKafkaBootstrap + liveness: + include: livenessState,kafkaRuntime +``` + +### Hva disse Kafka-innstillingene betyr + +- `idle-event-interval`: hvor ofte idle-event sendes mens et topic er stille. +- `runtime-grace-period`: hvor lenge runtime-feil kan vare før liveness går ned. +- `monitor-interval-seconds`: hvor ofte Spring Kafka sjekker consumerens poll-aktivitet. +- `no-poll-threshold`: terskel for når manglende poll anses som "non-responsive". + +## Metrikker + +I tillegg til actuator-health eksponerer applikasjonen nå Micrometer-metrikker for Kafka-bootstrap og Kafka-runtime-health. Disse er nyttige fordi health-endepunktene bare viser nåværende status, mens metrikker gjør det mulig å følge utvikling over tid i Prometheus og Grafana. + +### Bootstrap-metrikker + +- `fint.consumer.kafka.bootstrap.state` + Gauge per listener. `1` betyr at initial bootstrap fortsatt pågår, `0` betyr at den er ferdig. + +- `fint.consumer.kafka.bootstrap.partitions.pending` + Gauge per listener. Antall assigned partitions som ennå ikke har konsumert seg opp til startup-end-offset. + +- `fint.consumer.kafka.bootstrap.completed` + Counter per listener, og også for `listener=all`. Incrementes når bootstrap fullføres. + +- `fint.consumer.kafka.bootstrap.duration` + Timer per listener, og også for `listener=all`. 
Måler hvor lang tid initial bootstrap faktisk tok. + +- `fint.consumer.kafka.bootstrap.end_offset.lookup.failures` + Counter per listener. Incrementes når applikasjonen ikke klarer å hente startup end offset fra Kafka. + +### Runtime-metrikker + +- `fint.consumer.kafka.runtime.problem` + Counter med tags `listener` og `reason`. Incrementes når runtime-monitoren ser et problem, for eksempel `NON_RESPONSIVE` eller `STOPPED_AUTH`. + +- `fint.consumer.kafka.runtime.unhealthy` + Gauge per listener. `1` betyr at listeneren har vært i problemtilstand lenger enn grace-perioden og dermed gjør liveness `DOWN`. + +- `fint.consumer.kafka.runtime.problem.duration` + Gauge per listener. Viser hvor lenge den nåværende problemtilstanden har vart, i millisekunder. + +### Viktige tags + +Metrikkene er bevisst tagget lavt-kardinalt: + +- `listener` +- `reason` + +Det brukes ikke tags som Kafka-key, partition eller corrId, for å unngå høy kardinalitet og unødvendig støy i metrics-backend. + +## Eksempel i Kubernetes + +Eksempel på probe-oppsett: + +```yaml +startupProbe: + httpGet: + path: /utdanning/vurdering/actuator/health + port: 8080 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 60 + +readinessProbe: + httpGet: + path: /utdanning/vurdering/actuator/health/readiness + port: 8080 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + +livenessProbe: + httpGet: + path: /utdanning/vurdering/actuator/health/liveness + port: 8080 + periodSeconds: 30 + timeoutSeconds: 3 + failureThreshold: 3 +``` + +## Praktiske konsekvenser + +Med dette oppsettet blir flyten typisk slik: + +1. Poden starter. +2. `startupProbe` verifiserer at appen faktisk kommer opp. +3. `readinessProbe` holder poden ute av trafikk mens `entity` og `relation-update` bygger initial cache. +4. Når initial bootstrap er ferdig, blir poden `Ready`. +5. Senere full sync eller vanlig Kafka-lag påvirker ikke readiness. +6. 
Hvis Kafka-consumerne blir ikke-responsive eller stopper over tid, blir `liveness` `DOWN` og Kubernetes kan restarte poden. + +## Oppsummering + +- `startupProbe` beskytter bare oppstart av prosessen. +- `readinessProbe` beskytter trafikk under initial Kafka-bootstrap. +- `livenessProbe` beskytter mot vedvarende Kafka-runtime-feil. +- Vanlig lag eller stille topics skal ikke gjøre poden unhealthy. diff --git a/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt b/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt index a1c2e15f..9efb31a2 100644 --- a/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt +++ b/src/main/java/no/fintlabs/autorelation/kafka/AutoRelationEntityConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.autorelation.kafka import no.fintlabs.autorelation.RelationEventService import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.entity.extractIdentifier @@ -21,18 +24,22 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class AutoRelationEntityConsumer( private val consumerConfig: ConsumerConfiguration, private val relationEventService: RelationEventService, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(AutoRelationEntityConsumer::class.java) private const val CONSUMER_NAME = "autorelation-entity" } - @Bean + @Bean(name = [KafkaListenerIds.AUTORELATION_ENTITY]) fun buildAutoRelationConsumer( 
parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.AUTORELATION_ENTITY) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( Any::class.java, this::consumeRecord, @@ -54,6 +61,7 @@ class AutoRelationEntityConsumer( CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EntityTopicNameParameters .builder() @@ -66,6 +74,7 @@ class AutoRelationEntityConsumer( ).resourceName("${consumerConfig.domain}-${consumerConfig.packageName}") .build(), ).apply { concurrency = consumerConfig.kafka.entityConcurrency } + } fun consumeRecord(consumerRecord: ConsumerRecord) { consumerRecord @@ -77,6 +86,7 @@ class AutoRelationEntityConsumer( resource, ) } + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.AUTORELATION_ENTITY) } private fun ConsumerRecord.getResourceName(): String = diff --git a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt index 886d4b9c..95624519 100644 --- a/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt +++ b/src/main/java/no/fintlabs/autorelation/kafka/RelationUpdateConsumer.kt @@ -3,6 +3,10 @@ package no.fintlabs.autorelation.kafka import no.fintlabs.autorelation.AutoRelationService import no.fintlabs.autorelation.model.RelationUpdate import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import 
no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.KafkaThroughputMetrics import no.novari.kafka.consuming.ErrorHandlerFactory @@ -23,13 +27,16 @@ class RelationUpdateConsumer( private val autoRelationService: AutoRelationService, private val consumerConfig: ConsumerConfiguration, private val kafkaThroughputMetrics: KafkaThroughputMetrics, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(RelationUpdateConsumer::class.java) private const val CONSUMER_NAME = "relation-update" } - @Bean + @Bean(name = [KafkaListenerIds.RELATION_UPDATE]) @ConditionalOnProperty( name = ["fint.consumer.autorelation"], havingValue = "true", @@ -38,8 +45,11 @@ class RelationUpdateConsumer( fun relationUpdateConsumerContainer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.RELATION_UPDATE) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.RELATION_UPDATE) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( RelationUpdate::class.java, this::consumeRecord, @@ -48,14 +58,21 @@ class RelationUpdateConsumer( .groupIdApplicationDefault() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned( + KafkaListenerIds.RELATION_UPDATE, + assignments.keys, + ) + }.onRevocation { partitions -> + 
initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.RELATION_UPDATE, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EntityTopicNamePatternParameters .builder() @@ -68,7 +85,8 @@ class RelationUpdateConsumer( // Makes sure we listen to component patterns such as utdanning-vurdering'-relation-update' ).resource(TopicNamePatternParameterPattern.endingWith("-relation-update")) .build(), - ).apply { consumerConfig.kafka.relationConcurrency } + ).apply { concurrency = consumerConfig.kafka.relationConcurrency } + } fun consumeRecord(consumerRecord: ConsumerRecord) { val startedAt = System.nanoTime() @@ -76,6 +94,8 @@ class RelationUpdateConsumer( if (relationUpdate == null) { kafkaThroughputMetrics.recordRelationUpdateConsumer(null, "ignored_null", System.nanoTime() - startedAt) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) return } @@ -85,6 +105,8 @@ class RelationUpdateConsumer( "ignored_foreign_component", System.nanoTime() - startedAt, ) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) return } @@ -95,12 +117,15 @@ class RelationUpdateConsumer( "processed", System.nanoTime() - startedAt, ) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) } catch (ex: Exception) { kafkaThroughputMetrics.recordRelationUpdateConsumer( relationUpdate.targetEntity.resourceName, "failed", System.nanoTime() - startedAt, ) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RELATION_UPDATE) throw ex 
} } diff --git a/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt new file mode 100644 index 00000000..c283e012 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/BootstrapPartitionStatus.kt @@ -0,0 +1,8 @@ +package no.fintlabs.consumer.health + +data class BootstrapPartitionStatus( + val partition: String, + val endOffset: Long, + val processedOffset: Long?, + val caughtUp: Boolean, +) diff --git a/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt b/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt new file mode 100644 index 00000000..1b352953 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/BootstrapReadinessSnapshot.kt @@ -0,0 +1,6 @@ +package no.fintlabs.consumer.health + +data class BootstrapReadinessSnapshot( + val ready: Boolean, + val blockingListeners: List, +) diff --git a/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt new file mode 100644 index 00000000..88268de1 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/EndOffsetProvider.kt @@ -0,0 +1,42 @@ +package no.fintlabs.consumer.health + +import jakarta.annotation.PreDestroy +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.OffsetSpec +import org.apache.kafka.common.TopicPartition +import org.springframework.boot.autoconfigure.kafka.KafkaProperties +import org.springframework.stereotype.Component +import java.time.Duration +import java.util.concurrent.TimeUnit + +interface EndOffsetProvider { + fun latestOffsets(partitions: Set): Map +} + +@Component +class KafkaAdminEndOffsetProvider( + kafkaProperties: KafkaProperties, +) : EndOffsetProvider { + private val adminClient = AdminClient.create(kafkaProperties.buildAdminProperties(null)) + + override fun latestOffsets(partitions: Set): Map { + if (partitions.isEmpty()) 
{ + return emptyMap() + } + + return adminClient + .listOffsets(partitions.associateWith { OffsetSpec.latest() }) + .all() + .get(TIMEOUT.toSeconds(), TimeUnit.SECONDS) + .mapValues { (_, result) -> result.offset() } + } + + @PreDestroy + fun close() { + adminClient.close(TIMEOUT) + } + + companion object { + private val TIMEOUT = Duration.ofSeconds(10) + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt new file mode 100644 index 00000000..fbe89c6c --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicator.kt @@ -0,0 +1,31 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.actuate.health.Health +import org.springframework.boot.actuate.health.HealthIndicator +import org.springframework.stereotype.Component + +@Component("initialKafkaBootstrap") +class InitialKafkaBootstrapHealthIndicator( + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, +) : HealthIndicator { + override fun health(): Health { + val snapshot = initialKafkaBootstrapTracker.snapshot() + val builder = if (snapshot.ready) Health.up() else Health.outOfService() + + return builder + .withDetail("ready", snapshot.ready) + .withDetail("blockingListeners", snapshot.blockingListeners.size) + .withDetail( + "listeners", + snapshot.blockingListeners.associate { listener -> + listener.listenerId to + mapOf( + "assignmentSeen" to listener.assignmentSeen, + "completed" to listener.completed, + "assignedPartitions" to listener.assignedPartitions, + "caughtUpPartitions" to listener.caughtUpPartitions, + ) + }, + ).build() + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt new file mode 100644 index 00000000..2756b390 --- /dev/null +++ 
b/src/main/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTracker.kt @@ -0,0 +1,193 @@ +package no.fintlabs.consumer.health + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.slf4j.LoggerFactory +import org.springframework.boot.availability.AvailabilityChangeEvent +import org.springframework.boot.availability.ReadinessState +import org.springframework.context.ApplicationContext +import org.springframework.stereotype.Service +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlin.math.max + +@Service +class InitialKafkaBootstrapTracker( + private val endOffsetProvider: EndOffsetProvider, + private val applicationContext: ApplicationContext, + private val kafkaHealthMetrics: KafkaHealthMetrics, +) { + private val readinessPublished = AtomicReference(null) + private val bootstrapCompleted = AtomicBoolean(false) + private val blockingListeners = ConcurrentHashMap() + + init { + publishReadiness(false) + } + + fun registerBlockingListener(listenerId: String) { + blockingListeners.computeIfAbsent(listenerId) { ListenerBootstrapState() } + kafkaHealthMetrics.registerBootstrapListener(listenerId) + } + + fun onPartitionsAssigned( + listenerId: String, + assignments: Set, + ) { + if (bootstrapCompleted.get() || assignments.isEmpty()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + val endOffsets = + try { + endOffsetProvider.latestOffsets(assignments) + } catch (exception: RuntimeException) { + logger.error( + "Failed to fetch end offsets for listener={} assignments={}", + listenerId, + assignments, + exception, + ) + kafkaHealthMetrics.recordBootstrapEndOffsetLookupFailure(listenerId) + publishReadiness(false) + return + } + + listenerState.assignmentSeen.set(true) + assignments.forEach { topicPartition -> + listenerState.partitions[topicPartition] = 
PartitionBootstrapState(endOffsets[topicPartition] ?: 0L) + } + + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun onPartitionsRevoked( + listenerId: String, + partitions: Collection, + ) { + if (bootstrapCompleted.get()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + partitions.forEach(listenerState.partitions::remove) + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun onRecordProcessed( + listenerId: String, + record: ConsumerRecord<*, *>, + ) { + if (bootstrapCompleted.get()) { + return + } + + val listenerState = blockingListeners[listenerId] ?: return + val topicPartition = TopicPartition(record.topic(), record.partition()) + listenerState.partitions.computeIfPresent(topicPartition) { _, state -> + state.withOffset(record.offset()) + } + maybeCompleteListener(listenerId, listenerState) + maybeCompleteBootstrap() + kafkaHealthMetrics.updateBootstrapPendingPartitions(listenerId, pendingPartitions(listenerState)) + } + + fun snapshot(): BootstrapReadinessSnapshot { + val listenerStatuses = + blockingListeners.toSortedMap().map { (listenerId, listenerState) -> + ListenerBootstrapStatus( + listenerId = listenerId, + assignmentSeen = listenerState.assignmentSeen.get(), + completed = listenerState.completed.get(), + assignedPartitions = listenerState.partitions.size, + caughtUpPartitions = listenerState.partitions.values.count(PartitionBootstrapState::caughtUp), + partitions = + listenerState.partitions + .toSortedMap(compareBy({ it.topic() }, { it.partition() })) + .map { (topicPartition, partitionState) -> + BootstrapPartitionStatus( + partition = "${topicPartition.topic()}-${topicPartition.partition()}", + endOffset = partitionState.endOffset, + 
processedOffset = partitionState.processedOffset, + caughtUp = partitionState.caughtUp, + ) + }, + ) + } + + return BootstrapReadinessSnapshot( + ready = bootstrapCompleted.get(), + blockingListeners = listenerStatuses, + ) + } + + private fun maybeCompleteListener( + listenerId: String, + listenerState: ListenerBootstrapState, + ) { + if ( + !listenerState.completed.get() && + listenerState.assignmentSeen.get() && + listenerState.partitions.values.all(PartitionBootstrapState::caughtUp) + ) { + listenerState.completed.set(true) + kafkaHealthMetrics.markBootstrapCompleted(listenerId) + logger.info("Initial Kafka bootstrap completed for listener={}", listenerId) + } + } + + private fun maybeCompleteBootstrap() { + if ( + !bootstrapCompleted.get() && + blockingListeners.isNotEmpty() && + blockingListeners.values.all { it.completed.get() } + ) { + bootstrapCompleted.set(true) + kafkaHealthMetrics.markBootstrapAllCompleted() + publishReadiness(true) + logger.info("Initial Kafka bootstrap completed for all blocking listeners") + } + } + + private fun pendingPartitions(listenerState: ListenerBootstrapState): Int { + return listenerState.partitions.values.count { !it.caughtUp } + } + + private fun publishReadiness(ready: Boolean) { + val changed = readinessPublished.getAndSet(ready) != ready + if (changed) { + AvailabilityChangeEvent.publish( + applicationContext, + if (ready) ReadinessState.ACCEPTING_TRAFFIC else ReadinessState.REFUSING_TRAFFIC, + ) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(InitialKafkaBootstrapTracker::class.java) + } +} + +private class ListenerBootstrapState { + val completed = AtomicBoolean(false) + val assignmentSeen = AtomicBoolean(false) + val partitions = ConcurrentHashMap() +} + +private data class PartitionBootstrapState( + val endOffset: Long, + val processedOffset: Long? 
= null, +) { + val caughtUp: Boolean + get() = endOffset == 0L || ((processedOffset ?: -1L) + 1) >= endOffset + + fun withOffset(offset: Long): PartitionBootstrapState { + return copy(processedOffset = processedOffset?.let { max(it, offset) } ?: offset) + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt new file mode 100644 index 00000000..d5a1004b --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthConfiguration.kt @@ -0,0 +1,11 @@ +package no.fintlabs.consumer.health + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.time.Clock + +@Configuration +class KafkaHealthConfiguration { + @Bean + fun kafkaHealthClock(): Clock = Clock.systemUTC() +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt new file mode 100644 index 00000000..210dcac9 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthMetrics.kt @@ -0,0 +1,190 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tag +import io.micrometer.core.instrument.Timer +import org.springframework.stereotype.Component +import java.time.Clock +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +@Component +class KafkaHealthMetrics( + private val meterRegistry: MeterRegistry, + private val clock: Clock, + private val kafkaHealthProperties: KafkaHealthProperties, +) { + private val counters = ConcurrentHashMap() + private val timers = ConcurrentHashMap() + private val bootstrapStates = ConcurrentHashMap() + 
private val runtimeStates = ConcurrentHashMap() + private val bootstrapAllStartNanos = AtomicLong(System.nanoTime()) + private val bootstrapAllRecorded = AtomicBoolean(false) + + fun registerBootstrapListener(listenerId: String) { + bootstrapStates.computeIfAbsent(listenerId) { + BootstrapMetricState(System.nanoTime()).also { state -> + meterRegistry.gauge( + "fint.consumer.kafka.bootstrap.state", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.inProgress.get().toDouble() } + meterRegistry.gauge( + "fint.consumer.kafka.bootstrap.partitions.pending", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.pendingPartitions.get().toDouble() } + } + } + } + + fun updateBootstrapPendingPartitions( + listenerId: String, + pendingPartitions: Int, + ) { + bootstrapStates[listenerId]?.pendingPartitions?.set(pendingPartitions) + } + + fun markBootstrapCompleted(listenerId: String) { + bootstrapStates[listenerId]?.let { state -> + state.pendingPartitions.set(0) + state.inProgress.set(0) + if (state.completed.compareAndSet(false, true)) { + counter( + "fint.consumer.kafka.bootstrap.completed", + listOf(Tag.of("listener", listenerId)), + ).increment() + timer( + "fint.consumer.kafka.bootstrap.duration", + listOf(Tag.of("listener", listenerId)), + ).record(System.nanoTime() - state.startNanos, TimeUnit.NANOSECONDS) + } + } + } + + fun markBootstrapAllCompleted() { + if (bootstrapAllRecorded.compareAndSet(false, true)) { + counter( + "fint.consumer.kafka.bootstrap.completed", + listOf(Tag.of("listener", "all")), + ).increment() + timer( + "fint.consumer.kafka.bootstrap.duration", + listOf(Tag.of("listener", "all")), + ).record(System.nanoTime() - bootstrapAllStartNanos.get(), TimeUnit.NANOSECONDS) + } + } + + fun recordBootstrapEndOffsetLookupFailure(listenerId: String) { + counter( + "fint.consumer.kafka.bootstrap.end_offset.lookup.failures", + listOf(Tag.of("listener", listenerId)), + ).increment() + } + + fun registerRuntimeListener(listenerId: String) { + 
runtimeStates.computeIfAbsent(listenerId) { + RuntimeMetricState().also { state -> + meterRegistry.gauge( + "fint.consumer.kafka.runtime.unhealthy", + listOf(Tag.of("listener", listenerId)), + state, + ) { + if (it.isUnhealthy( + clock.millis(), + kafkaHealthProperties.runtimeGracePeriod.toMillis(), + ) + ) { + 1.0 + } else { + 0.0 + } + } + meterRegistry.gauge( + "fint.consumer.kafka.runtime.problem.duration", + listOf(Tag.of("listener", listenerId)), + state, + ) { it.problemDuration(clock.millis()).toDouble() } + } + } + } + + fun markRuntimeHealthy(listenerId: String) { + runtimeStates[listenerId]?.markHealthy(clock.millis()) + } + + fun markRuntimeProblem( + listenerId: String, + reason: String, + ) { + runtimeStates[listenerId]?.markProblem(clock.millis()) + counter( + "fint.consumer.kafka.runtime.problem", + listOf(Tag.of("listener", listenerId), Tag.of("reason", reason)), + ).increment() + } + + private fun counter( + name: String, + tags: List, + ): Counter { + return counters.computeIfAbsent(meterKey(name, tags)) { meterRegistry.counter(name, tags) } + } + + private fun timer( + name: String, + tags: List, + ): Timer { + return timers.computeIfAbsent(meterKey(name, tags)) { meterRegistry.timer(name, tags) } + } + + private fun meterKey( + name: String, + tags: List, + ): String { + return "$name|${tags.joinToString("|") { "${it.key}=${it.value}" }}" + } +} + +private class BootstrapMetricState( + val startNanos: Long, +) { + val pendingPartitions = AtomicInteger(0) + val inProgress = AtomicInteger(1) + val completed = AtomicBoolean(false) +} + +private class RuntimeMetricState { + private val problemSince = AtomicLong(0L) + + fun markHealthy(now: Long) { + problemSince.set(0L) + } + + fun markProblem(now: Long) { + problemSince.compareAndSet(0L, now) + } + + fun problemDuration(now: Long): Long { + return problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it } + ?: 0L + } + + fun isUnhealthy( + now: Long, + gracePeriodMs: Long, + ): Boolean { + 
return problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it >= gracePeriodMs } + ?: false + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt new file mode 100644 index 00000000..9729bb40 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaHealthProperties.kt @@ -0,0 +1,12 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.context.properties.ConfigurationProperties +import java.time.Duration + +@ConfigurationProperties(prefix = "fint.consumer.health.kafka") +data class KafkaHealthProperties( + val idleEventInterval: Duration = Duration.ofMinutes(1), + val runtimeGracePeriod: Duration = Duration.ofMinutes(15), + val monitorIntervalSeconds: Int = 30, + val noPollThreshold: Float = 3.0f, +) diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt b/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt new file mode 100644 index 00000000..eba005e7 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaListenerContainerHealthConfigurer.kt @@ -0,0 +1,15 @@ +package no.fintlabs.consumer.health + +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer +import org.springframework.stereotype.Component + +@Component +class KafkaListenerContainerHealthConfigurer( + private val kafkaHealthProperties: KafkaHealthProperties, +) { + fun customize(container: ConcurrentMessageListenerContainer) { + container.containerProperties.idleEventInterval = kafkaHealthProperties.idleEventInterval.toMillis() + container.containerProperties.monitorInterval = kafkaHealthProperties.monitorIntervalSeconds + container.containerProperties.noPollThreshold = kafkaHealthProperties.noPollThreshold + } +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt b/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt new file 
mode 100644 index 00000000..d4e15e80 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaListenerIds.kt @@ -0,0 +1,9 @@ +package no.fintlabs.consumer.health + +object KafkaListenerIds { + const val ENTITY = "resourceEntityConsumerFactory" + const val REQUEST_EVENT = "requestFintEventRequestListenerContainer" + const val RESPONSE_EVENT = "responseFintEventContainerListener" + const val RELATION_UPDATE = "relationUpdateConsumerContainer" + const val AUTORELATION_ENTITY = "buildAutoRelationConsumer" +} diff --git a/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt b/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt new file mode 100644 index 00000000..f94705ed --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitor.kt @@ -0,0 +1,161 @@ +package no.fintlabs.consumer.health + +import org.springframework.boot.actuate.health.Health +import org.springframework.boot.actuate.health.HealthIndicator +import org.springframework.context.event.EventListener +import org.springframework.kafka.event.ConsumerFailedToStartEvent +import org.springframework.kafka.event.ConsumerStartedEvent +import org.springframework.kafka.event.ConsumerStoppedEvent +import org.springframework.kafka.event.KafkaEvent +import org.springframework.kafka.event.ListenerContainerIdleEvent +import org.springframework.kafka.event.NonResponsiveConsumerEvent +import org.springframework.kafka.listener.MessageListenerContainer +import org.springframework.stereotype.Component +import java.time.Clock +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference + +@Component("kafkaRuntime") +class KafkaRuntimeHealthMonitor( + private val kafkaHealthProperties: KafkaHealthProperties, + private val clock: Clock, + private val kafkaHealthMetrics: KafkaHealthMetrics, +) : HealthIndicator { + private val trackedListeners = ConcurrentHashMap.newKeySet() 
+ private val listenerStates = ConcurrentHashMap() + + fun registerListener(listenerId: String) { + trackedListeners.add(listenerId) + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) } + kafkaHealthMetrics.registerRuntimeListener(listenerId) + } + + fun onRecordProcessed(listenerId: String) { + if (!trackedListeners.contains(listenerId)) { + return + } + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + + @EventListener + fun onConsumerStarted(event: ConsumerStartedEvent) { + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + } + + @EventListener + fun onListenerContainerIdle(event: ListenerContainerIdleEvent) { + event.listenerId.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates.computeIfAbsent(listenerId) { ListenerRuntimeState(now()) }.markHealthy(now()) + kafkaHealthMetrics.markRuntimeHealthy(listenerId) + } + } + + @EventListener + fun onNonResponsiveConsumer(event: NonResponsiveConsumerEvent) { + event.listenerId.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent( + listenerId, + ) { ListenerRuntimeState(now()) } + .markProblem("NON_RESPONSIVE", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "NON_RESPONSIVE") + } + } + + @EventListener + fun onConsumerFailedToStart(event: ConsumerFailedToStartEvent) { + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent( + listenerId, + ) { ListenerRuntimeState(now()) } + .markProblem("FAILED_TO_START", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "FAILED_TO_START") + } + } + + @EventListener + fun onConsumerStopped(event: ConsumerStoppedEvent) { + if (event.reason == 
ConsumerStoppedEvent.Reason.NORMAL) { + return + } + + listenerIdOf(event)?.takeIf(trackedListeners::contains)?.let { listenerId -> + listenerStates + .computeIfAbsent(listenerId) { ListenerRuntimeState(now()) } + .markProblem("STOPPED_${event.reason.name}", now()) + kafkaHealthMetrics.markRuntimeProblem(listenerId, "STOPPED_${event.reason.name}") + } + } + + override fun health(): Health { + val now = now() + val unhealthyListeners = + trackedListeners + .mapNotNull { listenerId -> + listenerStates[listenerId] + ?.takeIf { it.isUnhealthy(now, kafkaHealthProperties.runtimeGracePeriod.toMillis()) } + ?.let { listenerId to it.snapshot(now) } + }.toMap() + + val builder = if (unhealthyListeners.isEmpty()) Health.up() else Health.down() + + return builder + .withDetail("trackedListeners", trackedListeners.size) + .withDetail("runtimeGracePeriodMs", kafkaHealthProperties.runtimeGracePeriod.toMillis()) + .withDetail("unhealthyListeners", unhealthyListeners) + .build() + } + + private fun listenerIdOf(event: KafkaEvent): String? 
{ + return runCatching { + event.getContainer(MessageListenerContainer::class.java).listenerId + }.getOrNull() + } + + private fun now(): Long = clock.millis() +} + +private class ListenerRuntimeState( + initialHealthyAt: Long, +) { + private val lastHealthyAt = AtomicLong(initialHealthyAt) + private val problemSince = AtomicLong(0L) + private val problem = AtomicReference(null) + + fun markHealthy(now: Long) { + lastHealthyAt.set(now) + problemSince.set(0L) + problem.set(null) + } + + fun markProblem( + reason: String, + now: Long, + ) { + problem.compareAndSet(null, reason) + problemSince.compareAndSet(0L, now) + } + + fun isUnhealthy( + now: Long, + gracePeriodMs: Long, + ): Boolean = + problemSince + .get() + .takeIf { it > 0L } + ?.let { now - it >= gracePeriodMs } + ?: false + + fun snapshot(now: Long): Map = + mapOf( + "problem" to problem.get(), + "problemDurationMs" to (now - problemSince.get()), + "lastHealthyAtMs" to lastHealthyAt.get(), + ) +} diff --git a/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt b/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt new file mode 100644 index 00000000..8aff7fa3 --- /dev/null +++ b/src/main/java/no/fintlabs/consumer/health/ListenerBootstrapStatus.kt @@ -0,0 +1,10 @@ +package no.fintlabs.consumer.health + +data class ListenerBootstrapStatus( + val listenerId: String, + val assignmentSeen: Boolean, + val completed: Boolean, + val assignedPartitions: Int, + val caughtUpPartitions: Int, + val partitions: List, +) diff --git a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt index a4125075..dab154db 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/entity/EntityConsumer.kt @@ -1,6 +1,10 @@ package no.fintlabs.consumer.kafka.entity import no.fintlabs.consumer.config.ConsumerConfiguration +import 
no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConstants.RESOURCE_NAME import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.kafka.stringValue @@ -21,18 +25,24 @@ class EntityConsumer( private val entityProcessingService: EntityProcessingService, private val consumerConfig: ConsumerConfiguration, private val resourceConverter: ResourceConverter, + private val initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(EntityConsumer::class.java) private const val CONSUMER_NAME = "entity" } - @Bean + @Bean(name = [KafkaListenerIds.ENTITY]) fun resourceEntityConsumerFactory( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + initialKafkaBootstrapTracker.registerBlockingListener(KafkaListenerIds.ENTITY) + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.ENTITY) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( Any::class.java, this::consumeRecord, @@ -41,14 +51,18 @@ class EntityConsumer( .groupIdApplicationDefault() .maxPollRecordsKafkaDefault() .maxPollIntervalKafkaDefault() - .seekToBeginningOnAssignment() - .build(), + .seekToBeginningAndPerformOperationOnAssignment { assignments -> + initialKafkaBootstrapTracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, assignments.keys) + }.onRevocation { 
partitions -> + initialKafkaBootstrapTracker.onPartitionsRevoked(KafkaListenerIds.ENTITY, partitions) + }.build(), errorHandlerFactory.createErrorHandler( KafkaConsumerErrorHandling.createLoggingErrorHandlerConfiguration( logger, CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EntityTopicNameParameters .builder() @@ -61,10 +75,15 @@ class EntityConsumer( ).resourceName("${consumerConfig.domain}-${consumerConfig.packageName}") .build(), ).apply { concurrency = consumerConfig.kafka.entityConcurrency } + } fun consumeRecord(consumerRecord: ConsumerRecord) = createEntityConsumerRecord(consumerRecord) - .let { entityProcessingService.processEntityConsumerRecord(it) } + .also { entityConsumerRecord -> + entityProcessingService.processEntityConsumerRecord(entityConsumerRecord) + initialKafkaBootstrapTracker.onRecordProcessed(KafkaListenerIds.ENTITY, consumerRecord) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.ENTITY) + } private fun createEntityConsumerRecord(consumerRecord: ConsumerRecord) = consumerRecord.getResourceName().let { resourceName -> diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt index 7082ea03..f8b9029f 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt +++ b/src/main/java/no/fintlabs/consumer/kafka/event/EventResponseConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.ResponseFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.resource.event.EventStatusCache import 
no.novari.kafka.consuming.ErrorHandlerFactory @@ -19,13 +22,17 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class EventResponseConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { - @Bean + @Bean(name = [KafkaListenerIds.RESPONSE_EVENT]) fun responseFintEventContainerListener( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.RESPONSE_EVENT) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( ResponseFintEvent::class.java, this::consumeRecord, @@ -42,6 +49,7 @@ class EventResponseConsumer( CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EventTopicNameParameters .builder() @@ -54,10 +62,12 @@ class EventResponseConsumer( ).eventName("${consumerConfig.domain}-${consumerConfig.packageName}-response") .build(), ).apply { concurrency = consumerConfig.kafka.responseConcurrency } + } private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Response: {}", consumerRecord.value()) eventStatusCache.trackResponse(consumerRecord.value().corrId, consumerRecord.value()) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.RESPONSE_EVENT) } companion object { diff --git a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt index 5244c157..5bfb758c 100644 --- a/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt +++ 
b/src/main/java/no/fintlabs/consumer/kafka/event/RequestFintEventConsumer.kt @@ -2,6 +2,9 @@ package no.fintlabs.consumer.kafka.event import no.fintlabs.adapter.models.event.RequestFintEvent import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaListenerIds +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaConsumerErrorHandling import no.fintlabs.consumer.resource.event.EventStatusCache import no.novari.kafka.consuming.ErrorHandlerFactory @@ -19,18 +22,22 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer class RequestFintEventConsumer( private val consumerConfig: ConsumerConfiguration, private val eventStatusCache: EventStatusCache, + private val kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor, + private val kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer, ) { companion object { private val logger = LoggerFactory.getLogger(RequestFintEventConsumer::class.java) private const val CONSUMER_NAME = "request-fint-event" } - @Bean + @Bean(name = [KafkaListenerIds.REQUEST_EVENT]) fun requestFintEventRequestListenerContainer( parameterizedListenerContainerFactoryService: ParameterizedListenerContainerFactoryService, errorHandlerFactory: ErrorHandlerFactory, - ): ConcurrentMessageListenerContainer = - parameterizedListenerContainerFactoryService + ): ConcurrentMessageListenerContainer { + kafkaRuntimeHealthMonitor.registerListener(KafkaListenerIds.REQUEST_EVENT) + + return parameterizedListenerContainerFactoryService .createRecordListenerContainerFactory( RequestFintEvent::class.java, this::consumeRecord, @@ -47,6 +54,7 @@ class RequestFintEventConsumer( CONSUMER_NAME, ), ), + kafkaListenerContainerHealthConfigurer::customize, ).createContainer( EventTopicNameParameters .builder() @@ -59,9 +67,11 @@ class RequestFintEventConsumer( 
).eventName("${consumerConfig.domain}-${consumerConfig.packageName}-request") .build(), ).apply { concurrency = consumerConfig.kafka.requestConcurrency } + } private fun consumeRecord(consumerRecord: ConsumerRecord) { logger.info("Received Request: {}", consumerRecord.key()) eventStatusCache.trackRequest(consumerRecord.value()) + kafkaRuntimeHealthMonitor.onRecordProcessed(KafkaListenerIds.REQUEST_EVENT) } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 1ab24193..306f36ea 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -5,8 +5,12 @@ fint: exposed-endpoints: - /actuator/prometheus - /actuator/health + - /actuator/health/readiness + - /actuator/health/liveness + relation: base-url: https://api.felleskomponent.no + consumer: pod-url: http://fint-core-consumer-${fint.consumer.domain}-${fint.consumer.package}:8080 base-url: ${fint.relation.base-url} @@ -15,6 +19,12 @@ fint: packageName: ${fint.consumer.package} org-id: ${fint.org-id} coreVersionHeader: 2 + health: + kafka: + idle-event-interval: 1m + runtime-grace-period: 15m + monitor-interval-seconds: 30 + no-poll-threshold: 3.0 novari: kafka: @@ -34,6 +44,16 @@ spring: base-path: ${fint.consumer.domain}/${fint.consumer.package} management: + endpoint: + health: + probes: + enabled: true + group: + readiness: + include: readinessState,initialKafkaBootstrap + liveness: + include: livenessState,kafkaRuntime + endpoints: web: exposure: diff --git a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt new file mode 100644 index 00000000..0c50fff9 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapHealthIndicatorTest.kt @@ -0,0 +1,29 @@ +package no.fintlabs.consumer.health + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import 
org.junit.jupiter.api.Test +import org.springframework.boot.actuate.health.Status + +class InitialKafkaBootstrapHealthIndicatorTest { + private val tracker: InitialKafkaBootstrapTracker = mockk() + + @Test + fun `should report out of service while bootstrap is incomplete`() { + every { tracker.snapshot() } returns BootstrapReadinessSnapshot(false, emptyList()) + + val health = InitialKafkaBootstrapHealthIndicator(tracker).health() + + assertEquals(Status.OUT_OF_SERVICE, health.status) + } + + @Test + fun `should report up when bootstrap is complete`() { + every { tracker.snapshot() } returns BootstrapReadinessSnapshot(true, emptyList()) + + val health = InitialKafkaBootstrapHealthIndicator(tracker).health() + + assertEquals(Status.UP, health.status) + } +} diff --git a/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt new file mode 100644 index 00000000..28d0b3b8 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/InitialKafkaBootstrapTrackerTest.kt @@ -0,0 +1,191 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.mockk.every +import io.mockk.mockk +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.context.ApplicationContext +import java.time.Duration +import java.time.Instant + +class InitialKafkaBootstrapTrackerTest { + private val endOffsetProvider: EndOffsetProvider = mockk() + private val applicationContext: ApplicationContext = mockk(relaxed = true) + private val meterRegistry = SimpleMeterRegistry() + private val clock = 
MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + private val kafkaHealthMetrics = KafkaHealthMetrics(meterRegistry, clock, KafkaHealthProperties()) + + private lateinit var tracker: InitialKafkaBootstrapTracker + + @BeforeEach + fun setUp() { + tracker = InitialKafkaBootstrapTracker(endOffsetProvider, applicationContext, kafkaHealthMetrics) + tracker.registerBlockingListener(KafkaListenerIds.ENTITY) + } + + @Test + fun `should stay unready until all assigned partitions catch up`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0, partition1)) } returns + mapOf(partition0 to 2L, partition1 to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 1, 0L, "key", "value")) + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 1L, "key", "value")) + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should become ready immediately when assigned partitions are empty at startup offset`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns + mapOf(partition0 to 0L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should ignore new assignments after initial bootstrap has completed`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } returns mapOf(partition0 to 1L) + every { endOffsetProvider.latestOffsets(setOf(partition1)) } 
returns mapOf(partition1 to 2L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + + assertTrue(tracker.snapshot().ready) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition1)) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should wait for both entity and relation update listeners`() { + val entityPartition = TopicPartition("entity-topic", 0) + val relationPartition = TopicPartition("relation-topic", 0) + + tracker.registerBlockingListener(KafkaListenerIds.RELATION_UPDATE) + + every { endOffsetProvider.latestOffsets(setOf(entityPartition)) } returns mapOf(entityPartition to 1L) + every { endOffsetProvider.latestOffsets(setOf(relationPartition)) } returns mapOf(relationPartition to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(entityPartition)) + tracker.onPartitionsAssigned(KafkaListenerIds.RELATION_UPDATE, setOf(relationPartition)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("entity-topic", 0, 0L, "key", "value")) + + assertFalse(tracker.snapshot().ready) + + tracker.onRecordProcessed( + KafkaListenerIds.RELATION_UPDATE, + ConsumerRecord("relation-topic", 0, 0L, "key", "value"), + ) + + assertTrue(tracker.snapshot().ready) + } + + @Test + fun `should publish bootstrap metrics for completion and pending partitions`() { + val partition0 = TopicPartition("topic", 0) + val partition1 = TopicPartition("topic", 1) + + every { endOffsetProvider.latestOffsets(setOf(partition0, partition1)) } returns + mapOf(partition0 to 2L, partition1 to 1L) + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0, partition1)) + + assertEquals( + 2.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.partitions.pending") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 1.0, + meterRegistry + 
.get("fint.consumer.kafka.bootstrap.state") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + + clock.advance(Duration.ofSeconds(2)) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 0L, "key", "value")) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 1, 0L, "key", "value")) + tracker.onRecordProcessed(KafkaListenerIds.ENTITY, ConsumerRecord("topic", 0, 1L, "key", "value")) + + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.partitions.pending") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.state") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.completed") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.completed") + .tag("listener", "all") + .counter() + .count(), + ) + } + + @Test + fun `should count end offset lookup failures`() { + val partition0 = TopicPartition("topic", 0) + + every { endOffsetProvider.latestOffsets(setOf(partition0)) } throws RuntimeException("boom") + + tracker.onPartitionsAssigned(KafkaListenerIds.ENTITY, setOf(partition0)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.bootstrap.end_offset.lookup.failures") + .tag("listener", KafkaListenerIds.ENTITY) + .counter() + .count(), + ) + } +} diff --git a/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt b/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt new file mode 100644 index 00000000..4b6bd637 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/KafkaRuntimeHealthMonitorTest.kt @@ -0,0 +1,141 @@ +package no.fintlabs.consumer.health + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry 
+import io.mockk.mockk +import org.apache.kafka.clients.consumer.Consumer +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.springframework.boot.actuate.health.Status +import org.springframework.kafka.event.NonResponsiveConsumerEvent +import java.time.Duration +import java.time.Instant + +class KafkaRuntimeHealthMonitorTest { + @Test + fun `should stay up during grace period for non responsive consumer`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + clock.advance(Duration.ofMinutes(14)) + + assertEquals(Status.UP, monitor.health().status) + } + + @Test + fun `should go down after grace period for non responsive consumer`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + clock.advance(Duration.ofMinutes(16)) + + assertEquals(Status.DOWN, monitor.health().status) + } + + @Test + fun `should recover after healthy activity resumes`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val monitor = + KafkaRuntimeHealthMonitor( + 
KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + clock, + KafkaHealthMetrics( + SimpleMeterRegistry(), + clock, + KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)), + ), + ) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + clock.advance(Duration.ofMinutes(5)) + + monitor.onRecordProcessed(KafkaListenerIds.ENTITY) + clock.advance(Duration.ofMinutes(20)) + + assertEquals(Status.UP, monitor.health().status) + } + + @Test + fun `should publish runtime metrics for problem and unhealthy state`() { + val clock = MutableClock(Instant.parse("2026-03-20T10:00:00Z")) + val meterRegistry = SimpleMeterRegistry() + val properties = KafkaHealthProperties(runtimeGracePeriod = Duration.ofMinutes(15)) + val monitor = KafkaRuntimeHealthMonitor(properties, clock, KafkaHealthMetrics(meterRegistry, clock, properties)) + + monitor.registerListener(KafkaListenerIds.ENTITY) + monitor.onNonResponsiveConsumer(nonResponsiveConsumerEvent(KafkaListenerIds.ENTITY)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.runtime.problem") + .tag("listener", KafkaListenerIds.ENTITY) + .tag("reason", "NON_RESPONSIVE") + .counter() + .count(), + ) + assertEquals( + 0.0, + meterRegistry + .get("fint.consumer.kafka.runtime.unhealthy") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + + clock.advance(Duration.ofMinutes(16)) + + assertEquals( + 1.0, + meterRegistry + .get("fint.consumer.kafka.runtime.unhealthy") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + assertEquals( + Duration.ofMinutes(16).toMillis().toDouble(), + meterRegistry + .get("fint.consumer.kafka.runtime.problem.duration") + .tag("listener", KafkaListenerIds.ENTITY) + .gauge() + .value(), + ) + } +} + +private fun nonResponsiveConsumerEvent(listenerId: String): NonResponsiveConsumerEvent = + NonResponsiveConsumerEvent( + Any(), + Any(), + 1_000L, 
+ listenerId, + emptyList(), + mockk<Consumer<*, *>>(relaxed = true), + ) diff --git a/src/test/java/no/fintlabs/consumer/health/MutableClock.kt b/src/test/java/no/fintlabs/consumer/health/MutableClock.kt new file mode 100644 index 00000000..7dd866f5 --- /dev/null +++ b/src/test/java/no/fintlabs/consumer/health/MutableClock.kt @@ -0,0 +1,21 @@ +package no.fintlabs.consumer.health + +import java.time.Clock +import java.time.Duration +import java.time.Instant +import java.time.ZoneId +import java.time.ZoneOffset + +class MutableClock( + private var instant: Instant, +) : Clock() { + override fun getZone(): ZoneId = ZoneOffset.UTC + + override fun withZone(zone: ZoneId): Clock = this + + override fun instant(): Instant = instant + + fun advance(duration: Duration) { + instant = instant.plus(duration) + } +} diff --git a/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt b/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt index c54442c2..b335dfa0 100644 --- a/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt +++ b/src/test/java/no/fintlabs/consumer/kafka/entity/RelationUpdateConsumerTest.kt @@ -8,6 +8,9 @@ import no.fintlabs.autorelation.kafka.RelationUpdateConsumer import no.fintlabs.autorelation.model.RelationUpdate import no.fintlabs.autorelation.model.createEntityDescriptor import no.fintlabs.consumer.config.ConsumerConfiguration +import no.fintlabs.consumer.health.InitialKafkaBootstrapTracker +import no.fintlabs.consumer.health.KafkaListenerContainerHealthConfigurer +import no.fintlabs.consumer.health.KafkaRuntimeHealthMonitor import no.fintlabs.consumer.kafka.KafkaThroughputMetrics import org.apache.kafka.clients.consumer.ConsumerRecord import org.junit.jupiter.api.BeforeEach @@ -20,6 +23,9 @@ class RelationUpdateConsumerTest { private lateinit var consumerRecord: ConsumerRecord private lateinit var relationUpdate: RelationUpdate private lateinit var kafkaThroughputMetrics: KafkaThroughputMetrics
+ private lateinit var initialKafkaBootstrapTracker: InitialKafkaBootstrapTracker + private lateinit var kafkaRuntimeHealthMonitor: KafkaRuntimeHealthMonitor + private lateinit var kafkaListenerContainerHealthConfigurer: KafkaListenerContainerHealthConfigurer @BeforeEach fun setUp() { @@ -32,7 +38,18 @@ class RelationUpdateConsumerTest { } kafkaThroughputMetrics = mockk(relaxed = true) - relationUpdateConsumer = RelationUpdateConsumer(autoRelationService, consumerConfig, kafkaThroughputMetrics) + initialKafkaBootstrapTracker = mockk(relaxed = true) + kafkaRuntimeHealthMonitor = mockk(relaxed = true) + kafkaListenerContainerHealthConfigurer = mockk(relaxed = true) + relationUpdateConsumer = + RelationUpdateConsumer( + autoRelationService, + consumerConfig, + kafkaThroughputMetrics, + initialKafkaBootstrapTracker, + kafkaRuntimeHealthMonitor, + kafkaListenerContainerHealthConfigurer, + ) } @Test