diff --git a/pom.xml b/pom.xml
index a747dc8..20b332e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,15 +97,7 @@
spring-boot-starter-aop
-
+
org.springdoc
springdoc-openapi-starter-webmvc-ui
@@ -153,6 +145,15 @@
org.springframework.security
spring-security-oauth2-jose
+
+ org.springframework.kafka
+ spring-kafka
+
+
+ org.springframework.kafka
+ spring-kafka-test
+ test
+
diff --git a/src/main/java/com/classes/config/Audit.java b/src/main/java/com/classes/config/audit/Audit.java
similarity index 82%
rename from src/main/java/com/classes/config/Audit.java
rename to src/main/java/com/classes/config/audit/Audit.java
index 1f93dec..cb2093a 100644
--- a/src/main/java/com/classes/config/Audit.java
+++ b/src/main/java/com/classes/config/audit/Audit.java
@@ -1,16 +1,13 @@
-package com.classes.config;
+package com.classes.config.audit;
import jakarta.persistence.Column;
import jakarta.persistence.Embeddable;
-import jakarta.persistence.PrePersist;
-import jakarta.persistence.PreUpdate;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
-import java.time.LocalDateTime;
@Embeddable
@Data
diff --git a/src/main/java/com/classes/config/AuditListener.java b/src/main/java/com/classes/config/audit/AuditListener.java
similarity index 98%
rename from src/main/java/com/classes/config/AuditListener.java
rename to src/main/java/com/classes/config/audit/AuditListener.java
index 75426eb..84ba125 100644
--- a/src/main/java/com/classes/config/AuditListener.java
+++ b/src/main/java/com/classes/config/audit/AuditListener.java
@@ -1,4 +1,4 @@
-package com.classes.config;
+package com.classes.config.audit;
import jakarta.persistence.PrePersist;
import jakarta.persistence.PreUpdate;
diff --git a/src/main/java/com/classes/config/kafka/KafkaConsumerConfig.java b/src/main/java/com/classes/config/kafka/KafkaConsumerConfig.java
new file mode 100644
index 0000000..ffc1d0d
--- /dev/null
+++ b/src/main/java/com/classes/config/kafka/KafkaConsumerConfig.java
@@ -0,0 +1,76 @@
+package com.classes.config.kafka;
+
+import com.classes.exceptions.NotRetryableException;
+import com.classes.exceptions.RetryableException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.*;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.backoff.FixedBackOff;
+
+@EnableKafka
+@Configuration
+@Slf4j
+public class KafkaConsumerConfig {
+ @Value("${spring.kafka.consumer.bootstrap-servers}")
+ private String bootstrapServers;
+ @Value("${spring.kafka.consumer.group-id}")
+ private String groupId;
+
+ @Bean
+ public ConsumerFactory consumerFactory() {
+ Map config = new HashMap<>();
+ config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
+ config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
+ config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
+ config.put(JsonDeserializer.TYPE_MAPPINGS, "TrainerCreatedEvent:com.classes.events.TrainerCreatedEvent");
+ config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ return new DefaultKafkaConsumerFactory<>(config);
+ }
+
+ @Bean
+ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
+ ConsumerFactory consumerFactory, KafkaTemplate kafkaTemplate) {
+
+ DefaultErrorHandler errorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate),
+ new FixedBackOff(500, 3));
+ errorHandler.addNotRetryableExceptions(NotRetryableException.class);
+ errorHandler.addRetryableExceptions(RetryableException.class);
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory);
+ factory.setCommonErrorHandler(errorHandler);
+ return factory;
+ }
+
+ @Bean
+ KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) {
+ return new KafkaTemplate<>(producerFactory);
+ }
+
+ @Bean
+ ProducerFactory producerFactory() {
+ Map config = new HashMap<>();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ return new DefaultKafkaProducerFactory<>(config);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/classes/entities/ClassEntity.java b/src/main/java/com/classes/entities/ClassEntity.java
index bdca8b6..e1edbcd 100644
--- a/src/main/java/com/classes/entities/ClassEntity.java
+++ b/src/main/java/com/classes/entities/ClassEntity.java
@@ -1,11 +1,10 @@
package com.classes.entities;
-import com.classes.config.Audit;
-import com.classes.config.AuditListener;
+import com.classes.config.audit.Audit;
+import com.classes.config.audit.AuditListener;
import jakarta.persistence.*;
import lombok.*;
-import java.time.LocalDate;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
diff --git a/src/main/java/com/classes/entities/LocationEntity.java b/src/main/java/com/classes/entities/LocationEntity.java
index 0d56a84..1ffc0e0 100644
--- a/src/main/java/com/classes/entities/LocationEntity.java
+++ b/src/main/java/com/classes/entities/LocationEntity.java
@@ -1,7 +1,7 @@
package com.classes.entities;
-import com.classes.config.Audit;
-import com.classes.config.AuditListener;
+import com.classes.config.audit.Audit;
+import com.classes.config.audit.AuditListener;
import jakarta.persistence.*;
import lombok.*;
diff --git a/src/main/java/com/classes/entities/TrainerEntity.java b/src/main/java/com/classes/entities/TrainerEntity.java
index 60e8c2d..1c455f8 100644
--- a/src/main/java/com/classes/entities/TrainerEntity.java
+++ b/src/main/java/com/classes/entities/TrainerEntity.java
@@ -4,8 +4,8 @@
import com.classes.enums.DayAvailability;
import com.classes.enums.Gender;
import com.classes.enums.TrainerStatus;
-import com.classes.config.Audit;
-import com.classes.config.AuditListener;
+import com.classes.config.audit.Audit;
+import com.classes.config.audit.AuditListener;
import jakarta.persistence.*;
import lombok.*;
@@ -24,10 +24,7 @@
@EntityListeners(AuditListener.class)
public class TrainerEntity {
@Id
- @GeneratedValue(strategy = GenerationType.UUID)
- private UUID id;
private UUID userid;
-
private String firstName;
private String lastName;
private String dni;
diff --git a/src/main/java/com/classes/events/TrainerCreatedEvent.java b/src/main/java/com/classes/events/TrainerCreatedEvent.java
new file mode 100644
index 0000000..011da9f
--- /dev/null
+++ b/src/main/java/com/classes/events/TrainerCreatedEvent.java
@@ -0,0 +1,40 @@
+package com.classes.events;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.*;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.UUID;
+
+@Builder
+public record TrainerCreatedEvent(
+ UUID userId,
+ String email,
+ String firstName,
+ String lastName,
+ String dni,
+ String phone,
+
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "UTC")
+ Instant occurredAt,
+
+ String eventId,
+ String eventType
+) implements Serializable {
+
+ public static TrainerCreatedEvent create(UUID userId, String email, String firstName,
+ String lastName, String dni, String phone) {
+ return TrainerCreatedEvent.builder()
+ .userId(userId)
+ .email(email)
+ .firstName(firstName)
+ .lastName(lastName)
+ .dni(dni)
+ .phone(phone)
+ .occurredAt(Instant.now())
+ .eventId(UUID.randomUUID().toString())
+ .eventType("TRAINER_CREATED")
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/classes/exceptions/NotRetryableException.java b/src/main/java/com/classes/exceptions/NotRetryableException.java
new file mode 100644
index 0000000..33a7bdf
--- /dev/null
+++ b/src/main/java/com/classes/exceptions/NotRetryableException.java
@@ -0,0 +1,12 @@
+package com.classes.exceptions;
+
+
+public class NotRetryableException extends RuntimeException {
+ public NotRetryableException(String message) {
+ super(message);
+ }
+
+ public NotRetryableException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/classes/exceptions/RetryableException.java b/src/main/java/com/classes/exceptions/RetryableException.java
new file mode 100644
index 0000000..65f8b6f
--- /dev/null
+++ b/src/main/java/com/classes/exceptions/RetryableException.java
@@ -0,0 +1,7 @@
+package com.classes.exceptions;
+
+public class RetryableException extends RuntimeException {
+ public RetryableException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/classes/listeners/TrainerEventHandler.java b/src/main/java/com/classes/listeners/TrainerEventHandler.java
new file mode 100644
index 0000000..2677808
--- /dev/null
+++ b/src/main/java/com/classes/listeners/TrainerEventHandler.java
@@ -0,0 +1,70 @@
+package com.classes.listeners;
+
+import com.classes.entities.TrainerEntity;
+import com.classes.enums.TrainerStatus;
+import com.classes.events.TrainerCreatedEvent;
+import com.classes.repositories.TrainerRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.LocalDate;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class TrainerEventHandler {
+ private final TrainerRepository trainerRepository;
+
+ @KafkaListener(
+ topics = "trainer-events",
+ groupId = "msvc-classes-group",
+ containerFactory = "kafkaListenerContainerFactory"
+ )
+ @Transactional
+ public void consumeTrainerCreatedEvent(
+ ConsumerRecord record,
+ Acknowledgment acknowledgment) {
+
+ TrainerCreatedEvent event = record.value();
+ log.info("Recibido evento TRAINER_CREATED: userId={}, eventId={}",
+ event.userId(), event.eventId());
+
+ try {
+ if (trainerRepository.existsById(event.userId())) {
+ log.warn("Trainer con userId={} ya existe. Evento duplicado ignorado.", event.userId());
+ acknowledgment.acknowledge();
+ return;
+ }
+
+ TrainerEntity trainer = TrainerEntity.builder()
+ .id(event.userId())
+ .firstName(event.firstName())
+ .lastName(event.lastName())
+ .dni(event.dni())
+ .email(event.email())
+ .phone(event.phone())
+ .hireDate(LocalDate.now())
+ .status(TrainerStatus.ACTIVO)
+ .yearsOfExperience(0)
+ .build();
+
+ trainerRepository.save(trainer);
+ log.info("Trainer creado exitosamente con ID: {}", trainer.getId());
+
+ // Confirmar procesamiento
+ acknowledgment.acknowledge();
+
+ } catch (
+ Exception e) {
+ log.error("Error procesando evento TRAINER_CREATED para userId={}", event.userId(), e);
+ // No hacer acknowledge para que Kafka reintente
+ // O enviar a DLQ (Dead Letter Queue)
+ throw e;
+ }
+ }
+}
diff --git a/src/main/java/com/classes/repositories/TrainerRepository.java b/src/main/java/com/classes/repositories/TrainerRepository.java
index 0da0629..eb584a7 100644
--- a/src/main/java/com/classes/repositories/TrainerRepository.java
+++ b/src/main/java/com/classes/repositories/TrainerRepository.java
@@ -13,12 +13,10 @@
@Repository
public interface TrainerRepository extends JpaRepository {
-
-
+ Optional findByEmail(String email);
+ Optional findByDni(String dni);
+ boolean existsByEmail(String email);
Page findByFirstNameContainingIgnoreCaseOrLastNameContainingIgnoreCase(
String firstName, String lastName, Pageable pageable);
-
-
Page findByStatus(TrainerStatus status, Pageable pageable);
-
}