Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class MessageController {
private final WebClient datastoreClient;
private final SimpMessagingTemplate template;

private final String SECRET_ROOM_ID = "1408";

public MessageController(WebClient datastoreClient, SimpMessagingTemplate template) {
this.datastoreClient = datastoreClient;
this.template = template;
Expand All @@ -34,10 +36,20 @@ public MessageController(WebClient datastoreClient, SimpMessagingTemplate templa
@MessageMapping("/message.{roomId}")
public void getMessage(@DestinationVariable("roomId") String roomId, Message message) {
logger.info("get message:{}, roomId:{}", message, roomId);
saveMessage(roomId, message).subscribe(msgId -> logger.info("message send id:{}", msgId));

template.convertAndSend(
String.format("%s%s", TOPIC_TEMPLATE, roomId), new Message(HtmlUtils.htmlEscape(message.messageStr())));
switch (roomId) {
case SECRET_ROOM_ID -> logger.info("нельзя писать сообщения в комнату %s".formatted(roomId));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут лучше просто if

default -> {
saveMessage(roomId, message).subscribe(msgId -> logger.info("message send id:{}", msgId));

template.convertAndSend(
String.format("%s%s", TOPIC_TEMPLATE, roomId),
new Message(HtmlUtils.htmlEscape(message.messageStr())));
template.convertAndSend(
String.format("%s%s", TOPIC_TEMPLATE, SECRET_ROOM_ID),
new Message(HtmlUtils.htmlEscape(message.messageStr())));
}
}
}

@EventListener
Expand All @@ -59,11 +71,20 @@ public void handleSessionSubscribeEvent(SessionSubscribeEvent event) {
}
logger.info("subscription for:{}, roomId:{}, user:{}", simpDestination, roomId, principal.getName());
// /user/f6532733-51db-4d0e-bd00-1267dddc7b21/topic/response.1
getMessagesByRoomId(roomId)

getMessagesFlux(roomId)
.doOnError(ex -> logger.error("getting messages for roomId:{} failed", roomId, ex))
.subscribe(message -> template.convertAndSendToUser(principal.getName(), simpDestination, message));
}

private Flux<Message> getMessagesFlux(final long roomId) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flux в названии лишний.

if (String.valueOf(roomId).equals(SECRET_ROOM_ID)) {
return getAllMessages();
} else {
return getMessagesByRoomId(roomId);
}
}

private long parseRoomId(String simpDestination) {
try {
var idxRoom = simpDestination.lastIndexOf(TOPIC_TEMPLATE);
Expand Down Expand Up @@ -96,4 +117,18 @@ private Flux<Message> getMessagesByRoomId(long roomId) {
}
});
}

private Flux<Message> getAllMessages() {
return datastoreClient
.get()
.uri("/msg/all")
.accept(MediaType.APPLICATION_NDJSON)
.exchangeToFlux(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
return response.bodyToFlux(Message.class);
} else {
return response.createException().flatMapMany(Mono::error);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,16 @@ public Flux<MessageDto> getMessagesByRoomId(@PathVariable("roomId") String roomI
.doOnNext(msgDto -> log.info("msgDto:{}", msgDto))
.subscribeOn(workerPool);
}

@GetMapping(value = "/msg/all", produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux<MessageDto> getAllMessages() {
log.info("getAllMessages");

return Mono.just("ignored")
.doOnNext(ignored -> log.info("getAllMessages"))
.flatMapMany(ignored -> dataStore.loadAllMessages())
.map(message -> new MessageDto(message.msgText()))
.doOnNext(msgDto -> log.info("msgDto:{}", msgDto))
.subscribeOn(workerPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ public interface MessageRepository extends ReactiveCrudRepository<Message, Long>

@Query("select * from message where room_id = :room_id order by id")
Flux<Message> findByRoomId(@Param("roomId") String roomId);

Flux<Message> findAllByOrderByIdAsc();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public interface DataStore {
Mono<Message> saveMessage(Message message);

Flux<Message> loadMessages(String roomId);

Flux<Message> loadAllMessages();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ru.petrelevich.service;

import static java.time.temporal.ChronoUnit.SECONDS;
import static java.time.temporal.ChronoUnit.MILLIS;

import java.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -32,6 +32,11 @@ public Mono<Message> saveMessage(Message message) {
@Override
public Flux<Message> loadMessages(String roomId) {
log.info("loadMessages roomId:{}", roomId);
return messageRepository.findByRoomId(roomId).delayElements(Duration.of(3, SECONDS), workerPool);
return messageRepository.findByRoomId(roomId).delayElements(Duration.of(300, MILLIS), workerPool);
}

@Override
public Flux<Message> loadAllMessages() {
return messageRepository.findAllByOrderByIdAsc().delayElements(Duration.of(300, MILLIS), workerPool);
}
}