Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
FROM openjdk:21-jdk-slim
WORKDIR /app
COPY pom.xml .
COPY mvnw .
COPY .mvn .mvn
COPY mvnw pom.xml ./
RUN chmod +x mvnw
RUN ./mvnw dependency:go-offline
RUN ./mvnw -T 4 dependency:go-offline
COPY src ./src
EXPOSE 9096 5005
CMD ["./mvnw", "spring-boot:run", "-Dspring-boot.run.jvmArguments=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"]
74 changes: 42 additions & 32 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
<properties>
<java.version>21</java.version>
<spring-cloud.version>2025.0.0</spring-cloud.version>
<mapstruct.version>1.5.5.Final</mapstruct.version>
<maven.compiler.release>21</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
Expand All @@ -45,18 +48,18 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
Expand Down Expand Up @@ -89,32 +92,39 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>1.5.5.Final</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>1.5.5.Final</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webflux-api</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
<version>2.8.11</version>
</dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>${mapstruct.version}</version>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
<scope>provided</scope>
</dependency>


</dependencies>
<dependencyManagement>
<dependencies>
Expand All @@ -135,15 +145,15 @@
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>1.5.5.Final</version>
</path>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</path>
<path>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
Expand Down
Empty file.
25 changes: 25 additions & 0 deletions src/main/java/com/msvcchat/config/CorsConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.msvcchat.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsWebFilter;
import org.springframework.web.cors.reactive.UrlBasedCorsConfigurationSource;

@Configuration
public class CorsConfig {
@Bean
public CorsWebFilter corsWebFilter() {
CorsConfiguration configuration = new CorsConfiguration();

// Especifica los orígenes permitidos
configuration.addAllowedOrigin("http://localhost:5173"); // Cambia esto según tu frontend
configuration.addAllowedMethod("*");
configuration.addAllowedHeader("*");
configuration.setAllowCredentials(true);

UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", configuration);
return new CorsWebFilter(source);
}
}
8 changes: 0 additions & 8 deletions src/main/java/com/msvcchat/config/ReactiveMapperConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,4 @@
nullValuePropertyMappingStrategy = NullValuePropertyMappingStrategy.IGNORE
)
public interface ReactiveMapperConfig {

default <T, R> Mono<R> mapMono(Mono<T> mono, Function<T, R> mapper) {
return mono.map(mapper);
}

default <T, R> Flux<R> mapFlux(Flux<T> flux, Function<T, R> mapper) {
return flux.map(mapper);
}
}
15 changes: 1 addition & 14 deletions src/main/java/com/msvcchat/config/SwaggerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,13 @@
servers = {
@Server(
description = "Local Server",
url = "http://localhost:9005"
url = "http://localhost:9096"
),
@Server(
description = "Production Server",
url = "https://"
)
}
// ,
// security = @SecurityRequirement(
// name = "securityToken"
// )
//)
//@SecurityScheme(
// name = "securityToken",
// description = "Access Token For My API",
// type = SecuritySchemeType.HTTP,
// paramName = HttpHeaders.AUTHORIZATION,
// in = SecuritySchemeIn.HEADER,
// scheme = "bearer",
// bearerFormat = "JWT"
)

public class SwaggerConfig {
Expand Down
105 changes: 80 additions & 25 deletions src/main/java/com/msvcchat/config/websockets/ChatWebSocketHandler.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package com.msvcchat.config.websockets;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.msvcchat.dtos.ChatMessageDto;
import com.msvcchat.dtos.CreateChatMessageDto;
import com.msvcchat.entity.ChatMessage;
import com.msvcchat.mappers.ChatMessageMapper;
import com.msvcchat.repositories.ChatMessageRepository;
import com.msvcchat.service.ChatRoomManager;
import com.msvcchat.service.ChatService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
Expand All @@ -18,12 +27,15 @@

@Component
@RequiredArgsConstructor
@Slf4j
public class ChatWebSocketHandler implements WebSocketHandler {

private final ChatService chatService;
private final ChatMessageRepository repo;
private final ObjectMapper mapper = new ObjectMapper();
private final ChatRoomManager roomManager;
private final ChatMessageMapper mapper;
private final Map<String, Sinks.Many<ChatMessage>> sinks = new ConcurrentHashMap<>();
private final ReactiveMongoTemplate mongoTemplate;
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

// public ChatWebSocketHandler(ChatMessageRepository repo, ReactiveMongoTemplate mongoTemplate) {
// this.repo = repo;
Expand All @@ -39,48 +51,67 @@ private Sinks.Many<ChatMessage> sinkFor(String roomId) {
public Mono<Void> handle(WebSocketSession session) {
String path = session.getHandshakeInfo().getUri().getPath();
String roomId = path.substring(path.lastIndexOf('/') + 1);
Sinks.Many<ChatMessage> sink = sinkFor(roomId);

Sinks.Many<ChatMessage> sink = roomManager.sinkFor(roomId);

Mono<Void> inbound = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(text -> {
try {
ChatMessage msg = mapper.readValue(text, ChatMessage.class);
msg.setRoomId(roomId);
return repo.save(msg)
.doOnNext(saved -> sink.tryEmitNext(saved));
} catch (Exception e) {
CreateChatMessageDto createDto = objectMapper.readValue(text, CreateChatMessageDto.class);
log.info("**** ENVIANDO MENSAJE {} ****", text);
log.info("***** CHAT DTO ****");
log.info(createDto.toString());
Mono<Void> response = chatService.saveMessage(roomId, createDto).then();
log.info("***** RESPONSE ****");
log.info(response.toString());
return response;
} catch (
Exception e) {
return Mono.empty();
}
})
.then();

Flux<WebSocketMessage> outbound = sink.asFlux()
.distinct(ChatMessage::getId)
.map(m -> {
try { return mapper.writeValueAsString(m); } catch (Exception e) { return "{}";}
try {
ChatMessageDto dto = mapper.toDto(m);
dto.setId(m.getId());
log.info("****** CHAT MESSAGE DTO {} ", dto);
return objectMapper.writeValueAsString(dto);
} catch (
Exception e) {
e.printStackTrace();
log.error("*************ERRROR *********");
log.error(e.getMessage());
return "{}";
}
})
.map(session::textMessage);

// cuando cliente se conecta, opcional: enviar historial reciente
Flux<WebSocketMessage> history = repo.findByRoomIdOrderByCreatedAtAsc(roomId)
.map(m -> {
try { return mapper.writeValueAsString(m); } catch (Exception e) { return "{}"; }
Flux<WebSocketMessage> history = chatService.getHistory(roomId)
.map(dto -> {
try {
return objectMapper.writeValueAsString(dto);
} catch (
Exception e) {
return "{}";
}
})
.map(session::textMessage);

return session.send(Flux.concat(history, outbound)).and(inbound);
return session.send(history.concatWith(outbound)).and(inbound);
}

/**
* Change Stream listener: cuando hay inserts en collection "messages",
* emitimos a los sinks locales (para propagar mensajes entre instancias).
* Requiere replica set.
*/
private void startChangeStreamListener() {
// escucha sin filtro (puedes filtrar por ns/collection o por roomId)

@PostConstruct
void startChangeStreamListener() {
mongoTemplate.changeStream(ChatMessage.class)
.listen() // devuelve Flux<ChangeStreamEvent<ChatMessage>>
.map(event -> event.getBody()) // ChatMessage
.listen()
.mapNotNull(ChangeStreamEvent::getBody)
.distinct(ChatMessage::getId) // Evita procesar mensajes duplicados
.subscribe(msg -> {
if (msg != null && msg.getRoomId() != null) {
Sinks.Many<ChatMessage> s = sinks.get(msg.getRoomId());
Expand All @@ -89,8 +120,32 @@ private void startChangeStreamListener() {
}
}
}, err -> {
// en prod haz reintentos/monitorización
err.printStackTrace();
log.error("Error en ChangeStream: {}", err.getMessage(), err);
});
}

}

/**
* Change Stream listener: cuando hay inserts en collection "messages",
* emitimos a los sinks locales (para propagar mensajes entre instancias).
* Requiere replica set.
*/
//private void startChangeStreamListener() {
// // escucha sin filtro (puedes filtrar por ns/collection o por roomId)
// mongoTemplate.changeStream(ChatMessage.class)
// .listen() // devuelve Flux<ChangeStreamEvent<ChatMessage>>
// .map(event -> event.getBody()) // ChatMessage
// .subscribe(msg -> {
// if (msg != null && msg.getRoomId() != null) {
// Sinks.Many<ChatMessage> s = sinks.get(msg.getRoomId());
// if (s != null) {
// s.tryEmitNext(msg);
// }
// }
// }, err -> {
// // en prod haz reintentos/monitorización
// err.printStackTrace();
// });
//}
//}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class WebSocketConfig {

@Bean
public SimpleUrlHandlerMapping webSocketMapping(ChatWebSocketHandler handler) {
Map<String, WebSocketHandler> map = Map.of("/ws/chat/{roomId}", handler);
Map<String, WebSocketHandler> map = Map.of("/ws/chat/*", handler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(10);
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/msvcchat/controller/ChatController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.msvcchat.controller;

import com.msvcchat.dtos.UserDto;
import lombok.RequiredArgsConstructor;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
@RequiredArgsConstructor
@RequestMapping("/chat/users")
public class ChatController {
private final ReactiveMongoTemplate mongoTemplate;

@GetMapping
public Flux<UserDto> getAllUsers() {
return mongoTemplate.findAll(UserDto.class, "users");
}

}
Loading