Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
Expand Down Expand Up @@ -57,6 +58,7 @@ ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFa
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/classes/entities/ClassEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import jakarta.persistence.*;
import lombok.*;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/classes/entities/TrainerEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
@EntityListeners(AuditListener.class)
public class TrainerEntity {
@Id
private UUID userid;
// private UUID userid;
private UUID id;
private String firstName;
private String lastName;
private String dni;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/classes/events/TrainerCreatedEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

@Builder
public record TrainerCreatedEvent(
UUID userId,
UUID id,
String email,
String firstName,
String lastName,
Expand All @@ -26,7 +26,7 @@ public record TrainerCreatedEvent(
public static TrainerCreatedEvent create(UUID userId, String email, String firstName,
String lastName, String dni, String phone) {
return TrainerCreatedEvent.builder()
.userId(userId)
.id(userId)
.email(email)
.firstName(firstName)
.lastName(lastName)
Expand Down
20 changes: 7 additions & 13 deletions src/main/java/com/classes/listeners/TrainerEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,25 @@
public class TrainerEventHandler {
private final TrainerRepository trainerRepository;

@KafkaListener(
topics = "trainer-events",
groupId = "msvc-classes-group",
containerFactory = "kafkaListenerContainerFactory"
)
@KafkaListener(topics = "trainer-created-event-topic")
@Transactional
public void consumeTrainerCreatedEvent(
ConsumerRecord<String, TrainerCreatedEvent> record,
Acknowledgment acknowledgment) {

TrainerCreatedEvent event = record.value();
log.info("Recibido evento TRAINER_CREATED: userId={}, eventId={}",
event.userId(), event.eventId());
event.id(), event.eventId());

try {
if (trainerRepository.existsById(event.userId())) {
log.warn("Trainer con userId={} ya existe. Evento duplicado ignorado.", event.userId());
if (trainerRepository.existsById(event.id())) {
log.warn("Trainer con userId={} ya existe. Evento duplicado ignorado.", event.id());
acknowledgment.acknowledge();
return;
}

TrainerEntity trainer = TrainerEntity.builder()
.id(event.userId())
.id(event.id())
.firstName(event.firstName())
.lastName(event.lastName())
.dni(event.dni())
Expand All @@ -56,14 +52,12 @@ public void consumeTrainerCreatedEvent(
trainerRepository.save(trainer);
log.info("Trainer creado exitosamente con ID: {}", trainer.getId());

// Confirmar procesamiento
acknowledgment.acknowledge();

} catch (
Exception e) {
log.error("Error procesando evento TRAINER_CREATED para userId={}", event.userId(), e);
// No hacer acknowledge para que Kafka reintente
// O enviar a DLQ (Dead Letter Queue)
log.error("Error procesando evento TRAINER_CREATED para userId={}", event.id(), e);

throw e;
}
}
Expand Down