diff --git a/src/main/java/com/classes/config/kafka/KafkaConsumerConfig.java b/src/main/java/com/classes/config/kafka/KafkaConsumerConfig.java index ffc1d0d..4f5914c 100644 --- a/src/main/java/com/classes/config/kafka/KafkaConsumerConfig.java +++ b/src/main/java/com/classes/config/kafka/KafkaConsumerConfig.java @@ -17,6 +17,7 @@ import java.util.Map; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; @@ -57,6 +58,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFa ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setCommonErrorHandler(errorHandler); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; } diff --git a/src/main/java/com/classes/entities/ClassEntity.java b/src/main/java/com/classes/entities/ClassEntity.java index e1edbcd..a72e817 100644 --- a/src/main/java/com/classes/entities/ClassEntity.java +++ b/src/main/java/com/classes/entities/ClassEntity.java @@ -5,6 +5,7 @@ import jakarta.persistence.*; import lombok.*; +import java.time.LocalDate; import java.time.LocalTime; import java.util.ArrayList; import java.util.List; diff --git a/src/main/java/com/classes/entities/TrainerEntity.java b/src/main/java/com/classes/entities/TrainerEntity.java index 1c455f8..3e47d02 100644 --- a/src/main/java/com/classes/entities/TrainerEntity.java +++ b/src/main/java/com/classes/entities/TrainerEntity.java @@ -24,7 +24,8 @@ @EntityListeners(AuditListener.class) public class TrainerEntity { @Id - private UUID userid; +// private UUID userid; + private UUID id; private String firstName; private String lastName; private String dni; diff --git a/src/main/java/com/classes/events/TrainerCreatedEvent.java b/src/main/java/com/classes/events/TrainerCreatedEvent.java index 011da9f..3394635 100644 --- a/src/main/java/com/classes/events/TrainerCreatedEvent.java +++ b/src/main/java/com/classes/events/TrainerCreatedEvent.java @@ -9,7 +9,7 @@ @Builder public record TrainerCreatedEvent( - UUID userId, + UUID id, String email, String firstName, String lastName, @@ -26,7 +26,7 @@ public record TrainerCreatedEvent( public static TrainerCreatedEvent create(UUID userId, String email, String firstName, String lastName, String dni, String phone) { return TrainerCreatedEvent.builder() - .userId(userId) + .id(userId) .email(email) .firstName(firstName) .lastName(lastName) diff --git a/src/main/java/com/classes/listeners/TrainerEventHandler.java b/src/main/java/com/classes/listeners/TrainerEventHandler.java index 2677808..ed62fcc 100644 --- a/src/main/java/com/classes/listeners/TrainerEventHandler.java +++ b/src/main/java/com/classes/listeners/TrainerEventHandler.java @@ -20,11 +20,7 @@ public class TrainerEventHandler { private final TrainerRepository trainerRepository; - @KafkaListener( - topics = "trainer-events", - groupId = "msvc-classes-group", - containerFactory = "kafkaListenerContainerFactory" - ) + @KafkaListener(topics = "trainer-created-event-topic") @Transactional public void consumeTrainerCreatedEvent( ConsumerRecord record, @@ -32,17 +28,17 @@ public void consumeTrainerCreatedEvent( TrainerCreatedEvent event = record.value(); log.info("Recibido evento TRAINER_CREATED: userId={}, eventId={}", - event.userId(), event.eventId()); + event.id(), event.eventId()); try { - if (trainerRepository.existsById(event.userId())) { - log.warn("Trainer con userId={} ya existe. Evento duplicado ignorado.", event.userId()); + if (trainerRepository.existsById(event.id())) { + log.warn("Trainer con userId={} ya existe. Evento duplicado ignorado.", event.id()); acknowledgment.acknowledge(); return; } TrainerEntity trainer = TrainerEntity.builder() - .id(event.userId()) + .id(event.id()) .firstName(event.firstName()) .lastName(event.lastName()) .dni(event.dni()) @@ -56,14 +52,12 @@ public void consumeTrainerCreatedEvent( trainerRepository.save(trainer); log.info("Trainer creado exitosamente con ID: {}", trainer.getId()); - // Confirmar procesamiento acknowledgment.acknowledge(); } catch ( Exception e) { - log.error("Error procesando evento TRAINER_CREATED para userId={}", event.userId(), e); - // No hacer acknowledge para que Kafka reintente - // O enviar a DLQ (Dead Letter Queue) + log.error("Error procesando evento TRAINER_CREATED para userId={}", event.id(), e); + throw e; } }