diff --git a/L38-webflux-chat/client-service/src/main/java/ru/petrelevich/controllers/MessageController.java b/L38-webflux-chat/client-service/src/main/java/ru/petrelevich/controllers/MessageController.java index 4d5d34b..e1bb42d 100755 --- a/L38-webflux-chat/client-service/src/main/java/ru/petrelevich/controllers/MessageController.java +++ b/L38-webflux-chat/client-service/src/main/java/ru/petrelevich/controllers/MessageController.java @@ -26,6 +26,8 @@ public class MessageController { private final WebClient datastoreClient; private final SimpMessagingTemplate template; + private final String SECRET_ROOM_ID = "1408"; + public MessageController(WebClient datastoreClient, SimpMessagingTemplate template) { this.datastoreClient = datastoreClient; this.template = template; @@ -34,10 +36,20 @@ public MessageController(WebClient datastoreClient, SimpMessagingTemplate templa @MessageMapping("/message.{roomId}") public void getMessage(@DestinationVariable("roomId") String roomId, Message message) { logger.info("get message:{}, roomId:{}", message, roomId); - saveMessage(roomId, message).subscribe(msgId -> logger.info("message send id:{}", msgId)); - template.convertAndSend( - String.format("%s%s", TOPIC_TEMPLATE, roomId), new Message(HtmlUtils.htmlEscape(message.messageStr()))); + switch (roomId) { + case SECRET_ROOM_ID -> logger.info("нельзя писать сообщения в комнату %s".formatted(roomId)); + default -> { + saveMessage(roomId, message).subscribe(msgId -> logger.info("message send id:{}", msgId)); + + template.convertAndSend( + String.format("%s%s", TOPIC_TEMPLATE, roomId), + new Message(HtmlUtils.htmlEscape(message.messageStr()))); + template.convertAndSend( + String.format("%s%s", TOPIC_TEMPLATE, SECRET_ROOM_ID), + new Message(HtmlUtils.htmlEscape(message.messageStr()))); + } + } } @EventListener @@ -59,11 +71,20 @@ public void handleSessionSubscribeEvent(SessionSubscribeEvent event) { } logger.info("subscription for:{}, roomId:{}, user:{}", simpDestination, roomId, principal.getName()); // /user/f6532733-51db-4d0e-bd00-1267dddc7b21/topic/response.1 - getMessagesByRoomId(roomId) + + getMessagesFlux(roomId) .doOnError(ex -> logger.error("getting messages for roomId:{} failed", roomId, ex)) .subscribe(message -> template.convertAndSendToUser(principal.getName(), simpDestination, message)); } + private Flux getMessagesFlux(final long roomId) { + if (String.valueOf(roomId).equals(SECRET_ROOM_ID)) { + return getAllMessages(); + } else { + return getMessagesByRoomId(roomId); + } + } + private long parseRoomId(String simpDestination) { try { var idxRoom = simpDestination.lastIndexOf(TOPIC_TEMPLATE); @@ -96,4 +117,18 @@ private Flux getMessagesByRoomId(long roomId) { } }); } + + private Flux getAllMessages() { + return datastoreClient + .get() + .uri("/msg/all") + .accept(MediaType.APPLICATION_NDJSON) + .exchangeToFlux(response -> { + if (response.statusCode().equals(HttpStatus.OK)) { + return response.bodyToFlux(Message.class); + } else { + return response.createException().flatMapMany(Mono::error); + } + }); + } } diff --git a/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/api/DataController.java b/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/api/DataController.java index 8202958..bfa2ebc 100755 --- a/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/api/DataController.java +++ b/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/api/DataController.java @@ -51,4 +51,16 @@ public Flux getMessagesByRoomId(@PathVariable("roomId") String roomI .doOnNext(msgDto -> log.info("msgDto:{}", msgDto)) .subscribeOn(workerPool); } + + @GetMapping(value = "/msg/all", produces = MediaType.APPLICATION_NDJSON_VALUE) + public Flux getAllMessages() { + log.info("getAllMessages"); + + return Mono.just("ignored") + .doOnNext(ignored -> log.info("getAllMessages")) + .flatMapMany(ignored -> dataStore.loadAllMessages()) + .map(message -> new MessageDto(message.msgText())) + .doOnNext(msgDto -> log.info("msgDto:{}", msgDto)) + .subscribeOn(workerPool); + } } diff --git a/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/repository/MessageRepository.java b/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/repository/MessageRepository.java index 7be4ec6..e44e4b0 100755 --- a/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/repository/MessageRepository.java +++ b/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/repository/MessageRepository.java @@ -10,4 +10,6 @@ public interface MessageRepository extends ReactiveCrudRepository @Query("select * from message where room_id = :room_id order by id") Flux findByRoomId(@Param("roomId") String roomId); + + Flux findAllByOrderByIdAsc(); } diff --git a/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/service/DataStore.java b/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/service/DataStore.java index 1509963..259a770 100755 --- a/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/service/DataStore.java +++ b/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/service/DataStore.java @@ -9,4 +9,6 @@ public interface DataStore { Mono saveMessage(Message message); Flux loadMessages(String roomId); + + Flux loadAllMessages(); } diff --git a/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/service/DataStoreR2dbc.java b/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/service/DataStoreR2dbc.java index 2c5ff2c..2ba4053 100755 --- a/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/service/DataStoreR2dbc.java +++ b/L38-webflux-chat/datastore-service/src/main/java/ru/petrelevich/service/DataStoreR2dbc.java @@ -1,6 +1,6 @@ package ru.petrelevich.service; -import static java.time.temporal.ChronoUnit.SECONDS; +import static java.time.temporal.ChronoUnit.MILLIS; import java.time.Duration; import org.slf4j.Logger; @@ -32,6 +32,11 @@ public Mono saveMessage(Message message) { @Override public Flux loadMessages(String roomId) { log.info("loadMessages roomId:{}", roomId); - return messageRepository.findByRoomId(roomId).delayElements(Duration.of(3, SECONDS), workerPool); + return messageRepository.findByRoomId(roomId).delayElements(Duration.of(300, MILLIS), workerPool); + } + + @Override + public Flux loadAllMessages() { + return messageRepository.findAllByOrderByIdAsc().delayElements(Duration.of(300, MILLIS), workerPool); } }