diff --git a/pom.xml b/pom.xml
index 2aacd74..3568b7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,24 @@
${mapstruct.version}
provided
+
+ org.springframework.boot
+ spring-boot-starter-security
+
+
+ org.springframework.boot
+ spring-boot-starter-oauth2-resource-server
+
+
+
+ org.springframework.security
+ spring-security-oauth2-jose
+
+
+ org.springframework.security
+ spring-security-test
+ test
+
diff --git a/src/main/java/com/msvcchat/config/CorsConfig.java b/src/main/java/com/msvcchat/config/CorsConfig.java
deleted file mode 100644
index c0e3edb..0000000
--- a/src/main/java/com/msvcchat/config/CorsConfig.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.msvcchat.config;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.cors.CorsConfiguration;
-import org.springframework.web.cors.reactive.CorsWebFilter;
-import org.springframework.web.cors.reactive.UrlBasedCorsConfigurationSource;
-
-@Configuration
-public class CorsConfig {
- @Bean
- public CorsWebFilter corsWebFilter() {
- CorsConfiguration configuration = new CorsConfiguration();
-
- // Especifica los orígenes permitidos
- configuration.addAllowedOrigin("http://localhost:5173"); // Cambia esto según tu frontend
- configuration.addAllowedMethod("*");
- configuration.addAllowedHeader("*");
- configuration.setAllowCredentials(true);
-
- UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
- source.registerCorsConfiguration("/**", configuration);
- return new CorsWebFilter(source);
- }
-}
diff --git a/src/main/java/com/msvcchat/config/ReactiveMapperConfig.java b/src/main/java/com/msvcchat/config/ReactiveMapperConfig.java
index fee16c1..40d312a 100644
--- a/src/main/java/com/msvcchat/config/ReactiveMapperConfig.java
+++ b/src/main/java/com/msvcchat/config/ReactiveMapperConfig.java
@@ -4,9 +4,6 @@
import org.mapstruct.ReportingPolicy;
import org.mapstruct.NullValueCheckStrategy;
import org.mapstruct.NullValuePropertyMappingStrategy;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import java.util.function.Function;
@MapperConfig(
componentModel = "spring",
diff --git a/src/main/java/com/msvcchat/config/SwaggerConfig.java b/src/main/java/com/msvcchat/config/SwaggerConfig.java
index b0de96c..96895ea 100644
--- a/src/main/java/com/msvcchat/config/SwaggerConfig.java
+++ b/src/main/java/com/msvcchat/config/SwaggerConfig.java
@@ -9,7 +9,7 @@
@OpenAPIDefinition(
info = @Info(
title = "Microservicio Chat",
- description = "",
+ description = "API para el microservicio de Chat para entrenadores y miembros",
termsOfService = "",
version = "1.0.0",
contact = @Contact(
@@ -27,13 +27,8 @@
@Server(
description = "Local Server",
url = "http://localhost:9096"
- ),
- @Server(
- description = "Production Server",
- url = "https://"
)
}
)
-
public class SwaggerConfig {
}
\ No newline at end of file
diff --git a/src/main/java/com/msvcchat/config/aop/ReactiveMongoTransactionAspect.java b/src/main/java/com/msvcchat/config/aop/ReactiveMongoTransactionAspect.java
deleted file mode 100644
index aea0c0e..0000000
--- a/src/main/java/com/msvcchat/config/aop/ReactiveMongoTransactionAspect.java
+++ /dev/null
@@ -1,50 +0,0 @@
-//package com.msvcchat.config.aop;
-//
-//import com.mongodb.reactivestreams.client.ClientSession;
-//import org.aspectj.lang.ProceedingJoinPoint;
-//import org.aspectj.lang.annotation.Around;
-//import org.aspectj.lang.annotation.Aspect;
-//import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
-//import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
-//import org.springframework.stereotype.Component;
-//import reactor.core.publisher.Flux;
-//import reactor.core.publisher.Mono;
-//
-//@Aspect
-//@Component
-//public class ReactiveMongoTransactionAspect {
-// private final ReactiveMongoTemplate mongoTemplate;
-// private final ReactiveMongoDatabaseFactory mongoDatabaseFactory;
-// public ReactiveMongoTransactionAspect(ReactiveMongoTemplate mongoTemplate, ReactiveMongoDatabaseFactory mongoDatabaseFactory) {
-// this.mongoTemplate = mongoTemplate;
-// this.mongoDatabaseFactory = mongoDatabaseFactory;
-// }
-// @Around("@annotation(ReactiveMongoTransactional)")
-// public Object manageTransaction(ProceedingJoinPoint joinPoint) throws Throwable {
-// return Mono.usingWhen(
-// mongoDatabaseFactory.getSession(),
-// session -> {
-// session.startTransaction();
-// try {
-// Object result = joinPoint.proceed();
-// if (result instanceof Mono) {
-// return ((Mono>) result)
-// .doOnSuccess(unused -> session.commitTransaction())
-// .doOnError(e -> session.abortTransaction());
-// } else if (result instanceof Flux) {
-// return ((Flux>) result)
-// .doOnComplete(session::commitTransaction)
-// .doOnError(e -> session.abortTransaction());
-// } else {
-// session.abortTransaction();
-// throw new UnsupportedOperationException("Reactive transactions only support Mono/Flux return types");
-// }
-// } catch (Throwable ex) {
-// session.abortTransaction();
-// return Mono.error(ex);
-// }
-// },
-// ClientSession::close
-// );
-// }
-//}
\ No newline at end of file
diff --git a/src/main/java/com/msvcchat/config/aop/ReactiveMongoTransactional.java b/src/main/java/com/msvcchat/config/aop/ReactiveMongoTransactional.java
deleted file mode 100644
index 1e27818..0000000
--- a/src/main/java/com/msvcchat/config/aop/ReactiveMongoTransactional.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.msvcchat.config.aop;
-
-import java.lang.annotation.*;
-
-@Target(ElementType.METHOD)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface ReactiveMongoTransactional {
-}
diff --git a/src/main/java/com/msvcchat/config/security/JwtService.java b/src/main/java/com/msvcchat/config/security/JwtService.java
new file mode 100644
index 0000000..4d7160f
--- /dev/null
+++ b/src/main/java/com/msvcchat/config/security/JwtService.java
@@ -0,0 +1,17 @@
+package com.msvcchat.config.security;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.security.oauth2.jwt.Jwt;
+import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+@Service
+@RequiredArgsConstructor
+public class JwtService {
+ private final ReactiveJwtDecoder jwtDecoder;
+
+ public Mono validateToken(String token) {
+ return jwtDecoder.decode(token);
+ }
+}
diff --git a/src/main/java/com/msvcchat/config/security/SecurityConfig.java b/src/main/java/com/msvcchat/config/security/SecurityConfig.java
new file mode 100644
index 0000000..969ac53
--- /dev/null
+++ b/src/main/java/com/msvcchat/config/security/SecurityConfig.java
@@ -0,0 +1,28 @@
+package com.msvcchat.config.security;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
+import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
+import org.springframework.security.config.web.server.ServerHttpSecurity;
+import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder;
+import org.springframework.security.web.server.SecurityWebFilterChain;
+
+@Configuration
+@EnableWebFluxSecurity
+@EnableMethodSecurity
+public class SecurityConfig {
+ @Bean
+ public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http, ReactiveJwtDecoder jwtDecoder) {
+ return http.authorizeExchange(exchanges -> exchanges.pathMatchers(
+ "/actuator/**",
+ "swagger-ui/**",
+ "v3/api-docs/**",
+ "/ws/chat",
+ "/test/**").permitAll()
+ .anyExchange().authenticated())
+ .oauth2ResourceServer(oauth2 -> oauth2.jwt(jwt -> jwt.jwtDecoder(jwtDecoder)))
+ .csrf(ServerHttpSecurity.CsrfSpec::disable)
+ .build();
+ }
+}
diff --git a/src/main/java/com/msvcchat/config/webclient/WebClientConfig.java b/src/main/java/com/msvcchat/config/webclient/WebClientConfig.java
new file mode 100644
index 0000000..1d5418d
--- /dev/null
+++ b/src/main/java/com/msvcchat/config/webclient/WebClientConfig.java
@@ -0,0 +1,30 @@
+package com.msvcchat.config.webclient;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.reactive.function.client.WebClient;
+
+@Configuration
+@Slf4j
+public class WebClientConfig {
+ @Value("${services.security.url:http://msvc-security:9091}")
+ private String securityServiceUrl;
+
+ @Value("${services.members.url:http://msvc-members:9098}")
+
+ private String memberServiceUrl;
+
+ @Bean("securityWebClient")
+ public WebClient securityWebClient(WebClient.Builder builder) {
+ return builder.baseUrl(securityServiceUrl).build();
+ }
+
+ @Bean("membersWebClient")
+ public WebClient membersWebClient(WebClient.Builder builder) {
+ return builder.baseUrl(memberServiceUrl).build();
+ }
+
+
+}
diff --git a/src/main/java/com/msvcchat/config/websockets/ChatWebSocketHandler.java b/src/main/java/com/msvcchat/config/websockets/ChatWebSocketHandler.java
index 7932bda..b0a5b1d 100644
--- a/src/main/java/com/msvcchat/config/websockets/ChatWebSocketHandler.java
+++ b/src/main/java/com/msvcchat/config/websockets/ChatWebSocketHandler.java
@@ -2,19 +2,20 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import com.msvcchat.dtos.ChatMessageDto;
+import com.msvcchat.config.security.JwtService;
import com.msvcchat.dtos.CreateChatMessageDto;
import com.msvcchat.entity.ChatMessage;
-import com.msvcchat.mappers.ChatMessageMapper;
-import com.msvcchat.repositories.ChatMessageRepository;
-import com.msvcchat.service.ChatRoomManager;
+import com.msvcchat.entity.ConversationDocument;
+import com.msvcchat.service.Impl.ChatRoomManagerImpl;
import com.msvcchat.service.ChatService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
@@ -22,6 +23,7 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,122 +32,167 @@
@Slf4j
public class ChatWebSocketHandler implements WebSocketHandler {
private final ChatService chatService;
- private final ChatMessageRepository repo;
- private final ChatRoomManager roomManager;
- private final ChatMessageMapper mapper;
+ private final ChatRoomManagerImpl roomManager;
+ private final JwtService jwtService;
private final Map> sinks = new ConcurrentHashMap<>();
private final ReactiveMongoTemplate mongoTemplate;
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
-// public ChatWebSocketHandler(ChatMessageRepository repo, ReactiveMongoTemplate mongoTemplate) {
-// this.repo = repo;
-// this.mongoTemplate = mongoTemplate;
-// startChangeStreamListener(); // inicializa escucha global
-// }
private Sinks.Many sinkFor(String roomId) {
return sinks.computeIfAbsent(roomId, rid -> Sinks.many().multicast().onBackpressureBuffer());
}
+
@Override
public Mono handle(WebSocketSession session) {
String path = session.getHandshakeInfo().getUri().getPath();
String roomId = path.substring(path.lastIndexOf('/') + 1);
- Sinks.Many sink = roomManager.sinkFor(roomId);
-
- Mono inbound = session.receive()
- .map(WebSocketMessage::getPayloadAsText)
- .flatMap(text -> {
- try {
- CreateChatMessageDto createDto = objectMapper.readValue(text, CreateChatMessageDto.class);
- log.info("**** ENVIANDO MENSAJE {} ****", text);
- log.info("***** CHAT DTO ****");
- log.info(createDto.toString());
- Mono response = chatService.saveMessage(roomId, createDto).then();
- log.info("***** RESPONSE ****");
- log.info(response.toString());
- return response;
- } catch (
- Exception e) {
- return Mono.empty();
- }
- })
- .then();
+ log.info("🔌 WebSocket: Intentando conectar a sala: {}", roomId);
- Flux outbound = sink.asFlux()
- .distinct(ChatMessage::getId)
- .map(m -> {
- try {
- ChatMessageDto dto = mapper.toDto(m);
- dto.setId(m.getId());
- log.info("****** CHAT MESSAGE DTO {} ", dto);
- return objectMapper.writeValueAsString(dto);
- } catch (
- Exception e) {
- e.printStackTrace();
- log.error("*************ERRROR *********");
- log.error(e.getMessage());
- return "{}";
- }
- })
- .map(session::textMessage);
-
- Flux history = chatService.getHistory(roomId)
- .map(dto -> {
- try {
- return objectMapper.writeValueAsString(dto);
- } catch (
- Exception e) {
- return "{}";
+ // ✅ NUEVO: Verificar que la conversación existe en MongoDB
+ return mongoTemplate.findById(roomId, ConversationDocument.class)
+ .switchIfEmpty(Mono.defer(() -> {
+ log.error("❌ Conversación no encontrada en MongoDB: {}", roomId);
+ return session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Conversación no encontrada"))
+ .then(Mono.empty());
+ }))
+ .flatMap(conversation -> {
+ log.info("✅ Conversación encontrada: {}, participantes: {}",
+ roomId, conversation.getParticipants());
+
+ String token = extractToken(session);
+
+ if (token == null) {
+ log.warn("❌ No se pudo obtener token JWT");
+ return session.close(CloseStatus.BAD_DATA.withReason("Token no encontrado"));
}
- })
- .map(session::textMessage);
+ log.info("Token encontrado");
+ return jwtService.validateToken(token)
+ .flatMap(jwt -> {
+ String email = jwt.getClaimAsString("email");
+ String role = jwt.getClaimAsString("authorities");
+
+ if (email == null || role == null) {
+ log.error("❌ Claims inválidos en JWT");
+ return session.close(CloseStatus.BAD_DATA.withReason("Claims inválidos"));
+ }
+
+ // ✅ Verificar que el usuario es participante de la conversación
+ if (!conversation.getParticipants().contains(email)) {
+ log.error("❌ Usuario {} no es participante de la conversación {}",
+ email, roomId);
+ return session.close(CloseStatus.NOT_ACCEPTABLE
+ .withReason("No eres participante de esta conversación"));
+ }
+
+ log.info("✅ Usuario {} autorizado para la conversación {}", email, roomId);
+
+ // Asociar al usuario con el room
+ roomManager.addUserToRoom(roomId, email);
+
+ Sinks.Many sink = roomManager.sinkFor(roomId);
+
+ // Recuperar el historial del chat
+ Flux history = chatService.getHistory(roomId)
+ .map(dto -> {
+ try {
+ return session.textMessage(objectMapper.writeValueAsString(dto));
+ } catch (
+ Exception e) {
+ log.error("Error serializando mensaje", e);
+ return session.textMessage("{}");
+ }
+ });
- return session.send(history.concatWith(outbound)).and(inbound);
+ // Configurar mensajes en tiempo real
+ Mono inbound = session.receive()
+ .map(WebSocketMessage::getPayloadAsText)
+ .flatMap(text -> {
+ try {
+ CreateChatMessageDto dto = objectMapper.readValue(text, CreateChatMessageDto.class);
+ dto.setFromEmail(email);
+ return chatService.saveMessage(roomId, dto);
+ } catch (
+ Exception e) {
+ log.error("Error procesando mensaje: {}", e.getMessage());
+ return Mono.empty();
+ }
+ })
+ .then();
+
+ Flux outbound = sink.asFlux()
+ .map(chatMessage -> {
+ try {
+ return session.textMessage(objectMapper.writeValueAsString(chatMessage));
+ } catch (
+ Exception e) {
+ log.error("Error enviando mensaje", e);
+ return session.textMessage("{}");
+ }
+ });
+
+ // Enviar historial seguido de mensajes en tiempo real
+ return session.send(history.concatWith(outbound)).and(inbound);
+ })
+ .onErrorResume(e -> {
+ log.error("❌ Error al validar token JWT: {}", e.getMessage());
+ return session.close(CloseStatus.BAD_DATA.withReason("Token inválido"));
+ });
+ });
}
+ private String extractToken(WebSocketSession session) {
+ var headers = session.getHandshakeInfo().getHeaders();
+
+ // 1. Intentar desde header Authorization (enviado por el Gateway)
+ List authHeaders = headers.get(HttpHeaders.AUTHORIZATION);
+ if (authHeaders != null && !authHeaders.isEmpty()) {
+ String authHeader = authHeaders.get(0);
+ if (authHeader.startsWith("Bearer ")) {
+ log.debug("✅ Token encontrado en header Authorization");
+ return authHeader.substring(7);
+ }
+ log.debug("✅ Token encontrado en header Authorization (sin Bearer)");
+ return authHeader;
+ }
+
+ // 2. Intentar desde header custom del Gateway
+ List customHeaders = headers.get("X-Auth-Token");
+ if (customHeaders != null && !customHeaders.isEmpty()) {
+ log.debug("✅ Token encontrado en header X-Auth-Token");
+ return customHeaders.get(0);
+ }
+
+ // 3. Fallback: Intentar desde cookies (por si acaso)
+ var cookies = session.getHandshakeInfo().getCookies();
+ var accessTokenCookie = cookies.getFirst("access_token");
+ if (accessTokenCookie != null) {
+ log.debug("✅ Token encontrado en cookie 'access_token'");
+ return accessTokenCookie.getValue();
+ }
+
+ log.warn("⚠️ No se encontró token en ninguna fuente");
+ log.debug("Headers disponibles: {}", headers.keySet());
+ log.debug("Cookies disponibles: {}", cookies.keySet());
+
+ return null;
+ }
+
@PostConstruct
void startChangeStreamListener() {
mongoTemplate.changeStream(ChatMessage.class)
.listen()
.mapNotNull(ChangeStreamEvent::getBody)
- .distinct(ChatMessage::getId) // Evita procesar mensajes duplicados
+ .distinct(ChatMessage::getId)
.subscribe(msg -> {
if (msg != null && msg.getRoomId() != null) {
- Sinks.Many s = sinks.get(msg.getRoomId());
- if (s != null) {
- s.tryEmitNext(msg);
- }
+ Sinks.Many sink = sinkFor(msg.getRoomId());
+ sink.tryEmitNext(msg);
}
- }, err -> {
- log.error("Error en ChangeStream: {}", err.getMessage(), err);
- });
+ }, err -> log.error("Error en ChangeStream: {}", err.getMessage()));
}
-}
-
-/**
- * Change Stream listener: cuando hay inserts en collection "messages",
- * emitimos a los sinks locales (para propagar mensajes entre instancias).
- * Requiere replica set.
- */
-//private void startChangeStreamListener() {
-// // escucha sin filtro (puedes filtrar por ns/collection o por roomId)
-// mongoTemplate.changeStream(ChatMessage.class)
-// .listen() // devuelve Flux>
-// .map(event -> event.getBody()) // ChatMessage
-// .subscribe(msg -> {
-// if (msg != null && msg.getRoomId() != null) {
-// Sinks.Many s = sinks.get(msg.getRoomId());
-// if (s != null) {
-// s.tryEmitNext(msg);
-// }
-// }
-// }, err -> {
-// // en prod haz reintentos/monitorización
-// err.printStackTrace();
-// });
-//}
-//}
+}
\ No newline at end of file
diff --git a/src/main/java/com/msvcchat/controller/ChatController.java b/src/main/java/com/msvcchat/controller/ChatController.java
index 7790a64..5a83df8 100644
--- a/src/main/java/com/msvcchat/controller/ChatController.java
+++ b/src/main/java/com/msvcchat/controller/ChatController.java
@@ -1,22 +1,97 @@
package com.msvcchat.controller;
-import com.msvcchat.dtos.UserDto;
+import com.msvcchat.dtos.*;
+import com.msvcchat.dtos.members.MemberDto;
+import com.msvcchat.service.ChatService;
+import com.msvcchat.service.ConversationService;
import lombok.RequiredArgsConstructor;
-import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationToken;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
@RestController
@RequiredArgsConstructor
-@RequestMapping("/chat/users")
+@RequestMapping
+@Slf4j
public class ChatController {
- private final ReactiveMongoTemplate mongoTemplate;
- @GetMapping
- public Flux getAllUsers() {
- return mongoTemplate.findAll(UserDto.class, "users");
+ private final ChatService chatService;
+ private final ConversationService conversationService;
+ @Qualifier("securityWebClient")
+ private final WebClient securityWebClient;
+ @Qualifier("membersWebClient")
+ private final WebClient membersWebClient;
+
+
+ @GetMapping("/conversations")
+ public Flux getConversations(Authentication authentication) {
+ String userEmail = authentication.getName();
+ return conversationService.getConversationsByUser(userEmail);
}
-}
+ @PostMapping("/conversations")
+ public Mono createConversation(
+ @RequestBody CreateConversationDto dto,
+ Authentication authentication) {
+ String userEmail = authentication.getName();
+ return conversationService.createConversation(userEmail, dto);
+ }
+
+ @GetMapping("/conversations/{conversationId}/messages")
+ public Flux getMessages(
+ @PathVariable String conversationId,
+ Authentication authentication) {
+ return chatService.getHistory(conversationId);
+ }
+
+ @PostMapping("/conversations/{conversationId}/messages")
+ public Mono sendMessage(
+ @PathVariable String conversationId,
+ @RequestBody CreateChatMessageDto dto,
+ Authentication authentication) {
+
+ String userEmail = authentication.getName();
+ UUID userId = getUserIdFromToken(authentication);
+
+ dto.setFromEmail(userEmail);
+ dto.setFromId(userId.toString());
+
+ return chatService.saveMessage(conversationId, dto);
+ }
+
+ @PatchMapping("/conversations/{conversationId}/read")
+ public Mono markAsRead(
+ @PathVariable String conversationId,
+ Authentication authentication) {
+ String userEmail = authentication.getName();
+ return conversationService.markAsRead(conversationId, userEmail);
+ }
+
+
+ @GetMapping("/users/{role}")
+ public Flux getUsersByRole(
+ @PathVariable String role,
+ Authentication authentication) {
+
+ String normalizedRole = role.toUpperCase();
+ log.info("Obteniendo usuarios con rol: {}", normalizedRole);
+
+ return conversationService.getAllUsersByRole(normalizedRole);
+ }
+
+
+ private UUID getUserIdFromToken(Authentication authentication) {
+ if (authentication instanceof JwtAuthenticationToken jwtAuth) {
+ String userId = jwtAuth.getToken().getClaim("user_id");
+ return UUID.fromString(userId);
+ }
+ throw new IllegalStateException("No se pudo obtener el user_id del token");
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/msvcchat/controller/HelloController.java b/src/main/java/com/msvcchat/controller/HelloController.java
index 88d06d4..846f924 100644
--- a/src/main/java/com/msvcchat/controller/HelloController.java
+++ b/src/main/java/com/msvcchat/controller/HelloController.java
@@ -1,12 +1,15 @@
package com.msvcchat.controller;
+import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
+@RequestMapping("/test")
public class HelloController {
@GetMapping("/saludo")
- public String saludo() {
- return "Hola Microservicio Chat";
+ public ResponseEntity saludo() {
+ return ResponseEntity.ok("Hola Microservicio Chat");
}
}
diff --git a/src/main/java/com/msvcchat/dtos/ConversationDto.java b/src/main/java/com/msvcchat/dtos/ConversationDto.java
new file mode 100644
index 0000000..ec12eed
--- /dev/null
+++ b/src/main/java/com/msvcchat/dtos/ConversationDto.java
@@ -0,0 +1,22 @@
+package com.msvcchat.dtos;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Instant;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ConversationDto {
+ private String id;
+ private UserDto participant;
+ private ChatMessageDto lastMessage;
+ private Instant lastMessageDate;
+ private int unreadCount;
+ private Boolean isFavorite;
+ private Boolean isConnected;
+ private Instant createdAt;
+ private Instant updatedAt;
+}
diff --git a/src/main/java/com/msvcchat/dtos/CreateChatMessageDto.java b/src/main/java/com/msvcchat/dtos/CreateChatMessageDto.java
index 960a30c..721dd28 100644
--- a/src/main/java/com/msvcchat/dtos/CreateChatMessageDto.java
+++ b/src/main/java/com/msvcchat/dtos/CreateChatMessageDto.java
@@ -9,5 +9,5 @@ public class CreateChatMessageDto {
private String fromId;
private String fromRole;
private String text;
-
+ private String fromEmail;
}
diff --git a/src/main/java/com/msvcchat/dtos/CreateConversationDto.java b/src/main/java/com/msvcchat/dtos/CreateConversationDto.java
new file mode 100644
index 0000000..a405eaa
--- /dev/null
+++ b/src/main/java/com/msvcchat/dtos/CreateConversationDto.java
@@ -0,0 +1,7 @@
+package com.msvcchat.dtos;
+
+public record CreateConversationDto(
+ String participantId,
+ String participantRole
+) {
+}
diff --git a/src/main/java/com/msvcchat/dtos/UserConnectionDto.java b/src/main/java/com/msvcchat/dtos/UserConnectionDto.java
new file mode 100644
index 0000000..401bbc5
--- /dev/null
+++ b/src/main/java/com/msvcchat/dtos/UserConnectionDto.java
@@ -0,0 +1,10 @@
+package com.msvcchat.dtos;
+
+public record UserConnectionDto(
+ String id,
+ String username,
+ boolean enabled,
+ String avatar,
+ String initials
+) {
+}
diff --git a/src/main/java/com/msvcchat/dtos/UserDto.java b/src/main/java/com/msvcchat/dtos/UserDto.java
index 4630d53..2f4048e 100644
--- a/src/main/java/com/msvcchat/dtos/UserDto.java
+++ b/src/main/java/com/msvcchat/dtos/UserDto.java
@@ -3,7 +3,7 @@
public record UserDto(
String id,
String name,
- String role,
- String avatar
+ String avatar,
+ String initials
) {
}
diff --git a/src/main/java/com/msvcchat/dtos/members/MemberDto.java b/src/main/java/com/msvcchat/dtos/members/MemberDto.java
new file mode 100644
index 0000000..0098003
--- /dev/null
+++ b/src/main/java/com/msvcchat/dtos/members/MemberDto.java
@@ -0,0 +1,12 @@
+package com.msvcchat.dtos.members;
+
+public record MemberDto(
+ String userId,
+ String firstName,
+ String lastName,
+ String dni,
+ String phone,
+ String profileImageUrl,
+ String status
+) {
+}
diff --git a/src/main/java/com/msvcchat/dtos/security/RoleDto.java b/src/main/java/com/msvcchat/dtos/security/RoleDto.java
new file mode 100644
index 0000000..e965b84
--- /dev/null
+++ b/src/main/java/com/msvcchat/dtos/security/RoleDto.java
@@ -0,0 +1,14 @@
+package com.msvcchat.dtos.security;
+
+import lombok.*;
+
+import java.util.UUID;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class RoleDto {
+ private UUID id;
+ private String name;
+ private String description;
+}
\ No newline at end of file
diff --git a/src/main/java/com/msvcchat/dtos/security/SimpleRoleDto.java b/src/main/java/com/msvcchat/dtos/security/SimpleRoleDto.java
new file mode 100644
index 0000000..32c92d0
--- /dev/null
+++ b/src/main/java/com/msvcchat/dtos/security/SimpleRoleDto.java
@@ -0,0 +1,6 @@
+package com.msvcchat.dtos.security;
+
+public record SimpleRoleDto(
+ String name
+) {
+}
diff --git a/src/main/java/com/msvcchat/dtos/security/SimpleUserDto.java b/src/main/java/com/msvcchat/dtos/security/SimpleUserDto.java
new file mode 100644
index 0000000..cf13bfc
--- /dev/null
+++ b/src/main/java/com/msvcchat/dtos/security/SimpleUserDto.java
@@ -0,0 +1,14 @@
+package com.msvcchat.dtos.security;
+
+import java.util.Set;
+
+public record SimpleUserDto(
+ String id,
+ String username,
+ String email,
+ String firstName,
+ String lastName,
+ Boolean enabled,
+ Set roles
+) {
+}
diff --git a/src/main/java/com/msvcchat/dtos/security/UserSecurityDto.java b/src/main/java/com/msvcchat/dtos/security/UserSecurityDto.java
new file mode 100644
index 0000000..8004e3b
--- /dev/null
+++ b/src/main/java/com/msvcchat/dtos/security/UserSecurityDto.java
@@ -0,0 +1,20 @@
+package com.msvcchat.dtos.security;
+
+
+import lombok.*;
+
+import java.util.Set;
+import java.util.UUID;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class UserSecurityDto {
+ private UUID id;
+ private String username;
+ private String email;
+ private String firstName;
+ private String lastName;
+ private Boolean enabled;
+ private Set roles;
+}
diff --git a/src/main/java/com/msvcchat/entity/ConversationDocument.java b/src/main/java/com/msvcchat/entity/ConversationDocument.java
new file mode 100644
index 0000000..6d04837
--- /dev/null
+++ b/src/main/java/com/msvcchat/entity/ConversationDocument.java
@@ -0,0 +1,25 @@
+package com.msvcchat.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.time.Instant;
+import java.util.Set;
+
+@Document(collection = "conversations")
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ConversationDocument {
+ @Id
+ private String id;
+ private Set participants;
+ private String lastMessageId;
+// private Boolean isFavorite;
+ private Instant lastActivity;
+ private Instant createdAt = Instant.now();
+ private Instant updatedAt = Instant.now();
+}
diff --git a/src/main/java/com/msvcchat/exceptions/GlobalExceptionController.java b/src/main/java/com/msvcchat/exceptions/GlobalExceptionController.java
index 1f67c09..b3ea2e8 100644
--- a/src/main/java/com/msvcchat/exceptions/GlobalExceptionController.java
+++ b/src/main/java/com/msvcchat/exceptions/GlobalExceptionController.java
@@ -16,6 +16,7 @@
import org.springframework.web.server.MethodNotAllowedException;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
+import java.nio.file.AccessDeniedException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -97,17 +98,17 @@ public ResponseEntity handleResponseStatusException(ResponseStatu
return ResponseEntity.status(ex.getStatusCode()).body(errorResponse);
}
-// @ExceptionHandler(AccessDeniedException.class)
-// public ResponseEntity handleAccessDeniedException(AccessDeniedException ex) {
-// ErrorResponse errorResponse = new ErrorResponse(
-// "ACCESS_DENIED",
-// "Acceso denegado",
-// Collections.singletonList(
-// "No tienes los permisos necesarios para realizar esta acción"));
-//
-// log.warn("Access denied for user");
-// return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorResponse);
-// }
+ @ExceptionHandler(AccessDeniedException.class)
+ public ResponseEntity handleAccessDeniedException(AccessDeniedException ex) {
+ ErrorResponse errorResponse = new ErrorResponse(
+ "ACCESS_DENIED",
+ "Acceso denegado",
+ Collections.singletonList(
+ "No tienes los permisos necesarios para realizar esta acción"));
+
+ log.warn("Access denied for user");
+ return ResponseEntity.status(HttpStatus.FORBIDDEN).body(errorResponse);
+ }
@ExceptionHandler(Exception.class)
public ResponseEntity handleException(Exception ex) {
@@ -143,4 +144,28 @@ public ResponseEntity handleCallNotPermittedException(CallNotPerm
log.error("Circuit breaker abierto: {}", ex.getMessage());
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(errorResponse);
}
+
+ @ExceptionHandler(ParticipantNotFoundException.class)
+ public ResponseEntity handleParticipantNotFoundException(ParticipantNotFoundException ex) {
+ ErrorResponse errorResponse = new ErrorResponse(
+ "PARTICIPANT_NOT_FOUND",
+ "Participante no encontrado en la conversación",
+ Collections.singletonList(ex.getMessage()));
+
+ log.warn("Participant not found: {}", ex.getMessage());
+ return ResponseEntity.status(HttpStatus.NOT_FOUND).body(errorResponse);
+ }
+
+ @ExceptionHandler(UserEnrichmentException.class)
+ public ResponseEntity handleUserEnrichmentException(UserEnrichmentException ex) {
+ ErrorResponse errorResponse = new ErrorResponse(
+ "USER_ENRICHMENT_ERROR",
+ "Error enriqueciendo datos de usuario",
+ Collections.singletonList(ex.getMessage()));
+
+ log.error("User enrichment error: {}", ex.getMessage());
+ return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorResponse);
+ }
+
+
}
\ No newline at end of file
diff --git a/src/main/java/com/msvcchat/exceptions/ParticipantNotFoundException.java b/src/main/java/com/msvcchat/exceptions/ParticipantNotFoundException.java
new file mode 100644
index 0000000..23a4618
--- /dev/null
+++ b/src/main/java/com/msvcchat/exceptions/ParticipantNotFoundException.java
@@ -0,0 +1,7 @@
+package com.msvcchat.exceptions;
+
+public class ParticipantNotFoundException extends RuntimeException {
+ public ParticipantNotFoundException(String conversationId) {
+ super("No se encontró participante en la conversación: " + conversationId);
+ }
+}
diff --git a/src/main/java/com/msvcchat/exceptions/UserEnrichmentException.java b/src/main/java/com/msvcchat/exceptions/UserEnrichmentException.java
new file mode 100644
index 0000000..d31981d
--- /dev/null
+++ b/src/main/java/com/msvcchat/exceptions/UserEnrichmentException.java
@@ -0,0 +1,7 @@
+package com.msvcchat.exceptions;
+
+public class UserEnrichmentException extends RuntimeException {
+ public UserEnrichmentException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/msvcchat/helpers/ChatHelper.java b/src/main/java/com/msvcchat/helpers/ChatHelper.java
new file mode 100644
index 0000000..6a91e96
--- /dev/null
+++ b/src/main/java/com/msvcchat/helpers/ChatHelper.java
@@ -0,0 +1,42 @@
+package com.msvcchat.helpers;
+
+public class ChatHelper {
+ public static String generateInitials(String firstName, String lastName) {
+ StringBuilder initials = new StringBuilder();
+
+ if (firstName != null && !firstName.isBlank()) {
+ initials.append(firstName.charAt(0));
+ }
+
+ if (lastName != null && !lastName.isBlank()) {
+ initials.append(lastName.charAt(0));
+ }
+
+ return !initials.isEmpty() ? initials.toString().toUpperCase() : "?";
+ }
+
+
+ public static String generateInitialsFromEmail(String email) {
+ if (email != null && !email.isBlank()) {
+ return email.substring(0, 1).toUpperCase();
+ }
+ return "?";
+ }
+
+ public static String buildDisplayName(String firstName, String lastName) {
+ if (firstName != null && !firstName.isBlank() &&
+ lastName != null && !lastName.isBlank()) {
+ return firstName + " " + lastName;
+ }
+
+ if (firstName != null && !firstName.isBlank()) {
+ return firstName;
+ }
+
+ if (lastName != null && !lastName.isBlank()) {
+ return lastName;
+ }
+
+ return "Usuario sin nombre";
+ }
+}
diff --git a/src/main/java/com/msvcchat/mappers/ConversationMapper.java b/src/main/java/com/msvcchat/mappers/ConversationMapper.java
new file mode 100644
index 0000000..5f2a3e6
--- /dev/null
+++ b/src/main/java/com/msvcchat/mappers/ConversationMapper.java
@@ -0,0 +1,20 @@
+package com.msvcchat.mappers;
+
+import com.msvcchat.config.ReactiveMapperConfig;
+import com.msvcchat.dtos.ConversationDto;
+import com.msvcchat.entity.ConversationDocument;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+
+@Mapper(config = ReactiveMapperConfig.class)
+public interface ConversationMapper {
+
+ @Mapping(target = "participant", ignore = true)
+ @Mapping(target = "lastMessage", ignore = true)
+ @Mapping(target = "lastMessageDate", ignore = true)
+ @Mapping(target = "unreadCount", constant = "0")
+ @Mapping(target = "isFavorite", constant = "false")
+ @Mapping(target = "isConnected", constant = "false")
+ ConversationDto toDto(ConversationDocument entity);
+
+}
diff --git a/src/main/java/com/msvcchat/repositories/ConversationRepository.java b/src/main/java/com/msvcchat/repositories/ConversationRepository.java
new file mode 100644
index 0000000..d18a7f1
--- /dev/null
+++ b/src/main/java/com/msvcchat/repositories/ConversationRepository.java
@@ -0,0 +1,15 @@
+package com.msvcchat.repositories;
+
+import com.msvcchat.entity.ConversationDocument;
+import org.springframework.data.mongodb.repository.Query;
+import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Set;
+
+public interface ConversationRepository extends ReactiveMongoRepository {
+ Flux findByParticipantsContaining(String userEmail);
+ @Query("{'participants': {$all: ?0}}")
+ Mono findByParticipantsContainingAll(Set participants);
+}
diff --git a/src/main/java/com/msvcchat/service/ChatRoomManager.java b/src/main/java/com/msvcchat/service/ChatRoomManager.java
index 56c247d..1d1eb06 100644
--- a/src/main/java/com/msvcchat/service/ChatRoomManager.java
+++ b/src/main/java/com/msvcchat/service/ChatRoomManager.java
@@ -1,54 +1,14 @@
package com.msvcchat.service;
import com.msvcchat.entity.ChatMessage;
-import jakarta.annotation.PostConstruct;
-import lombok.RequiredArgsConstructor;
-import org.springframework.data.mongodb.core.ChangeStreamEvent;
-import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
-import org.springframework.stereotype.Component;
import reactor.core.publisher.Sinks;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-@Component
-@RequiredArgsConstructor
-public class ChatRoomManager {
-
- private final ReactiveMongoTemplate mongoTemplate;
- private final Map> sinks = new ConcurrentHashMap<>();
-
- public Sinks.Many sinkFor(String roomId) {
- return sinks.computeIfAbsent(roomId, rid -> Sinks.many().multicast().onBackpressureBuffer());
- }
-
-
- public void broadcast(ChatMessage msg) {
- if (msg == null || msg.getRoomId() == null)
- return;
- Sinks.Many s = sinks.get(msg.getRoomId());
- if (s != null) {
- s.tryEmitNext(msg);
- }
- }
-
-
- @PostConstruct
- void startChangeStreamListener() {
- //Va a estar esuchando inserts o updates en la colencion de ChatMessage
- mongoTemplate.changeStream(ChatMessage.class)
- .listen()
- .mapNotNull(ChangeStreamEvent::getBody)
- .subscribe(msg -> {
- if (msg != null && msg.getRoomId() != null) {
- Sinks.Many s = sinks.get(msg.getRoomId());
- if (s != null)
- s.tryEmitNext(msg);
- }
- }, err -> {
- err.printStackTrace();
- });
- }
-
+import java.util.Set;
+public interface ChatRoomManager {
+ Sinks.Many sinkFor(String roomId);
+ void addUserToRoom(String roomId, String userId);
+ Set getUsersInRoom(String roomId);
+ void broadcast(ChatMessage msg);
+ Set getAllRooms();
}
diff --git a/src/main/java/com/msvcchat/service/ConversationService.java b/src/main/java/com/msvcchat/service/ConversationService.java
new file mode 100644
index 0000000..63db053
--- /dev/null
+++ b/src/main/java/com/msvcchat/service/ConversationService.java
@@ -0,0 +1,15 @@
+package com.msvcchat.service;
+
+import com.msvcchat.dtos.ConversationDto;
+import com.msvcchat.dtos.CreateConversationDto;
+import com.msvcchat.dtos.UserConnectionDto;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface ConversationService {
+ Flux getConversationsByUser(String userEmail);
+ Mono createConversation(String userEmail, CreateConversationDto dto);
+ Mono markAsRead(String conversationId,String userEmail);
+ Mono getOrCreateRoomId(String user1Email,String user2Email);
+ Flux getAllUsersByRole(String role);
+}
diff --git a/src/main/java/com/msvcchat/service/ExternalServiceClient.java b/src/main/java/com/msvcchat/service/ExternalServiceClient.java
new file mode 100644
index 0000000..0d5f86b
--- /dev/null
+++ b/src/main/java/com/msvcchat/service/ExternalServiceClient.java
@@ -0,0 +1,14 @@
+package com.msvcchat.service;
+
+import com.msvcchat.dtos.members.MemberDto;
+import com.msvcchat.dtos.security.SimpleUserDto;
+import com.msvcchat.dtos.security.UserSecurityDto;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface ExternalServiceClient {
+ Mono getUserByEmailFromSecurity(String email);
+ Mono getMemberFromMembers(String userId);
+ Mono getUserByIdFromSecurity(String userId);
+ Flux getUsersByRoleFromSecurity(String role);
+}
diff --git a/src/main/java/com/msvcchat/service/Impl/ChatRoomManagerImpl.java b/src/main/java/com/msvcchat/service/Impl/ChatRoomManagerImpl.java
new file mode 100644
index 0000000..3ac06f9
--- /dev/null
+++ b/src/main/java/com/msvcchat/service/Impl/ChatRoomManagerImpl.java
@@ -0,0 +1,73 @@
+package com.msvcchat.service.Impl;
+
+import com.msvcchat.entity.ChatMessage;
+import com.msvcchat.service.ChatRoomManager;
+import jakarta.annotation.PostConstruct;
+import lombok.RequiredArgsConstructor;
+import org.springframework.data.mongodb.core.ChangeStreamEvent;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+import reactor.core.publisher.Sinks;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Component
+@RequiredArgsConstructor
+public class ChatRoomManagerImpl implements ChatRoomManager {
+
+ private final ReactiveMongoTemplate mongoTemplate;
+ private final Map> sinks = new ConcurrentHashMap<>();
+ private final Map> roomUsers = new ConcurrentHashMap<>();
+
+ @Override
+ public Sinks.Many sinkFor(String roomId) {
+ return sinks.computeIfAbsent(roomId, rid -> Sinks.many().multicast().onBackpressureBuffer());
+ }
+
+ @Override
+ @Transactional
+ public void addUserToRoom(String roomId, String userId) {
+ roomUsers.computeIfAbsent(roomId, rid -> ConcurrentHashMap.newKeySet()).add(userId);
+ }
+
+ @Override
+ @Transactional(readOnly = true)
+ public Set getUsersInRoom(String roomId) {
+ return roomUsers.getOrDefault(roomId, Set.of());
+ }
+
+ @Override
+ public void broadcast(ChatMessage msg) {
+ if (msg == null || msg.getRoomId() == null)
+ return;
+ Sinks.Many s = sinks.get(msg.getRoomId());
+ if (s != null) {
+ s.tryEmitNext(msg);
+ }
+ }
+
+ @Override
+ public Set getAllRooms() {
+ return roomUsers.keySet();
+ }
+
+
+ @PostConstruct
+ void startChangeStreamListener() {
+ mongoTemplate.changeStream(ChatMessage.class)
+ .listen()
+ .mapNotNull(ChangeStreamEvent::getBody)
+ .subscribe(msg -> {
+ if (msg != null && msg.getRoomId() != null) {
+ Sinks.Many s = sinks.get(msg.getRoomId());
+ if (s != null)
+ s.tryEmitNext(msg);
+ }
+ }, Throwable::printStackTrace);
+ }
+
+
+}
diff --git a/src/main/java/com/msvcchat/service/Impl/ChatServiceImpl.java b/src/main/java/com/msvcchat/service/Impl/ChatServiceImpl.java
index ced8c40..bb4ac3d 100644
--- a/src/main/java/com/msvcchat/service/Impl/ChatServiceImpl.java
+++ b/src/main/java/com/msvcchat/service/Impl/ChatServiceImpl.java
@@ -5,7 +5,7 @@
import com.msvcchat.entity.ChatMessage;
import com.msvcchat.mappers.ChatMessageMapper;
import com.msvcchat.repositories.ChatMessageRepository;
-import com.msvcchat.service.ChatRoomManager;
+import com.msvcchat.repositories.ConversationRepository;
import com.msvcchat.service.ChatService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -19,8 +19,9 @@
public class ChatServiceImpl implements ChatService {
private final ChatMessageRepository chatMessageRepository;
+ private final ConversationRepository conversationRepository;
private final ChatMessageMapper mapper;
- private final ChatRoomManager chatRoomManager;
+ private final ChatRoomManagerImpl chatRoomManagerImpl;
@Override
public Mono saveMessage(String roomId, CreateChatMessageDto dto) {
@@ -28,7 +29,18 @@ public Mono saveMessage(String roomId, CreateChatMessageDto dto)
entity.setRoomId(roomId);
return chatMessageRepository
.save(entity)
- .doOnNext(chatRoomManager::broadcast)
+ .flatMap(savedMessage -> {
+ return conversationRepository.findById(roomId)
+ .flatMap(conversation -> {
+ conversation.setLastMessageId(savedMessage.getId());
+ conversation.setLastActivity(savedMessage.getCreatedAt());
+ return conversationRepository.save(conversation);
+ })
+ .onErrorResume(error -> {
+ log.warn("Error al actualizar la converzacion {} :{}", roomId, error.getMessage());
+ return Mono.empty();
+ }).thenReturn(savedMessage);
+ }).doOnNext(chatRoomManagerImpl::broadcast)
.map(mapper::toDto);
}
diff --git a/src/main/java/com/msvcchat/service/Impl/ConversationServiceImpl.java b/src/main/java/com/msvcchat/service/Impl/ConversationServiceImpl.java
new file mode 100644
index 0000000..f466017
--- /dev/null
+++ b/src/main/java/com/msvcchat/service/Impl/ConversationServiceImpl.java
@@ -0,0 +1,227 @@
+package com.msvcchat.service.Impl;
+
+import com.msvcchat.dtos.*;
+import com.msvcchat.dtos.security.SimpleUserDto;
+import com.msvcchat.entity.ChatMessage;
+import com.msvcchat.entity.ConversationDocument;
+import com.msvcchat.mappers.ChatMessageMapper;
+import com.msvcchat.mappers.ConversationMapper;
+import com.msvcchat.repositories.ConversationRepository;
+import com.msvcchat.service.ChatRoomManager;
+import com.msvcchat.service.ConversationService;
+import com.msvcchat.service.ExternalServiceClient;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.domain.Sort;
+import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Instant;
+import java.util.Set;
+
+import static com.msvcchat.helpers.ChatHelper.*;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class ConversationServiceImpl implements ConversationService {
+
+ private final ConversationRepository conversationRepository;
+ private final ReactiveMongoTemplate mongoTemplate;
+ private final ConversationMapper conversationMapper;
+ private final ChatRoomManager chatRoomManager;
+ private final ExternalServiceClient externalServiceClient;
+ private final ChatMessageMapper chatMessageMapper;
+
+ @Qualifier("securityWebClient")
+ private final WebClient securityWebClient;
+
+ @Qualifier("membersWebClient")
+ private final WebClient membersWebClient;
+
+
+ @Override
+ public Flux getConversationsByUser(String userEmail) {
+ return conversationRepository.findByParticipantsContaining(userEmail)
+ .flatMap(conv -> enrichConversationDto(conv, userEmail));
+ }
+
+ @Override
+ public Mono createConversation(String userEmail, CreateConversationDto dto) {
+ return externalServiceClient.getUserByIdFromSecurity(dto.participantId())
+ .flatMap(participantUser -> {
+ String participantEmail = participantUser.getEmail();
+ return conversationRepository.findByParticipantsContainingAll(Set.of(userEmail, participantEmail))
+ .switchIfEmpty(
+ conversationRepository.save(new ConversationDocument(
+ null,
+ Set.of(userEmail, participantEmail),
+ null,
+ Instant.now(),
+ Instant.now(),
+ Instant.now()
+ ))
+ )
+ .flatMap(conv -> enrichConversationDto(conv, userEmail));
+ });
+ }
+
+ @Override
+ public Mono markAsRead(String conversationId, String userEmail) {
+
+ return Mono.empty();
+ }
+
+ @Override
+ public Mono getOrCreateRoomId(String user1Email, String user2Email) {
+ Set participants = Set.of(user1Email, user2Email);
+ return conversationRepository.findByParticipantsContainingAll(participants)
+ .switchIfEmpty(
+ conversationRepository.save(new ConversationDocument(
+ null,
+ participants,
+ null,
+ Instant.now(),
+ Instant.now(),
+ Instant.now()
+ ))
+ )
+ .map(ConversationDocument::getId);
+ }
+
+ @Override
+ public Flux getAllUsersByRole(String role) {
+ return securityWebClient.get()
+ .uri("/users/by-role/{role}", role)
+ .retrieve()
+ .bodyToFlux(SimpleUserDto.class)
+ .flatMap(user -> {
+ log.info("Usuario traído de security {}", user);
+ return externalServiceClient.getMemberFromMembers(user.id())
+ .map(memberDto -> {
+ String displayName = buildDisplayName(memberDto.firstName(), memberDto.lastName());
+ String initials = generateInitials(memberDto.firstName(), memberDto.lastName());
+ boolean isConnected = checkUserConnection(user.id());
+
+ return new UserConnectionDto(
+ user.id(),
+ displayName,
+ isConnected,
+ memberDto.profileImageUrl(),
+ initials
+ );
+ })
+ .onErrorResume(error -> {
+ log.warn("Error obteniendo member para userId {}: {}", user.id(), error.getMessage());
+ String fallbackName = user.email() != null ? user.email() : "Usuario";
+ String initials = generateInitialsFromEmail(user.email());
+
+ return Mono.just(new UserConnectionDto(
+ user.id(),
+ fallbackName,
+ false,
+ null,
+ initials
+ ));
+ });
+ })
+ .doOnError(error -> log.error("Error obteniendo usuarios por rol {}: {}", role, error.getMessage()));
+ }
+
+
+ private Mono enrichConversationDto(ConversationDocument conversation, String currentUserEmail) {
+ ConversationDto dto = conversationMapper.toDto(conversation);
+
+ String participantEmail = conversation.getParticipants().stream()
+ .filter(email -> !email.equals(currentUserEmail))
+ .findFirst()
+ .orElse(null);
+
+ if (participantEmail == null) {
+ log.warn("⚠️ No se encontró email del participante en conversación {}", conversation.getId());
+ return Mono.just(dto);
+ }
+
+ return externalServiceClient.getUserByEmailFromSecurity(participantEmail)
+ .flatMap(userSecurityDto -> {
+ String userId = userSecurityDto.id();
+
+ return externalServiceClient.getMemberFromMembers(userId)
+ .map(memberDto -> {
+ String displayName = buildDisplayName(
+ memberDto.firstName(),
+ memberDto.lastName()
+ );
+ String initials = generateInitials(memberDto.firstName(), memberDto.lastName());
+ UserDto enrichedUser = new UserDto(
+ userId,
+ displayName,
+ memberDto.profileImageUrl(),
+ initials
+
+ );
+
+ dto.setParticipant(enrichedUser);
+ return dto;
+ })
+ .onErrorResume(error -> {
+ log.warn("No se pudo obtener datos de members para userId={}: {}",
+ userId, error.getMessage());
+
+ String fallbackName = buildDisplayName(
+ userSecurityDto.firstName(),
+ userSecurityDto.lastName()
+ );
+
+ String initials = generateInitialsFromEmail(
+ userSecurityDto.email()
+ );
+
+ UserDto partialUser = new UserDto(
+ userId,
+ fallbackName,
+ null,
+ initials
+ );
+
+ dto.setParticipant(partialUser);
+ return Mono.just(dto);
+ });
+ })
+ .flatMap(enrichedDto -> {
+ return mongoTemplate.find(
+ Query.query(
+ Criteria.where("roomId")
+ .is(conversation.getId())
+ ).with(Sort.by(
+ Sort.Direction.DESC, "createdAt"
+ )).limit(1),
+ ChatMessage.class
+ )
+ .next()
+ .map(lastMsg -> {
+ ChatMessageDto lastMessageDto = chatMessageMapper.toDto(lastMsg);
+ enrichedDto.setLastMessage(lastMessageDto);
+ enrichedDto.setLastMessageDate(lastMsg.getCreatedAt());
+ return enrichedDto;
+ })
+ .defaultIfEmpty(enrichedDto);
+ })
+ .onErrorResume(error -> {
+ log.error("❌ Error enriqueciendo conversación {}: {}",
+ conversation.getId(), error.getMessage());
+ return Mono.just(dto);
+ });
+ }
+
+ private boolean checkUserConnection(String userId) {
+ return chatRoomManager.getAllRooms().stream().anyMatch(roomId -> chatRoomManager.getUsersInRoom(roomId).contains(userId));
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/msvcchat/service/Impl/ExternalServiceClientImpl.java b/src/main/java/com/msvcchat/service/Impl/ExternalServiceClientImpl.java
new file mode 100644
index 0000000..4bbad7e
--- /dev/null
+++ b/src/main/java/com/msvcchat/service/Impl/ExternalServiceClientImpl.java
@@ -0,0 +1,65 @@
+package com.msvcchat.service.Impl;
+
+import com.msvcchat.dtos.members.MemberDto;
+import com.msvcchat.dtos.security.SimpleUserDto;
+import com.msvcchat.dtos.security.UserSecurityDto;
+import com.msvcchat.service.ExternalServiceClient;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class ExternalServiceClientImpl implements ExternalServiceClient {
+
+ @Qualifier("securityWebClient")
+ private final WebClient securityWebClient;
+
+ @Qualifier("membersWebClient")
+ private final WebClient membersWebClient;
+
+ @Override
+ public Mono getUserByEmailFromSecurity(String email) {
+ return securityWebClient.get()
+ .uri("/users/by-email/{email}", email)
+ .retrieve()
+ .bodyToMono(SimpleUserDto.class)
+ .doOnError(error -> log.error("❌ Error obteniendo usuario de security por email {}: {}",
+ email, error.getMessage()));
+ }
+
+ @Override
+ public Mono getMemberFromMembers(String userId) {
+ return membersWebClient.get()
+ .uri("/public/member/{userId}", userId)
+ .retrieve()
+ .bodyToMono(MemberDto.class)
+ .doOnError(error -> log.error("❌ Error obteniendo miembro de members para userId {}: {}",
+ userId, error.getMessage()));
+ }
+
+ @Override
+ public Mono getUserByIdFromSecurity(String userId) {
+ return securityWebClient.get()
+ .uri("/users/{id}", userId)
+ .retrieve()
+ .bodyToMono(UserSecurityDto.class)
+ .doOnError(error -> log.error("❌ Error obteniendo usuario de security por ID {}: {}",
+ userId, error.getMessage()));
+ }
+
+ @Override
+ public Flux getUsersByRoleFromSecurity(String role) {
+ return securityWebClient.get()
+ .uri("/users/by-role/{role}", role)
+ .retrieve()
+ .bodyToFlux(SimpleUserDto.class)
+ .doOnError(error -> log.error("❌ Error obteniendo usuarios por rol {}: {}",
+ role, error.getMessage()));
+ }
+}