diff --git a/README.md b/README.md index 30d2db7..160d743 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,44 @@ # Fint Kafkarator -Fint Kafkarator is an operator that creates a service user and ACL in Aiven for Kafka. -Username, password, ACL id and access certificate and -key will be stored in kubernetes secrets +Fint Kafkarator is a Kubernetes operator that provisions Kafka service users and ACLs in Aiven, and publishes the client configuration and certificate material as Kubernetes secrets. + +Runtime stack: + +- Spring Boot `3.5.x` +- Java `25` +- Gradle `9.4.x` ## What does the operator do? -When a `KafkaUserAndAcl` CR is **created**: -* The operator will create a service user and ACL in Aiven. -* Username, password and ACL id will be generated and stored in secrets along with access certificate and -key. +When a `KafkaUserAndAcl` resource is created: +- The operator creates a service user and ACLs in Aiven. +- The operator creates a `-kafka` secret with Spring Kafka SSL configuration. +- The operator creates a `-kafka-certificates` secret with `client.keystore.p12` and `client.truststore.jks`. + +When a `KafkaUserAndAcl` resource is deleted: +- The operator deletes the user and ACLs from Aiven. +- The operator deletes the managed secrets from Kubernetes. + +When an existing certificate secret is reconciled: +- The operator inspects the current client certificate expiry date. +- The operator rotates the keystore and truststore if the certificate is missing, unreadable, or inside the configured rotation threshold. +- The operator annotates the secret with the observed certificate expiry and last rotation time. + +## Operational Improvements + +Operationally relevant improvements: -When a `KafkaUserAndAcl` CR is **deleted**: -* The operator will delete the user and ACL from Aiven. -* The operator will delete the secrets from Kubernetes. +- Expiry-aware certificate handling instead of only verifying that the keystore can be opened. +- Configurable certificate rotation threshold via `fint.aiven.certificate-rotation-threshold`. +- Prometheus metrics for certificate expiry, rotation pressure, inspections, rotations and reconcile duration. +- Grafana/PromQL documentation for dashboards and alerting. -## How to use the operator: +See: + +- [PromQL examples](docs/metrics-promql.md) +- [Grafana dashboard JSON](docs/kafkarator-grafana-dashboard.json) + +## Custom Resource ### KafkaUserAndAcl ```yaml @@ -56,9 +81,43 @@ spec: topic: '*sample-test2' ``` -#### Prerequisites -* Aiven account, project and service -* Aiven token and Aiven api base url in application.yaml +## Prerequisites + +- Aiven account, project and service +- Aiven token and Aiven API base URL in `application.yaml` + +## Configuration + +Relevant application properties: -### Using the operator -TODO \ No newline at end of file +```yaml +fint: + aiven: + base-url: https://api.aiven.io/v1 + project: fintlabs + service: kafka-alpha + kafka-bootstrap-servers: broker-1:9092,broker-2:9092 + certificate-rotation-threshold: 30d +``` + +## Metrics + +Kafkarator exposes Prometheus metrics on `/actuator/prometheus`. + +Key metrics: + +- `kafkarator_certificate_expiry_seconds` +- `kafkarator_certificate_days_until_expiry` +- `kafkarator_certificate_rotation_due` +- `kafkarator_certificate_oldest_days_until_expiry` +- `kafkarator_certificate_inspections_total` +- `kafkarator_certificate_rotations_total` +- `kafkarator_certificate_secret_reconcile_duration_seconds` + +## Building And Testing + +Run the full test suite: + +```bash +./gradlew test +``` diff --git a/build.gradle b/build.gradle index def3da5..78f0cff 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,9 @@ repositories { } dependencies { + implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-webflux' + runtimeOnly 'io.micrometer:micrometer-registry-prometheus' implementation 'no.fintlabs:flais-operator-starter:1.0.0' diff --git a/docs/certificate-rotation-flow.md b/docs/certificate-rotation-flow.md new file mode 100644 index 0000000..fa1da67 --- /dev/null +++ b/docs/certificate-rotation-flow.md @@ -0,0 +1,100 @@ +# Certificate Rotation Behavior + +This document describes how Kafkarator handles existing certificate secrets, expiry-aware rotation, and metadata updates. + +## Reconcile Decision Flow + +```mermaid +flowchart TD + A["KafkaUserAndAcl reconcile starts"] --> B["Load KafkaUserAndAcl secondary resource"] + B --> C["Load -kafka secret"] + C --> D["Read key store password and trust store password"] + D --> E["Load existing -kafka-certificates secret if present"] + E --> F{"Existing client.keystore.p12 present?"} + + F -- "No" --> G["Mark rotation required"] + F -- "Yes" --> H["Inspect keystore and extract leaf certificate notAfter"] + + H --> I{"Keystore readable?"} + I -- "No" --> G + I -- "Yes" --> J{"Certificate expires within rotation threshold?"} + J -- "Yes" --> G + J -- "No" --> K["Reuse existing keystore"] + + G --> L["Generate new keystore from Aiven access_cert/access_key and CA"] + L --> M["Generate new truststore from Aiven CA"] + + K --> N{"Existing truststore reusable?"} + N -- "Yes" --> O["Reuse existing truststore"] + N -- "No" --> P["Generate new truststore from Aiven CA"] + + M --> Q["Inspect resulting keystore"] + O --> Q + P --> Q + + Q --> R["Update annotations"] + R --> R1["Set certificate-not-after"] + R1 --> R2{"Did rotation happen?"} + R2 -- "Yes" --> R3["Set last-rotated-at"] + R2 -- "No" --> S["Keep existing last-rotated-at as-is"] + R3 --> T["Write -kafka-certificates secret"] + S --> T + + T --> U["Publish metrics for inspection, rotation, and reconcile duration"] + U --> V["Reconcile completes"] +``` + +## Existing Secret Adoption After Deploy + +This diagram shows what happens after deploying the new Kafkarator version into a cluster with existing secrets that do not yet have the new annotations. + +```mermaid +sequenceDiagram + participant O as "Kafkarator" + participant CR as "KafkaUserAndAcl" + participant KS as "-kafka Secret" + participant CS as "-kafka-certificates Secret" + participant A as "Aiven" + participant M as "Prometheus Metrics" + + O->>CR: Reconcile custom resource + O->>KS: Read Kafka SSL passwords + O->>CS: Read existing keystore/truststore and annotations + + alt "No certificate annotations on existing secret" + O->>CS: Inspect client.keystore.p12 + alt "Keystore readable and cert outside threshold" + O->>CS: Patch secret metadata with certificate-not-after + Note over O,CS: last-rotated-at remains absent until an actual rotation happens + else "Keystore unreadable, expired, missing, or inside threshold" + O->>A: Use current Aiven credentials and CA + O->>CS: Regenerate keystore and truststore + O->>CS: Write certificate-not-after and last-rotated-at + end + else "Annotations already present" + O->>CS: Re-evaluate actual keystore state + Note over O,CS: annotations are informative, not the source of truth + end + + O->>M: Publish inspection counters and resource gauges + O->>M: Publish rotation counters if rotation was attempted + O->>M: Publish reconcile duration +``` + +## Backward Compatibility Summary + +```mermaid +flowchart LR + A["Existing secret without annotations"] --> B["Secret is still accepted"] + B --> C["Kafkarator inspects actual keystore content"] + C --> D{"Healthy certificate?"} + D -- "Yes" --> E["Backfill certificate-not-after annotation only"] + D -- "No" --> F["Rotate keystore/truststore and write both annotations"] +``` + +## Notes + +- The annotations are derived metadata, not required input. +- Missing annotations do not break reconcile. +- The actual keystore content remains the source of truth. +- A large number of old secrets may be patched shortly after rollout, either to backfill annotations or to rotate certificates that are already due. diff --git a/docs/kafkarator-grafana-dashboard.json b/docs/kafkarator-grafana-dashboard.json new file mode 100644 index 0000000..dc08534 --- /dev/null +++ b/docs/kafkarator-grafana-dashboard.json @@ -0,0 +1,864 @@ +{ + "__inputs": [ + { + "name": "DS_PROMETHEUS", + "label": "Prometheus", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "11.0.0" + }, + { + "type": "panel", + "id": "timeseries", + "name": "Time series", + "version": "" + }, + { + "type": "panel", + "id": "stat", + "name": "Stat", + "version": "" + }, + { + "type": "panel", + "id": "table", + "name": "Table", + "version": "" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 7 + }, + { + "color": "yellow", + "value": 30 + }, + { + "color": "green", + "value": 60 + } + ] + }, + "unit": "d" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "min(kafkarator_certificate_days_until_expiry{job=~\"$job\",namespace=~\"$namespace\",team=~\"$team\",service=~\"$service\"})", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "Lowest Days Until Expiry", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 1 + }, + { + "color": "red", + "value": 10 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 6, + "y": 0 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "count(kafkarator_certificate_rotation_due{job=~\"$job\",namespace=~\"$namespace\",team=~\"$team\",service=~\"$service\"} > 0)", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "Certificates Due For Rotation", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + }, + { + "color": "red", + "value": 10 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 12, + "y": 0 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "count(kafkarator_certificate_days_until_expiry{job=~\"$job\",namespace=~\"$namespace\",team=~\"$team\",service=~\"$service\"} < 30)", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "Certificates Expiring < 30 Days", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 18, + "y": 0 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "increase(kafkarator_certificate_rotations_total{job=~\"$job\",team=~\"$team\",service=~\"$service\",result=\"failure\"}[24h])", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "Rotation Failures Last 24h", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "cellOptions": { + "type": "auto" + }, + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 7 + }, + { + "color": "yellow", + "value": 30 + }, + { + "color": "green", + "value": 60 + } + ] + }, + "unit": "d" + }, + "overrides": [] + }, + "gridPos": { + "h": 10, + "w": 12, + "x": 0, + "y": 5 + }, + "id": 5, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "enablePagination": true, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true, + "sortBy": [ + { + "desc": false, + "displayName": "Value" + } + ] + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "bottomk(20, kafkarator_certificate_days_until_expiry{job=~\"$job\",namespace=~\"$namespace\",team=~\"$team\",service=~\"$service\"})", + "format": "table", + "instant": true, + "legendFormat": "", + "range": false, + "refId": "A" + } + ], + "title": "Top 20 Certificates Closest To Expiry", + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 1 + }, + { + "color": "red", + "value": 5 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 10, + "w": 12, + "x": 12, + "y": 5 + }, + "id": 6, + "options": { + "displayMode": "gradient", + "minVizHeight": 10, + "minVizWidth": 0, + "namePlacement": "auto", + "orientation": "horizontal", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showUnfilled": true, + "sizing": "auto", + "valueMode": "color" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "sum by (team) (kafkarator_certificate_rotation_due{job=~\"$job\",namespace=~\"$namespace\",team=~\"$team\",service=~\"$service\"} > 0)", + "legendFormat": "{{team}}", + "range": true, + "refId": "A" + } + ], + "title": "Rotation Due By Team", + "type": "bargauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 15 + }, + "id": 7, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "sum by (result, reason) (rate(kafkarator_certificate_rotations_total{job=~\"$job\",team=~\"$team\",service=~\"$service\"}[5m]))", + "legendFormat": "{{result}} / {{reason}}", + "range": true, + "refId": "A" + } + ], + "title": "Rotation Activity By Result And Reason", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 15 + }, + "id": 8, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "sum by (result) (rate(kafkarator_certificate_inspections_total{job=~\"$job\",team=~\"$team\",service=~\"$service\"}[5m]))", + "legendFormat": "{{result}}", + "range": true, + "refId": "A" + } + ], + "title": "Certificate Inspections By Result", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 24 + }, + "id": 9, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "sum by (service) (rate(kafkarator_certificate_secret_reconcile_duration_seconds_sum{job=~\"$job\",team=~\"$team\",service=~\"$service\"}[5m])) / sum by (service) (rate(kafkarator_certificate_secret_reconcile_duration_seconds_count{job=~\"$job\",team=~\"$team\",service=~\"$service\"}[5m]))", + "legendFormat": "{{service}}", + "range": true, + "refId": "A" + } + ], + "title": "Average Reconcile Duration By Service", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 1 + }, + { + "color": "red", + "value": 5 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 24 + }, + "id": 10, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "max_over_time(kafkarator_certificate_secret_reconcile_duration_seconds_max{job=~\"$job\",team=~\"$team\",service=~\"$service\"}[15m])", + "legendFormat": "", + "range": true, + "refId": "A" + } + ], + "title": "Max Reconcile Duration Last 15m", + "type": "stat" + } + ], + "refresh": "1m", + "schemaVersion": 39, + "style": "dark", + "tags": [ + "kafkarator", + "aiven", + "kafka", + "certificates" + ], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "Prometheus", + "value": "${DS_PROMETHEUS}" + }, + "hide": 0, + "includeAll": false, + "label": "Datasource", + "name": "datasource", + "options": [], + "query": "prometheus", + "refresh": 1, + "type": "datasource" + }, + { + "current": { + "selected": true, + "text": "kafkarator", + "value": "kafkarator" + }, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "definition": "label_values(kafkarator_certificate_days_until_expiry, job)", + "hide": 0, + "includeAll": true, + "label": "Job", + "multi": false, + "name": "job", + "options": [], + "query": { + "query": "label_values(kafkarator_certificate_days_until_expiry, job)", + "refId": "Prometheus-job-Variable-Query" + }, + "refresh": 1, + "regex": "", + "sort": 1, + "type": "query" + }, + { + "current": { + "selected": true, + "text": "All", + "value": "$__all" + }, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "definition": "label_values(kafkarator_certificate_days_until_expiry{job=~\"$job\"}, namespace)", + "hide": 0, + "includeAll": true, + "label": "Namespace", + "multi": true, + "name": "namespace", + "options": [], + "query": { + "query": "label_values(kafkarator_certificate_days_until_expiry{job=~\"$job\"}, namespace)", + "refId": "Prometheus-namespace-Variable-Query" + }, + "refresh": 1, + "regex": "", + "sort": 1, + "type": "query" + }, + { + "current": { + "selected": true, + "text": "All", + "value": "$__all" + }, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "definition": "label_values(kafkarator_certificate_days_until_expiry{job=~\"$job\",namespace=~\"$namespace\"}, team)", + "hide": 0, + "includeAll": true, + "label": "Team", + "multi": true, + "name": "team", + "options": [], + "query": { + "query": "label_values(kafkarator_certificate_days_until_expiry{job=~\"$job\",namespace=~\"$namespace\"}, team)", + "refId": "Prometheus-team-Variable-Query" + }, + "refresh": 1, + "regex": "", + "sort": 1, + "type": "query" + }, + { + "current": { + "selected": true, + "text": "All", + "value": "$__all" + }, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "definition": "label_values(kafkarator_certificate_days_until_expiry{job=~\"$job\",namespace=~\"$namespace\",team=~\"$team\"}, service)", + "hide": 0, + "includeAll": true, + "label": "Service", + "multi": true, + "name": "service", + "options": [], + "query": { + "query": "label_values(kafkarator_certificate_days_until_expiry{job=~\"$job\",namespace=~\"$namespace\",team=~\"$team\"}, service)", + "refId": "Prometheus-service-Variable-Query" + }, + "refresh": 1, + "regex": "", + "sort": 1, + "type": "query" + } + ] + }, + "time": { + "from": "now-24h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Kafkarator Certificate Operations", + "uid": "kafkarator-cert-ops", + "version": 1, + "weekStart": "" +} diff --git a/docs/metrics-promql.md b/docs/metrics-promql.md new file mode 100644 index 0000000..c6cc662 --- /dev/null +++ b/docs/metrics-promql.md @@ -0,0 +1,261 @@ +# Kafkarator Metrics And PromQL + +Kafkarator exposes Prometheus metrics on `/actuator/prometheus`. + +The examples below assume the time series use the label `job="kafkarator"`. If your scrape configuration uses different labels, adjust the filters accordingly. + +## Available Metrics + +- `kafkarator_certificate_expiry_seconds` +- `kafkarator_certificate_days_until_expiry` +- `kafkarator_certificate_rotation_due` +- `kafkarator_certificate_oldest_days_until_expiry` +- `kafkarator_certificate_inspections_total` +- `kafkarator_certificate_rotations_total` +- `kafkarator_certificate_secret_reconcile_duration_seconds_count` +- `kafkarator_certificate_secret_reconcile_duration_seconds_sum` +- `kafkarator_certificate_secret_reconcile_duration_seconds_max` + +Resource gauges are tagged with: + +- `namespace` +- `name` +- `team` +- `service` + +Counters and the reconcile timer are tagged with: + +- `team` +- `service` + +Additional labels: + +- `result` on `kafkarator_certificate_inspections_total` +- `result` and `reason` on `kafkarator_certificate_rotations_total` + +## Grafana Panels + +### Lowest days until expiry + +```promql +min(kafkarator_certificate_days_until_expiry{job="kafkarator"}) +``` + +Use as a stat panel. + +### Number of certificates due for rotation now + +```promql +count(kafkarator_certificate_rotation_due{job="kafkarator"} > 0) +``` + +Use as a stat panel. + +### Number of certificates expiring within 30 days + +```promql +count(kafkarator_certificate_days_until_expiry{job="kafkarator"} < 30) +``` + +Use as a stat panel. + +### 20 certificates closest to expiry + +```promql +bottomk(20, kafkarator_certificate_days_until_expiry{job="kafkarator"}) +``` + +Use as a table panel. Show the labels `namespace`, `name`, `team`, `service`. + +### All certificates sorted by days until expiry + +```promql +sort(kafkarator_certificate_days_until_expiry{job="kafkarator"}) +``` + +Use as a table panel. + +### Certificates currently due for rotation + +```promql +kafkarator_certificate_rotation_due{job="kafkarator"} > 0 +``` + +Use as a table panel. Show the labels `namespace`, `name`, `team`, `service`. + +### Certificates due for rotation by team + +```promql +sum by (team) (kafkarator_certificate_rotation_due{job="kafkarator"} > 0) +``` + +Use as a bar chart. + +### Certificates due for rotation by namespace + +```promql +sum by (namespace) (kafkarator_certificate_rotation_due{job="kafkarator"} > 0) +``` + +Use as a bar chart. + +### Rotations by result and reason + +```promql +sum by (result, reason) ( + rate(kafkarator_certificate_rotations_total{job="kafkarator"}[5m]) +) +``` + +Use as a time series panel. + +### Rotations by team + +```promql +sum by (team, result) ( + rate(kafkarator_certificate_rotations_total{job="kafkarator"}[5m]) +) +``` + +Use as a stacked time series or bar chart. + +### Inspections by result + +```promql +sum by (result) ( + rate(kafkarator_certificate_inspections_total{job="kafkarator"}[5m]) +) +``` + +Use as a time series panel. + +### Average reconcile duration + +```promql +sum(rate(kafkarator_certificate_secret_reconcile_duration_seconds_sum{job="kafkarator"}[5m])) +/ +sum(rate(kafkarator_certificate_secret_reconcile_duration_seconds_count{job="kafkarator"}[5m])) +``` + +Use as a stat panel or time series panel. + +### Average reconcile duration by service + +```promql +sum by (service) ( + rate(kafkarator_certificate_secret_reconcile_duration_seconds_sum{job="kafkarator"}[5m]) +) +/ +sum by (service) ( + rate(kafkarator_certificate_secret_reconcile_duration_seconds_count{job="kafkarator"}[5m]) +) +``` + +Use as a time series panel. + +### Max reconcile duration over the last 15 minutes + +```promql +max_over_time(kafkarator_certificate_secret_reconcile_duration_seconds_max{job="kafkarator"}[15m]) +``` + +Use as a stat panel or time series panel. + +## Alert Queries + +### Critical: at least one certificate has expired + +```promql +min(kafkarator_certificate_days_until_expiry{job="kafkarator"}) < 0 +``` + +Recommended `for`: `15m` + +### High: certificate expires within 7 days + +```promql +kafkarator_certificate_days_until_expiry{job="kafkarator"} < 7 +``` + +Recommended `for`: `30m` + +### Warning: certificate expires within 30 days + +```promql +kafkarator_certificate_days_until_expiry{job="kafkarator"} < 30 +``` + +Recommended `for`: `2h` + +### High: certificate remains due for rotation over time + +```promql +kafkarator_certificate_rotation_due{job="kafkarator"} > 0 +``` + +Recommended `for`: `6h` + +### Critical: rotation fails + +```promql +increase(kafkarator_certificate_rotations_total{job="kafkarator",result="failure"}[15m]) > 0 +``` + +Recommended `for`: `0m` + +### Warning: no inspections in the last hour + +```promql +sum(increase(kafkarator_certificate_inspections_total{job="kafkarator"}[1h])) == 0 +``` + +Recommended `for`: `1h` + +## Useful Drilldown Queries + +### Show a single namespace + +```promql +sort(kafkarator_certificate_days_until_expiry{job="kafkarator",namespace="$namespace"}) +``` + +### Show a single team + +```promql +sort(kafkarator_certificate_days_until_expiry{job="kafkarator",team="$team"}) +``` + +### Find resources with negative time until expiry + +```promql +kafkarator_certificate_days_until_expiry{job="kafkarator"} < 0 +``` + +### Find resources reported as unreadable + +```promql +sum by (team, service) ( + increase(kafkarator_certificate_inspections_total{job="kafkarator",result="unreadable"}[1h]) +) +``` + +## Recommended Dashboard Layout + +Top row: + +- `min(kafkarator_certificate_days_until_expiry{job="kafkarator"})` +- `count(kafkarator_certificate_rotation_due{job="kafkarator"} > 0)` +- `count(kafkarator_certificate_days_until_expiry{job="kafkarator"} < 30)` +- `increase(kafkarator_certificate_rotations_total{job="kafkarator",result="failure"}[24h])` + +Middle row: + +- `bottomk(20, kafkarator_certificate_days_until_expiry{job="kafkarator"})` +- `sum by (team) (kafkarator_certificate_rotation_due{job="kafkarator"} > 0)` +- `sum by (result, reason) (rate(kafkarator_certificate_rotations_total{job="kafkarator"}[5m]))` + +Bottom row: + +- `sum by (result) (rate(kafkarator_certificate_inspections_total{job="kafkarator"}[5m]))` +- average reconcile duration +- max reconcile duration over the last 15 minutes diff --git a/src/main/java/no/fintlabs/aiven/AivenProperties.java b/src/main/java/no/fintlabs/aiven/AivenProperties.java index 61a011a..f528f3c 100644 --- a/src/main/java/no/fintlabs/aiven/AivenProperties.java +++ b/src/main/java/no/fintlabs/aiven/AivenProperties.java @@ -5,6 +5,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; +import java.time.Duration; + @Getter @Setter @Configuration @@ -16,5 +18,6 @@ public class AivenProperties { private String project = "fintlabs"; private String service; private String kafkaBootstrapServers; + private Duration certificateRotationThreshold = Duration.ofDays(30); } diff --git a/src/main/java/no/fintlabs/keystore/KeyStoreService.java b/src/main/java/no/fintlabs/keystore/KeyStoreService.java index 87acb51..a549684 100644 --- a/src/main/java/no/fintlabs/keystore/KeyStoreService.java +++ b/src/main/java/no/fintlabs/keystore/KeyStoreService.java @@ -16,13 +16,38 @@ import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.security.spec.PKCS8EncodedKeySpec; +import java.time.Duration; +import java.time.Instant; import java.util.Base64; +import java.util.Enumeration; import java.util.regex.Pattern; @Slf4j @Service public class KeyStoreService extends Store { + public record KeyStoreInspection(boolean readable, Instant notAfter, String errorMessage) { + + public static KeyStoreInspection valid(Instant notAfter) { + return new KeyStoreInspection(true, notAfter, null); + } + + public static KeyStoreInspection invalid(String errorMessage) { + return new KeyStoreInspection(false, null, errorMessage); + } + + public boolean needsRotation(Duration threshold) { + if (!readable || notAfter == null) { + return true; + } + + Duration normalizedThreshold = threshold == null ? Duration.ZERO : threshold; + Instant rotationDeadline = Instant.now().plus(normalizedThreshold); + + return !notAfter.isAfter(rotationDeadline); + } + } + private X509Certificate loadCertificate(String accessCert) throws GeneralSecurityException { CertificateFactory factory = CertificateFactory.getInstance("X.509"); @@ -65,21 +90,55 @@ public KeyStore createKeyStore(String cert, String key, String ca, char[] passwo } public String verifyKeyStore(String base64KeyStore, String password) { + KeyStoreInspection inspection = inspectKeyStore(base64KeyStore, password); + + if (!inspection.readable()) { + return null; + } + + log.debug("Key store is ok!"); + return base64KeyStore; + } + + public KeyStoreInspection inspectKeyStore(String base64KeyStore, String password) { try { byte[] decode = Base64.getDecoder().decode(base64KeyStore); KeyStore keyStore = KeyStore.getInstance("PKCS12"); keyStore.load(new ByteArrayInputStream(decode), password.toCharArray()); - log.debug("Key store is ok!"); - return base64KeyStore; - } catch (KeyStoreException | CertificateException | IOException | NoSuchAlgorithmException e) { + X509Certificate certificate = resolveLeafCertificate(keyStore); + + return KeyStoreInspection.valid(certificate.getNotAfter().toInstant()); + } catch (IllegalArgumentException | KeyStoreException | CertificateException | IOException | + NoSuchAlgorithmException e) { log.debug("Unable to open key store with error '{}'.", e.getMessage()); - log.error("We need to create a new one!"); - return null; + return KeyStoreInspection.invalid(e.getMessage()); } } public String createKeyStoreAndGetAsBase64(String cert, String key, String ca, char[] password) { return storeToBase64(createKeyStore(cert, key, ca, password), password); } + + private X509Certificate resolveLeafCertificate(KeyStore keyStore) throws KeyStoreException { + Enumeration aliases = keyStore.aliases(); + + while (aliases.hasMoreElements()) { + String alias = aliases.nextElement(); + + if (keyStore.isKeyEntry(alias)) { + Certificate[] certificateChain = keyStore.getCertificateChain(alias); + if (certificateChain != null && certificateChain.length > 0 && certificateChain[0] instanceof X509Certificate certificate) { + return certificate; + } + } + + Certificate certificate = keyStore.getCertificate(alias); + if (certificate instanceof X509Certificate x509Certificate) { + return x509Certificate; + } + } + + throw new KeyStoreException("No X509 certificate found in key store"); + } } diff --git a/src/main/java/no/fintlabs/operator/CertificateMetricsService.java b/src/main/java/no/fintlabs/operator/CertificateMetricsService.java new file mode 100644 index 0000000..e1ed8d5 --- /dev/null +++ b/src/main/java/no/fintlabs/operator/CertificateMetricsService.java @@ -0,0 +1,185 @@ +package no.fintlabs.operator; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; +import no.fintlabs.keystore.KeyStoreService; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class CertificateMetricsService { + + private final MeterRegistry meterRegistry; + private final Map resourceGauges = new ConcurrentHashMap<>(); + + public CertificateMetricsService(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + Gauge.builder("kafkarator_certificate_oldest_days_until_expiry", resourceGauges, this::oldestDaysUntilExpiry) + .description("Lowest number of days remaining before any managed Kafka client certificate expires") + .register(meterRegistry); + } + + public Timer.Sample startReconcile() { + return Timer.start(meterRegistry); + } + + public void recordReconcile(KafkaUserAndAclCrd primary, String serviceName, Timer.Sample sample) { + sample.stop(Timer.builder("kafkarator_certificate_secret_reconcile_duration_seconds") + .description("Duration of certificate secret reconcile operations") + .tags(aggregateTags(primary, serviceName)) + .register(meterRegistry)); + } + + public void recordInspection(KafkaUserAndAclCrd primary, + String serviceName, + KeyStoreService.KeyStoreInspection inspection, + Duration threshold) { + Counter.builder("kafkarator_certificate_inspections_total") + .description("Number of certificate inspections performed during reconcile") + .tags(withResultTags(primary, serviceName, inspectionResult(inspection, threshold))) + .register(meterRegistry) + .increment(); + } + + public void updateResourceState(KafkaUserAndAclCrd primary, + String serviceName, + KeyStoreService.KeyStoreInspection inspection, + Duration threshold) { + GaugeValues values = resourceGauges.computeIfAbsent( + new ResourceKey( + primary.getMetadata().getNamespace(), + primary.getMetadata().getName(), + team(primary), + service(serviceName) + ), + this::registerResourceGauges + ); + + values.expiryEpochSeconds = inspection.notAfter() == null ? Double.NaN : inspection.notAfter().getEpochSecond(); + values.daysUntilExpiry = inspection.notAfter() == null + ? Double.NaN + : Duration.between(Instant.now(), inspection.notAfter()).toSeconds() / 86400.0d; + values.rotationDue = inspection.needsRotation(threshold) ? 1.0d : 0.0d; + } + + public void recordRotation(KafkaUserAndAclCrd primary, String serviceName, boolean success, String reason) { + Counter.builder("kafkarator_certificate_rotations_total") + .description("Number of Kafka client certificate rotations attempted by Kafkarator") + .tags(withRotationTags(primary, serviceName, success, reason)) + .register(meterRegistry) + .increment(); + } + + private GaugeValues registerResourceGauges(ResourceKey key) { + GaugeValues values = new GaugeValues(); + Gauge.builder("kafkarator_certificate_expiry_seconds", values, GaugeValues::getExpiryEpochSeconds) + .description("Unix epoch seconds at which the managed Kafka client certificate expires") + .tags(resourceTags(key)) + .register(meterRegistry); + Gauge.builder("kafkarator_certificate_days_until_expiry", values, GaugeValues::getDaysUntilExpiry) + .description("Days remaining before the managed Kafka client certificate expires") + .tags(resourceTags(key)) + .register(meterRegistry); + Gauge.builder("kafkarator_certificate_rotation_due", values, GaugeValues::getRotationDue) + .description("Whether the managed Kafka client certificate is due for rotation within the configured threshold") + .tags(resourceTags(key)) + .register(meterRegistry); + return values; + } + + private Iterable resourceTags(ResourceKey key) { + return List.of( + Tag.of("namespace", key.namespace()), + Tag.of("name", key.name()), + Tag.of("team", key.team()), + Tag.of("service", key.service()) + ); + } + + private Iterable aggregateTags(KafkaUserAndAclCrd primary, String serviceName) { + return List.of( + Tag.of("team", team(primary)), + Tag.of("service", service(serviceName)) + ); + } + + private Iterable withResultTags(KafkaUserAndAclCrd primary, String serviceName, String result) { + return List.of( + Tag.of("team", team(primary)), + Tag.of("service", service(serviceName)), + Tag.of("result", result) + ); + } + + private Iterable withRotationTags(KafkaUserAndAclCrd primary, String serviceName, boolean success, String reason) { + return List.of( + Tag.of("team", team(primary)), + Tag.of("service", service(serviceName)), + Tag.of("result", success ? "success" : "failure"), + Tag.of("reason", sanitize(reason)) + ); + } + + private String inspectionResult(KeyStoreService.KeyStoreInspection inspection, Duration threshold) { + if (!inspection.readable()) { + return "unreadable"; + } + if (inspection.notAfter() != null && !inspection.notAfter().isAfter(Instant.now())) { + return "expired"; + } + if (inspection.needsRotation(threshold)) { + return "expiring"; + } + return "healthy"; + } + + private double oldestDaysUntilExpiry(Map gauges) { + return gauges.values().stream() + .mapToDouble(GaugeValues::getDaysUntilExpiry) + .filter(value -> !Double.isNaN(value)) + .min() + .orElse(Double.NaN); + } + + private static String team(KafkaUserAndAclCrd primary) { + String value = primary.getMetadata().getLabels().get("fintlabs.no/team"); + return sanitize(value); + } + + private static String service(String serviceName) { + return sanitize(serviceName); + } + + private static String sanitize(String value) { + return value == null || value.isBlank() ? "unknown" : value; + } + + private record ResourceKey(String namespace, String name, String team, String service) { + } + + private static final class GaugeValues { + private volatile double expiryEpochSeconds = Double.NaN; + private volatile double daysUntilExpiry = Double.NaN; + private volatile double rotationDue = Double.NaN; + + private double getExpiryEpochSeconds() { + return expiryEpochSeconds; + } + + private double getDaysUntilExpiry() { + return daysUntilExpiry; + } + + private double getRotationDue() { + return rotationDue; + } + } +} diff --git a/src/main/java/no/fintlabs/operator/CertificateSecretDependentResource.java b/src/main/java/no/fintlabs/operator/CertificateSecretDependentResource.java index 454dab5..b4cf7d7 100644 --- a/src/main/java/no/fintlabs/operator/CertificateSecretDependentResource.java +++ b/src/main/java/no/fintlabs/operator/CertificateSecretDependentResource.java @@ -9,105 +9,109 @@ import lombok.extern.slf4j.Slf4j; import no.fintlabs.FlaisKubernetesDependentResource; import no.fintlabs.FlaisWorkflow; +import no.fintlabs.aiven.AivenProperties; import no.fintlabs.aiven.AivenService; import no.fintlabs.keystore.KeyStoreService; import no.fintlabs.keystore.TrustStoreService; import org.springframework.stereotype.Component; +import java.time.Instant; +import java.util.Collections; import java.util.HashMap; +import java.util.Map; import java.util.Optional; +import static io.micrometer.core.instrument.Timer.Sample; + @Slf4j @Component public class CertificateSecretDependentResource extends FlaisKubernetesDependentResource { public static final String NAME_SUFFIX = "-kafka-certificates"; + public static final String CERTIFICATE_NOT_AFTER_ANNOTATION = "kafkarator.fintlabs.no/certificate-not-after"; + public static final String LAST_ROTATED_AT_ANNOTATION = "kafkarator.fintlabs.no/last-rotated-at"; private final AivenService aivenService; + private final AivenProperties aivenProperties; + private final CertificateMetricsService certificateMetricsService; private final KeyStoreService keyStoreService; private final TrustStoreService trustStoreService; - public CertificateSecretDependentResource( - FlaisWorkflow workflow, - KubernetesClient kubernetesClient, - KafkaSecretDependentResource kafkaSecretDependentResource, - KafkaUserAndAclDependentResource kafkaUserAndAclDependentResource, - AivenService aivenService, - KeyStoreService keyStoreService, - CertificateSecretDiscriminator discriminator, TrustStoreService trustStoreService) { + public CertificateSecretDependentResource(FlaisWorkflow workflow, KubernetesClient kubernetesClient, KafkaSecretDependentResource kafkaSecretDependentResource, KafkaUserAndAclDependentResource kafkaUserAndAclDependentResource, AivenService aivenService, AivenProperties aivenProperties, CertificateMetricsService certificateMetricsService, KeyStoreService keyStoreService, CertificateSecretDiscriminator discriminator, TrustStoreService trustStoreService) { super(Secret.class, workflow, kubernetesClient); this.aivenService = aivenService; + this.aivenProperties = aivenProperties; + this.certificateMetricsService = certificateMetricsService; this.keyStoreService = keyStoreService; this.trustStoreService = trustStoreService; dependsOn(kafkaSecretDependentResource, kafkaUserAndAclDependentResource); setResourceDiscriminator(discriminator); configureWith(new KubernetesDependentResourceConfig().setLabelSelector("app.kubernetes.io/managed-by=kafkarator")); - - } @Override protected Secret desired(KafkaUserAndAclCrd resource, Context context) { log.debug("Desired certificate secret for {}", resource.getMetadata().getName()); - - KafkaUserAndAcl kafkaUserAndAcl = context.getSecondaryResource(KafkaUserAndAcl.class).orElseThrow(); - Secret kafkaSecret = context.getSecondaryResources(Secret.class) - .stream() - .filter(secret -> secret.getMetadata().getName().equals(KafkaSecretDependentResource.getResourceName(resource)/*resource.getMetadata().getName() + KafkaSecretDependentResource.NAME_SUFFIX*/)) - .findFirst() - .orElseThrow(); - - Optional thisSecret = context.getSecondaryResources(Secret.class) - .stream() - .filter(secret -> secret.getMetadata().getName().equals(getResourceName(resource)/*resource.getMetadata().getName() + NAME_SUFFIX)*/)) - .findFirst(); - - String keyStorePassword = decode(kafkaSecret.getData().get("spring.kafka.ssl.key-store-password")); - String trustStorePassword = decode(kafkaSecret.getData().get("spring.kafka.ssl.trust-store-password")); - - String keyStore = thisSecret - .map(ks -> ks.getData().get("client.keystore.p12")) - .map(ks -> keyStoreService.verifyKeyStore(ks, keyStorePassword)) - .orElseGet(() -> { - log.info("No key store available. Creating a new one!"); - - return keyStoreService.createKeyStoreAndGetAsBase64( - kafkaUserAndAcl.getUser().getAccessCert(), - kafkaUserAndAcl.getUser().getAccessKey(), - aivenService.getCa(), - keyStorePassword.toCharArray() - ); - } - ); - - String trustStore = thisSecret - .map(ts -> ts.getData().get("client.truststore.jks")) - .map(ts -> trustStoreService.verifyTrustStore(ts, trustStorePassword)) - .orElseGet(() -> { - log.info("No trust store available. Creating a new one!"); - - return trustStoreService.createTrustStoreAndGetAsBase64( - aivenService.getCa(), - trustStorePassword.toCharArray() - ); - } - ); - - HashMap labels = new HashMap<>(resource.getMetadata().getLabels()); - labels.put("app.kubernetes.io/managed-by", "kafkarator"); - - - return new SecretBuilder() - .withNewMetadata() - .withName(getResourceName(resource)) - .withNamespace(resource.getMetadata().getNamespace()) - .withLabels(labels) - .endMetadata() - .withType("Opaque") - .addToData("client.keystore.p12", keyStore) - .addToData("client.truststore.jks", trustStore) - .build(); - + Sample timerSample = certificateMetricsService.startReconcile(); + boolean rotationAttempted = false; + + try { + KafkaUserAndAcl kafkaUserAndAcl = context.getSecondaryResource(KafkaUserAndAcl.class).orElseThrow(); + Secret kafkaSecret = context.getSecondaryResources(Secret.class) + .stream() + .filter(secret -> secret.getMetadata().getName().equals(KafkaSecretDependentResource.getResourceName(resource))) + .findFirst() + .orElseThrow(); + + Optional thisSecret = context.getSecondaryResources(Secret.class) + .stream() + .filter(secret -> secret.getMetadata().getName().equals(getResourceName(resource))) + .findFirst(); + + String keyStorePassword = decode(kafkaSecret.getData().get("spring.kafka.ssl.key-store-password")); + String trustStorePassword = decode(kafkaSecret.getData().get("spring.kafka.ssl.trust-store-password")); + + String existingKeyStore = thisSecret.map(secret -> secret.getData().get("client.keystore.p12")).orElse(null); + KeyStoreService.KeyStoreInspection keyStoreInspection = existingKeyStore == null ? KeyStoreService.KeyStoreInspection.invalid("No existing key store") : keyStoreService.inspectKeyStore(existingKeyStore, keyStorePassword); + certificateMetricsService.recordInspection(resource, aivenProperties.getService(), keyStoreInspection, aivenProperties.getCertificateRotationThreshold()); + boolean rotateCredentials = keyStoreInspection.needsRotation(aivenProperties.getCertificateRotationThreshold()); + rotationAttempted = rotateCredentials; + + if (rotateCredentials) { + log.info("Rotating Kafka client certificate for {} because {}", resource.getMetadata().getName(), rotationReason(existingKeyStore, keyStoreInspection)); + } + + String keyStore = rotateCredentials ? keyStoreService.createKeyStoreAndGetAsBase64(kafkaUserAndAcl.getUser().getAccessCert(), kafkaUserAndAcl.getUser().getAccessKey(), aivenService.getCa(), keyStorePassword.toCharArray()) : existingKeyStore; + + String trustStore = rotateCredentials ? trustStoreService.createTrustStoreAndGetAsBase64(aivenService.getCa(), trustStorePassword.toCharArray()) : thisSecret.map(ts -> ts.getData().get("client.truststore.jks")).map(ts -> trustStoreService.verifyTrustStore(ts, trustStorePassword)).orElseGet(() -> { + log.info("No trust store available. Creating a new one!"); + + return trustStoreService.createTrustStoreAndGetAsBase64(aivenService.getCa(), trustStorePassword.toCharArray()); + }); + + HashMap labels = new HashMap<>(resource.getMetadata().getLabels()); + labels.put("app.kubernetes.io/managed-by", "kafkarator"); + Map annotations = new HashMap<>(thisSecret.map(Secret::getMetadata).map(metadata -> Optional.ofNullable(metadata.getAnnotations()).orElse(Collections.emptyMap())).orElse(Collections.emptyMap())); + KeyStoreService.KeyStoreInspection resultingKeyStoreInspection = keyStoreService.inspectKeyStore(keyStore, keyStorePassword); + certificateMetricsService.updateResourceState(resource, aivenProperties.getService(), resultingKeyStoreInspection, aivenProperties.getCertificateRotationThreshold()); + if (resultingKeyStoreInspection.notAfter() != null) { + annotations.put(CERTIFICATE_NOT_AFTER_ANNOTATION, resultingKeyStoreInspection.notAfter().toString()); + } + if (rotateCredentials) { + annotations.put(LAST_ROTATED_AT_ANNOTATION, Instant.now().toString()); + certificateMetricsService.recordRotation(resource, aivenProperties.getService(), resultingKeyStoreInspection.readable(), rotationReason(existingKeyStore, keyStoreInspection)); + } + + + return new SecretBuilder().withNewMetadata().withName(getResourceName(resource)).withNamespace(resource.getMetadata().getNamespace()).withLabels(labels).withAnnotations(annotations).endMetadata().withType("Opaque").addToData("client.keystore.p12", keyStore).addToData("client.truststore.jks", trustStore).build(); + } catch (RuntimeException e) { + if (rotationAttempted) { + certificateMetricsService.recordRotation(resource, aivenProperties.getService(), false, "reconcile-failure"); + } + throw e; + } finally { + certificateMetricsService.recordReconcile(resource, aivenProperties.getService(), timerSample); + } } public static String getResourceName(KafkaUserAndAclCrd resource) { @@ -118,4 +122,14 @@ public static String getResourceName(KafkaUserAndAclCrd resource) { public Matcher.Result match(Secret actualResource, KafkaUserAndAclCrd primary, Context context) { return super.match(actualResource, primary, context); } + + private String rotationReason(String existingKeyStore, KeyStoreService.KeyStoreInspection inspection) { + if (existingKeyStore == null) { + return "no existing key store is present"; + } + if (!inspection.readable()) { + return "the existing key store is unreadable"; + } + return "the certificate expires before the configured rotation threshold"; + } } diff --git a/src/main/java/no/fintlabs/operator/CertificateSecretDiscriminator.java b/src/main/java/no/fintlabs/operator/CertificateSecretDiscriminator.java index 6ef608c..aefa3a1 100644 --- a/src/main/java/no/fintlabs/operator/CertificateSecretDiscriminator.java +++ b/src/main/java/no/fintlabs/operator/CertificateSecretDiscriminator.java @@ -18,7 +18,7 @@ public Optional distinguish(Class resource, KafkaUserAndAclCrd p (InformerEventSource) context .eventSourceRetriever().getResourceEventSourceFor(Secret.class, CertificateSecretDependentResource.class.getSimpleName()); - return ies.get(new ResourceID(CertificateSecretDependentResource.getResourceName(primary)/*primary.getMetadata().getName() + NAME_SUFFIX*/, + return ies.get(new ResourceID(CertificateSecretDependentResource.getResourceName(primary), primary.getMetadata().getNamespace())); } } diff --git a/src/main/java/no/fintlabs/operator/KafkaSecretDiscriminator.java b/src/main/java/no/fintlabs/operator/KafkaSecretDiscriminator.java index de2880a..cb46209 100644 --- a/src/main/java/no/fintlabs/operator/KafkaSecretDiscriminator.java +++ b/src/main/java/no/fintlabs/operator/KafkaSecretDiscriminator.java @@ -9,7 +9,6 @@ import java.util.Optional; - @Component public class KafkaSecretDiscriminator implements ResourceDiscriminator { @Override @@ -19,7 +18,7 @@ public Optional distinguish(Class resource, KafkaUserAndAclCrd p (InformerEventSource) context .eventSourceRetriever().getResourceEventSourceFor(Secret.class, KafkaSecretDependentResource.class.getSimpleName()); - return ies.get(new ResourceID(KafkaSecretDependentResource.getResourceName(primary)/*primary.getMetadata().getName() + NAME_SUFFIX*/, + return ies.get(new ResourceID(KafkaSecretDependentResource.getResourceName(primary), primary.getMetadata().getNamespace())); } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 583151f..642e85b 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -2,6 +2,12 @@ spring: application: name: kafkarator +management: + endpoints: + web: + exposure: + include: health,info,prometheus + logging: level: no.fintlabs: debug @@ -13,3 +19,4 @@ fint: project: fintlabs service: kafka-bootstrap-servers: + certificate-rotation-threshold: 30d diff --git a/src/test/groovy/no/fintlabs/ApplicationContextTest.java b/src/test/groovy/no/fintlabs/ApplicationContextTest.java index cf519d8..cc06daa 100644 --- a/src/test/groovy/no/fintlabs/ApplicationContextTest.java +++ b/src/test/groovy/no/fintlabs/ApplicationContextTest.java @@ -6,12 +6,15 @@ import no.fintlabs.keystore.KeyStoreService; import no.fintlabs.keystore.TrustStoreService; import no.fintlabs.operator.CertificateSecretDependentResource; +import no.fintlabs.operator.CertificateMetricsService; import no.fintlabs.operator.CertificateSecretDiscriminator; import no.fintlabs.operator.KafkaSecretDependentResource; import no.fintlabs.operator.KafkaSecretDiscriminator; import no.fintlabs.operator.KafkaUserAclReconciler; import no.fintlabs.operator.KafkaUserAndAclDependentResource; import no.fintlabs.operator.KafkaUserAndAclWorkflow; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; @@ -54,6 +57,7 @@ void contextLoadsCriticalOperatorBeans() { KafkaUserAndAclDependentResource.class, KafkaSecretDependentResource.class, CertificateSecretDependentResource.class, + CertificateMetricsService.class, KafkaSecretDiscriminator.class, CertificateSecretDiscriminator.class, KeyStoreService.class, @@ -70,5 +74,10 @@ KubernetesClient kubernetesClient() { AivenService aivenService() { return mock(AivenService.class); } + + @Bean + MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } } } diff --git a/src/test/groovy/no/fintlabs/keystore/KeyStoreServiceSpec.groovy b/src/test/groovy/no/fintlabs/keystore/KeyStoreServiceSpec.groovy index 74046e1..668b120 100644 --- a/src/test/groovy/no/fintlabs/keystore/KeyStoreServiceSpec.groovy +++ b/src/test/groovy/no/fintlabs/keystore/KeyStoreServiceSpec.groovy @@ -4,6 +4,7 @@ import org.apache.commons.lang3.RandomStringUtils import spock.lang.Specification import java.security.UnrecoverableKeyException +import java.time.Duration import static no.fintlabs.keystore.TestUtil.isBase64 @@ -168,4 +169,32 @@ class KeyStoreServiceSpec extends Specification { store base64 } + + def "Inspect keystore returns notAfter and flags expired certificates for rotation"() { + given: + def service = new KeyStoreService() + def password = RandomStringUtils.randomAscii(32).toCharArray() + def keyStore = service.storeToBase64(service.createKeyStore(cert, key, ca, password), password) + + when: + def inspection = service.inspectKeyStore(keyStore, new String(password)) + + then: + inspection.readable() + inspection.notAfter() != null + inspection.needsRotation(Duration.ZERO) + } + + def "Inspect keystore returns unreadable when payload is invalid"() { + given: + def service = new KeyStoreService() + + when: + def inspection = service.inspectKeyStore("not-a-keystore", "password") + + then: + !inspection.readable() + inspection.notAfter() == null + inspection.needsRotation(Duration.ofDays(30)) + } } diff --git a/src/test/groovy/no/fintlabs/operator/CertificateSecretDependentResourceSpec.groovy b/src/test/groovy/no/fintlabs/operator/CertificateSecretDependentResourceSpec.groovy index 13274a9..0d37a7b 100644 --- a/src/test/groovy/no/fintlabs/operator/CertificateSecretDependentResourceSpec.groovy +++ b/src/test/groovy/no/fintlabs/operator/CertificateSecretDependentResourceSpec.groovy @@ -4,6 +4,7 @@ import io.fabric8.kubernetes.api.model.Secret import io.fabric8.kubernetes.api.model.SecretBuilder import io.fabric8.kubernetes.client.KubernetesClient import io.javaoperatorsdk.operator.api.reconciler.Context +import io.micrometer.core.instrument.simple.SimpleMeterRegistry import no.fintlabs.aiven.AivenProperties import no.fintlabs.aiven.AivenService import no.fintlabs.aiven.AivenServiceUser @@ -11,9 +12,14 @@ import no.fintlabs.keystore.KeyStoreService import no.fintlabs.keystore.TrustStoreService import spock.lang.Specification +import java.time.Duration +import java.time.Instant + class CertificateSecretDependentResourceSpec extends Specification { private AivenService aivenService + private AivenProperties aivenProperties + private CertificateMetricsService certificateMetricsService private KeyStoreService keyStoreService private TrustStoreService trustStoreService private CertificateSecretDependentResource resource @@ -21,6 +27,8 @@ class CertificateSecretDependentResourceSpec extends Specification { def setup() { aivenService = Mock() + aivenProperties = new AivenProperties(certificateRotationThreshold: Duration.ofDays(30)) + certificateMetricsService = new CertificateMetricsService(new SimpleMeterRegistry()) keyStoreService = Mock() trustStoreService = Mock() def workflow = new KafkaUserAndAclWorkflow() @@ -42,6 +50,8 @@ class CertificateSecretDependentResourceSpec extends Specification { kafkaSecretDependentResource, kafkaUserAndAclDependentResource, aivenService, + aivenProperties, + certificateMetricsService, keyStoreService, new CertificateSecretDiscriminator(), trustStoreService @@ -68,6 +78,8 @@ class CertificateSecretDependentResourceSpec extends Specification { then: 2 * aivenService.getCa() >> "ca-cert" + 1 * keyStoreService.inspectKeyStore("generated-key-store", "key-pass") >> + KeyStoreService.KeyStoreInspection.valid(Instant.parse("2028-01-01T00:00:00Z")) 1 * keyStoreService.createKeyStoreAndGetAsBase64("client-cert", "client-key", "ca-cert", { new String(it) == "key-pass" }) >> "generated-key-store" @@ -78,6 +90,8 @@ class CertificateSecretDependentResourceSpec extends Specification { secret.metadata.labels["app.kubernetes.io/managed-by"] == "kafkarator" secret.data["client.keystore.p12"] == "generated-key-store" secret.data["client.truststore.jks"] == "generated-trust-store" + secret.metadata.annotations[CertificateSecretDependentResource.CERTIFICATE_NOT_AFTER_ANNOTATION] == "2028-01-01T00:00:00Z" + secret.metadata.annotations[CertificateSecretDependentResource.LAST_ROTATED_AT_ANNOTATION] != null } def "desired reuses existing stores when verification succeeds"() { @@ -106,7 +120,8 @@ class CertificateSecretDependentResourceSpec extends Specification { def secret = resource.desired(primary, context) then: - 1 * keyStoreService.verifyKeyStore("existing-key-store", "key-pass") >> "existing-key-store" + 2 * keyStoreService.inspectKeyStore("existing-key-store", "key-pass") >> + KeyStoreService.KeyStoreInspection.valid(Instant.now().plus(Duration.ofDays(90))) 1 * trustStoreService.verifyTrustStore("existing-trust-store", "trust-pass") >> "existing-trust-store" 0 * keyStoreService.createKeyStoreAndGetAsBase64(_, _, _, _) 0 * trustStoreService.createTrustStoreAndGetAsBase64(_, _) @@ -114,7 +129,7 @@ class CertificateSecretDependentResourceSpec extends Specification { secret.data["client.truststore.jks"] == "existing-trust-store" } - def "desired regenerates stores when existing data fails verification"() { + def "desired regenerates stores when existing key store is unreadable"() { given: def primary = primaryResource() def kafkaUserAndAcl = KafkaUserAndAcl.builder() @@ -140,8 +155,10 @@ class CertificateSecretDependentResourceSpec extends Specification { def secret = resource.desired(primary, context) then: - 1 * keyStoreService.verifyKeyStore("broken-key-store", "key-pass") >> null - 1 * trustStoreService.verifyTrustStore("broken-trust-store", "trust-pass") >> null + 1 * keyStoreService.inspectKeyStore("broken-key-store", "key-pass") >> + KeyStoreService.KeyStoreInspection.invalid("bad key store") + 1 * keyStoreService.inspectKeyStore("regenerated-key-store", "key-pass") >> + KeyStoreService.KeyStoreInspection.valid(Instant.parse("2028-02-01T00:00:00Z")) 2 * aivenService.getCa() >> "ca-cert" 1 * keyStoreService.createKeyStoreAndGetAsBase64("client-cert", "client-key", "ca-cert", { new String(it) == "key-pass" @@ -153,6 +170,49 @@ class CertificateSecretDependentResourceSpec extends Specification { secret.data["client.truststore.jks"] == "regenerated-trust-store" } + def "desired regenerates stores when existing certificate expires within threshold"() { + given: + def primary = primaryResource() + def kafkaUserAndAcl = KafkaUserAndAcl.builder() + .user(AivenServiceUser.builder() + .username("resolved-user") + .accessCert("client-cert") + .accessKey("client-key") + .build()) + .build() + def kafkaSecret = kafkaSecret(primary, "key-pass", "trust-pass") + def existingSecret = new SecretBuilder() + .withNewMetadata() + .withName("sample-user-kafka-certificates") + .withNamespace("default") + .endMetadata() + .addToData("client.keystore.p12", "soon-expiring-key-store") + .addToData("client.truststore.jks", "existing-trust-store") + .build() + context.getSecondaryResource(KafkaUserAndAcl.class) >> Optional.of(kafkaUserAndAcl) + context.getSecondaryResources(Secret.class) >> ([kafkaSecret, existingSecret] as Set) + + when: + def secret = resource.desired(primary, context) + + then: + 1 * keyStoreService.inspectKeyStore("soon-expiring-key-store", "key-pass") >> + KeyStoreService.KeyStoreInspection.valid(Instant.now().plus(Duration.ofDays(7))) + 1 * keyStoreService.inspectKeyStore("rotated-key-store", "key-pass") >> + KeyStoreService.KeyStoreInspection.valid(Instant.parse("2028-03-01T00:00:00Z")) + 2 * aivenService.getCa() >> "ca-cert" + 1 * keyStoreService.createKeyStoreAndGetAsBase64("client-cert", "client-key", "ca-cert", { + new String(it) == "key-pass" + }) >> "rotated-key-store" + 1 * trustStoreService.createTrustStoreAndGetAsBase64("ca-cert", { + new String(it) == "trust-pass" + }) >> "rotated-trust-store" + 0 * trustStoreService.verifyTrustStore(_, _) + secret.data["client.keystore.p12"] == "rotated-key-store" + secret.data["client.truststore.jks"] == "rotated-trust-store" + secret.metadata.annotations[CertificateSecretDependentResource.CERTIFICATE_NOT_AFTER_ANNOTATION] == "2028-03-01T00:00:00Z" + } + private static KafkaUserAndAclCrd primaryResource() { def primary = new KafkaUserAndAclCrd() primary.metadata.name = "sample-user"