Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,7 @@
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

<!-- <dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency> -->

<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
Expand Down Expand Up @@ -153,6 +145,15 @@
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-oauth2-jose</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<!-- Importar BOM de Spring Cloud Azure y Spring Cloud -->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package com.classes.config;
package com.classes.config.audit;

import jakarta.persistence.Column;
import jakarta.persistence.Embeddable;
import jakarta.persistence.PrePersist;
import jakarta.persistence.PreUpdate;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.Instant;
import java.time.LocalDateTime;

@Embeddable
@Data
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.classes.config;
package com.classes.config.audit;

import jakarta.persistence.PrePersist;
import jakarta.persistence.PreUpdate;
Expand Down
76 changes: 76 additions & 0 deletions src/main/java/com/classes/config/kafka/KafkaConsumerConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.classes.config.kafka;

import com.classes.exceptions.NotRetryableException;
import com.classes.exceptions.RetryableException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
config.put(JsonDeserializer.TYPE_MAPPINGS, "TrainerCreatedEvent:com.classes.events.TrainerCreatedEvent");
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory, KafkaTemplate<String, Object> kafkaTemplate) {

DefaultErrorHandler errorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate),
new FixedBackOff(500, 3));
errorHandler.addNotRetryableExceptions(NotRetryableException.class);
errorHandler.addRetryableExceptions(RetryableException.class);
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
return factory;
}

@Bean
KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}

@Bean
ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
}
5 changes: 2 additions & 3 deletions src/main/java/com/classes/entities/ClassEntity.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.classes.entities;

import com.classes.config.Audit;
import com.classes.config.AuditListener;
import com.classes.config.audit.Audit;
import com.classes.config.audit.AuditListener;
import jakarta.persistence.*;
import lombok.*;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/classes/entities/LocationEntity.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.classes.entities;

import com.classes.config.Audit;
import com.classes.config.AuditListener;
import com.classes.config.audit.Audit;
import com.classes.config.audit.AuditListener;
import jakarta.persistence.*;
import lombok.*;

Expand Down
7 changes: 2 additions & 5 deletions src/main/java/com/classes/entities/TrainerEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import com.classes.enums.DayAvailability;
import com.classes.enums.Gender;
import com.classes.enums.TrainerStatus;
import com.classes.config.Audit;
import com.classes.config.AuditListener;
import com.classes.config.audit.Audit;
import com.classes.config.audit.AuditListener;
import jakarta.persistence.*;
import lombok.*;

Expand All @@ -24,10 +24,7 @@
@EntityListeners(AuditListener.class)
public class TrainerEntity {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
private UUID userid;

private String firstName;
private String lastName;
private String dni;
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/com/classes/events/TrainerCreatedEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.classes.events;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.*;

import java.io.Serializable;
import java.time.Instant;
import java.util.UUID;

@Builder
public record TrainerCreatedEvent(
UUID userId,
String email,
String firstName,
String lastName,
String dni,
String phone,

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "UTC")
Instant occurredAt,

String eventId,
String eventType
) implements Serializable {

public static TrainerCreatedEvent create(UUID userId, String email, String firstName,
String lastName, String dni, String phone) {
return TrainerCreatedEvent.builder()
.userId(userId)
.email(email)
.firstName(firstName)
.lastName(lastName)
.dni(dni)
.phone(phone)
.occurredAt(Instant.now())
.eventId(UUID.randomUUID().toString())
.eventType("TRAINER_CREATED")
.build();
}
}
12 changes: 12 additions & 0 deletions src/main/java/com/classes/exceptions/NotRetryableException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.classes.exceptions;


public class NotRetryableException extends RuntimeException {
public NotRetryableException(String message) {
super(message);
}

public NotRetryableException(Throwable cause) {
super(cause);
}
}
7 changes: 7 additions & 0 deletions src/main/java/com/classes/exceptions/RetryableException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.classes.exceptions;

public class RetryableException extends RuntimeException {
public RetryableException(String message) {
super(message);
}
}
70 changes: 70 additions & 0 deletions src/main/java/com/classes/listeners/TrainerEventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.classes.listeners;

import com.classes.entities.TrainerEntity;
import com.classes.enums.TrainerStatus;
import com.classes.events.TrainerCreatedEvent;
import com.classes.repositories.TrainerRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDate;

@Slf4j
@Component
@RequiredArgsConstructor
public class TrainerEventHandler {
private final TrainerRepository trainerRepository;

@KafkaListener(
topics = "trainer-events",
groupId = "msvc-classes-group",
containerFactory = "kafkaListenerContainerFactory"
)
@Transactional
public void consumeTrainerCreatedEvent(
ConsumerRecord<String, TrainerCreatedEvent> record,
Acknowledgment acknowledgment) {

TrainerCreatedEvent event = record.value();
log.info("Recibido evento TRAINER_CREATED: userId={}, eventId={}",
event.userId(), event.eventId());

try {
if (trainerRepository.existsById(event.userId())) {
log.warn("Trainer con userId={} ya existe. Evento duplicado ignorado.", event.userId());
acknowledgment.acknowledge();
return;
}

TrainerEntity trainer = TrainerEntity.builder()
.id(event.userId())
.firstName(event.firstName())
.lastName(event.lastName())
.dni(event.dni())
.email(event.email())
.phone(event.phone())
.hireDate(LocalDate.now())
.status(TrainerStatus.ACTIVO)
.yearsOfExperience(0)
.build();

trainerRepository.save(trainer);
log.info("Trainer creado exitosamente con ID: {}", trainer.getId());

// Confirmar procesamiento
acknowledgment.acknowledge();

} catch (
Exception e) {
log.error("Error procesando evento TRAINER_CREATED para userId={}", event.userId(), e);
// No hacer acknowledge para que Kafka reintente
// O enviar a DLQ (Dead Letter Queue)
throw e;
}
}
}
8 changes: 3 additions & 5 deletions src/main/java/com/classes/repositories/TrainerRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@

@Repository
public interface TrainerRepository extends JpaRepository<TrainerEntity, UUID> {


Optional<TrainerEntity> findByEmail(String email);
Optional<TrainerEntity> findByDni(String dni);
boolean existsByEmail(String email);
Page<TrainerEntity> findByFirstNameContainingIgnoreCaseOrLastNameContainingIgnoreCase(
String firstName, String lastName, Pageable pageable);


Page<TrainerEntity> findByStatus(TrainerStatus status, Pageable pageable);

}
Loading