A comprehensive, production-ready example of integrating Spring Boot 3.x with Confluent Cloud. This repository demonstrates how to manage multiple Kafka topics using different serialization formats (standard JSON/String and Avro), secure connections using SASL_SSL, and keep the Confluent and Spring Boot dependencies compatible with each other.
This project serves as a robust template for developing microservices that interact with Confluent Cloud. It tackles common challenges developers face when working with a multi-protocol (JSON/Avro) environment, providing clear, tested configurations for producers, consumers, and even Kafka Streams.
Key Features:
- Dual Serialization: Configuration for both standard `String`/`Integer` serialization and `Avro` serialization via the Confluent Schema Registry.
- Secure Connectivity: Uses SASL_SSL (PLAIN mechanism) to authenticate with Confluent Cloud Kafka brokers, and a separate API key and secret to authenticate with the Schema Registry.
- Modern Spring Boot: Built with Spring Boot 3.5.x and Java 17+, leveraging Jakarta EE APIs.
- Reactive Producers: Uses Project Reactor (`Flux`) for non-blocking message generation at startup.
- Lombok & DI: Utilizes `@RequiredArgsConstructor` for clean dependency injection and `@Qualifier` for managing multiple Kafka template beans.
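
As a rough illustration of the dual-serialization setup, the sketch below builds the two producer property maps that separate Kafka template beans could be created from. It is not the repository's `KafkaConfig.java`: the class name `SerializerPropsSketch` and both method names are hypothetical, and the choice of Integer keys with String values for the plain topic is an assumption, since the README only says "String/Integer". The property keys and serializer class names are the standard Kafka and Confluent ones.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the repository.
class SerializerPropsSketch {

    // Serializer settings for the plain topic.
    // Assumption: Integer keys and String values.
    static Map<String, Object> stringIntegerProducerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Serializer settings for the Avro topic. The Avro serializer needs the
    // Schema Registry URL so it can register and look up schemas.
    static Map<String, Object> avroProducerProps(String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }
}
```

Each map would typically be passed to its own producer factory, with `@Qualifier` used to select the matching template bean at the injection point.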
To run this application, you will need:
- Java 17+ installed.
- Gradle (managed by the wrapper `./gradlew`).
- A Confluent Cloud Account with a Kafka Cluster and a Schema Registry instance running.
- API Keys for both your Kafka Cluster and your Schema Registry.
All configuration is managed within the KafkaConfig.java class using hardcoded values for clarity in this example.
- Clone the repository:

  ```bash
  git clone github.com
  cd Confluent-Kafka-Springboot
  ```

- **Update `src/main/java/com/hibersoft/springkafka/KafkaConfig.java`:** Replace the placeholder values with your actual Confluent Cloud credentials:

  ```java
  // --- HARDCODED CONFIGURATION VALUES ---
  private final String BOOTSTRAP_SERVERS = "pkc-xxxx.us-east-1.gcp.confluent.cloud:9092";
  private final String API_KEY = "YOUR_KAFKA_CLUSTER_API_KEY"; // Cluster Key
  private final String API_SECRET = "YOUR_KAFKA_CLUSTER_API_SECRET"; // Cluster Secret
  // ...
  private final String SCHEMA_REGISTRY_URL = "https://psrc-xxxx.us-east-1.confluent.cloud";
  private final String SCHEMA_REGISTRY_API_KEY = "YOUR_SR_API_KEY"; // Schema Registry Key
  private final String SCHEMA_REGISTRY_API_SECRET = "YOUR_SR_API_SECRET"; // Schema Registry Secret
  // ...
  ```
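
To show where the two credential pairs end up, here is a minimal sketch of the client properties they usually map to. It assumes the standard Kafka SASL settings and the Confluent Schema Registry client's basic-auth settings. The class `ConfluentSecurityPropsSketch` and its method names are hypothetical, and the repository's actual `KafkaConfig.java` may wire these values differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the repository.
class ConfluentSecurityPropsSketch {

    // Broker authentication: SASL_SSL with the PLAIN mechanism,
    // using the Kafka cluster API key and secret.
    static Map<String, Object> kafkaSecurityProps(String apiKey, String apiSecret) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                apiKey, apiSecret));
        return props;
    }

    // Schema Registry authentication: HTTP basic auth with the
    // Schema Registry API key and secret (a different pair from the cluster's).
    static Map<String, Object> schemaRegistryProps(String url, String srKey, String srSecret) {
        Map<String, Object> props = new HashMap<>();
        props.put("schema.registry.url", url);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", srKey + ":" + srSecret);
        return props;
    }
}
```

Keeping the two methods separate makes it harder to pass the cluster key to the Schema Registry by accident, which is one common cause of 401 responses.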
Use the Gradle bootRun task to compile, build, and run the Spring Boot application:
```bash
./gradlew bootRun
```
Expected Behavior:
- Startup: The application will quiet down verbose Spring logs (configured via `application.properties`) and print application-specific logs.
- Production: The `Producer` service uses reactive streams (`Flux.interval`) to asynchronously send messages every 10 seconds to two topics:
  - `hobbit` (String/Integer serialization)
  - `hobbit-avro` (Avro serialization)
- Consumption: The `Consumer` service runs listeners configured to handle the specific schemas for both topics correctly.
- Logs: You will see continuous logs confirming messages being produced and consumed.
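
The periodic production described above can be sketched without any dependencies. The repository uses Reactor's `Flux.interval`; the example below swaps in a plain `ScheduledExecutorService` so that it runs with the JDK alone. The class `IntervalProducerSketch`, the `produce` method, and the `sender` callback are hypothetical stand-ins for the real `Producer` service and its Kafka templates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the Producer service, using the JDK scheduler
// instead of Flux.interval.
class IntervalProducerSketch {

    // Emits an increasing tick (0, 1, 2, ...) to both topics once per period,
    // then stops after maxTicks. Like Flux.interval(period), the first tick
    // fires after one full period.
    static void produce(long periodMillis, int maxTicks, BiConsumer<String, Long> sender) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicLong counter = new AtomicLong();
        CountDownLatch done = new CountDownLatch(maxTicks);

        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() == 0) {
                return; // limit reached; ignore any tick that fires before shutdown
            }
            long tick = counter.getAndIncrement();
            sender.accept("hobbit", tick);       // String/Integer topic
            sender.accept("hobbit-avro", tick);  // Avro topic
            done.countDown();
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(); // block the caller until all ticks have been sent
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

For a 10-second cadence, the call would look like `IntervalProducerSketch.produce(10_000, 5, (topic, tick) -> System.out.println(topic + " <- " + tick))`. Unlike this sketch, a reactive pipeline does not block the calling thread while it waits.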
| File/Class | Purpose | Key Annotations/Concepts |
|---|---|---|
| `KafkaConfig.java` | Centralized bean definitions for all Kafka components. | `@Configuration`, `@Bean`, `@Qualifier` |
| `Producer.java` | Handles outbound messages. Uses reactive streams for continuous production. | `@Service`, `@RequiredArgsConstructor`, `@EventListener` |
| `Consumer.java` | Handles inbound messages from topics. | `@Service`, `@KafkaListener`, `@Qualifier` |
| `build.gradle` | Manages dependencies, including crucial Confluent 7.5.0 compatibility. | `io.confluent:kafka-avro-serializer:7.5.0` |
| `application.properties` | Manages core Spring behavior and logging levels (`logging.level.root=WARN`, `spring.main.lazy-initialization=false`). | Properties management |
This project was built iteratively, resolving several complex issues encountered in the wild:
- Dependency Hell: A major challenge was aligning older Confluent 6.x libraries (using `javax` APIs) with modern Spring Boot 3.x (using `jakarta` APIs). The fix was upgrading Confluent libraries to version 7.5.0+.
- Authentication 401s: We resolved connection errors by ensuring dedicated API keys were used for the Kafka Cluster AND the Schema Registry separately.
- Debugging Silent Errors: We found that silent failures in configuration required forcing eager bean initialization (`spring.main.lazy-initialization=false`) and reducing overall logging verbosity (`logging.level.root=WARN`).
Feel free to open issues, submit PRs, or fork this repository for your own learning purposes.