Skip to content

Commit 7e3920e

Browse files
committed
fix NotificationBehavior for WS, update recipients
1 parent 73288d1 commit 7e3920e

File tree

6 files changed

+111
-70
lines changed

6 files changed

+111
-70
lines changed

common/src/main/protobuf/model/notifications.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ message Push{
140140

141141
message Ws{
142142
option (scalapb.message).extends = "ProtobufDomainObject";
143-
option (scalapb.message).extends = "Notification";
143+
option (scalapb.message).extends = "WsDecorator";
144144
required string uuid = 1;
145145
required google.protobuf.Timestamp createdDate = 2 [(scalapb.field).type = "java.time.Instant"];
146146
required google.protobuf.Timestamp lastUpdated = 3 [(scalapb.field).type = "java.time.Instant"];

common/src/main/scala/app/softnetwork/notification/model/Notification.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package app.softnetwork.notification.model
22

3+
import app.softnetwork.notification.spi.WsChannels
34
import app.softnetwork.persistence.model.State
45

56
import java.time.Instant
@@ -25,9 +26,9 @@ trait Notification extends State with NotificationDecorator {
2526

2627
def results: Seq[NotificationStatusResult]
2728

28-
def removeOnSuccess: Option[Boolean] = None
29+
def removeOnSuccess(): Option[Boolean] = None
2930

30-
def removeAfterMaxTries: Option[Boolean] = None
31+
def removeAfterMaxTries(): Option[Boolean] = None
3132
}
3233

3334
trait NotificationDecorator { _: Notification =>
@@ -56,6 +57,15 @@ trait NotificationDecorator { _: Notification =>
5657
)
5758
)
5859
.withLastUpdated(ack.date)
60+
61+
def recipients(): Seq[String] = to
62+
63+
def recipientsAsString(): String =
64+
if (recipients().isEmpty) {
65+
"__unknown__"
66+
} else {
67+
recipients().mkString(",")
68+
}
5969
}
6070

6171
trait NotificationAckDecorator { _: NotificationAck =>
@@ -80,3 +90,11 @@ object NotificationAckDecorator {
8090
}
8191
}
8292
}
93+
94+
trait WsDecorator extends Notification { self: Ws =>
95+
override def recipients(): Seq[String] =
96+
self.channel match {
97+
case Some(channel) => WsChannels.lookupClients(channel).getOrElse(self.to).toSeq
98+
case None => self.to
99+
}
100+
}

common/src/main/scala/app/softnetwork/notification/spi/package.scala

Lines changed: 64 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -147,67 +147,71 @@ package object spi {
147147
def sendWs(ws: Ws)(implicit system: ActorSystem[_]): NotificationAck = {
148148
implicit val ec: ExecutionContext = system.executionContext
149149
val classicSystem: ClassicSystem = system
150-
val updatedWs =
151-
ws.channel match {
152-
case Some(channel) =>
153-
val clients = WsChannels.lookupClients(channel) match {
154-
case Some(clients) if clients.nonEmpty => clients // ++ ws.to
155-
case _ => ws.to
156-
}
157-
ws.withTo(clients.toSeq)
158-
case None => ws
159-
}
160-
val skipped = updatedWs.results
150+
val skipped = ws.results
161151
.filter(result => result.status.isSent || result.status.isDelivered)
162152
.map(_.recipient)
163-
val clients = updatedWs.to.filter(clientId => !skipped.contains(clientId))
153+
val recipients = ws.recipients().filter(recipient => !skipped.contains(recipient))
164154
val results: Seq[Future[NotificationStatusResult]] = {
165-
for (clientId <- clients) yield {
166-
WsClients.lookup(clientId) match {
167-
case Some(actorRef) =>
168-
actorRef ! TextMessage.Strict(updatedWs.message)
169-
Future.successful(
170-
NotificationStatusResult.defaultInstance
171-
.withUuid(updatedWs.uuid)
172-
.withRecipient(clientId)
173-
.withStatus(NotificationStatus.Sent)
174-
)
175-
case None =>
176-
wsClientsDao.lookupKeyValue(clientId) flatMap {
177-
case Some(ref) =>
178-
classicSystem
179-
.actorSelection(ref)
180-
.resolveOne(FiniteDuration(5, "seconds"))
181-
.map(actorRef => {
182-
actorRef ! TextMessage.Strict(ws.message)
183-
NotificationStatusResult.defaultInstance
184-
.withUuid(ws.uuid)
185-
.withRecipient(clientId)
186-
.withStatus(NotificationStatus.Sent)
187-
})
188-
.recover { case e: Throwable =>
189-
wsClientsDao.removeKeyValue(clientId)
190-
Console.err.println(
191-
s"Error while sending notification to client $clientId: ${e.getMessage}"
192-
)
155+
if (recipients.nonEmpty) {
156+
for (recipient <- recipients) yield {
157+
WsClients.lookup(recipient) match {
158+
case Some(actorRef) =>
159+
actorRef ! TextMessage.Strict(ws.message)
160+
Future.successful(
161+
NotificationStatusResult.defaultInstance
162+
.withUuid(ws.uuid)
163+
.withRecipient(recipient)
164+
.withStatus(NotificationStatus.Sent)
165+
)
166+
case None =>
167+
wsClientsDao.lookupKeyValue(recipient) flatMap {
168+
case Some(ref) =>
169+
classicSystem
170+
.actorSelection(ref)
171+
.resolveOne(FiniteDuration(5, "seconds"))
172+
.map(actorRef => {
173+
actorRef ! TextMessage.Strict(ws.message)
174+
NotificationStatusResult.defaultInstance
175+
.withUuid(ws.uuid)
176+
.withRecipient(recipient)
177+
.withStatus(NotificationStatus.Sent)
178+
})
179+
.recover { case e: Throwable =>
180+
wsClientsDao.removeKeyValue(recipient)
181+
Console.err.println(
182+
s"Error while sending notification to client $recipient: ${e.getMessage}"
183+
)
184+
NotificationStatusResult.defaultInstance
185+
.withUuid(ws.uuid)
186+
.withRecipient(recipient)
187+
.withStatus(NotificationStatus.Rejected)
188+
.withError(e.getMessage)
189+
}
190+
case None =>
191+
val error = s"ActorRef for client $recipient not found"
192+
Console.err.println(error)
193+
Future.successful(
193194
NotificationStatusResult.defaultInstance
194195
.withUuid(ws.uuid)
195-
.withRecipient(clientId)
196+
.withRecipient(recipient)
196197
.withStatus(NotificationStatus.Rejected)
197-
.withError(e.getMessage)
198-
}
199-
case None =>
200-
val error = s"ActorRef for client $clientId not found"
201-
Console.err.println(error)
202-
Future.successful(
203-
NotificationStatusResult.defaultInstance
204-
.withUuid(ws.uuid)
205-
.withRecipient(clientId)
206-
.withStatus(NotificationStatus.Rejected)
207-
.withError(error)
208-
)
209-
}
198+
.withError(error)
199+
)
200+
}
201+
}
210202
}
203+
} else {
204+
val error = s"No recipient found for notification ${ws.uuid}"
205+
Console.err.println(error)
206+
Seq(
207+
Future.successful(
208+
NotificationStatusResult.defaultInstance
209+
.withUuid(ws.uuid)
210+
.withRecipient("__unknown__")
211+
.withStatus(NotificationStatus.Rejected)
212+
.withError(error)
213+
)
214+
)
211215
}
212216
}
213217
Future.sequence(results).flatMap(results => Future.successful(results)) complete () match {
@@ -234,10 +238,14 @@ package object spi {
234238

235239
object WsChannels {
236240
private[this] var channels: Map[String, Set[String]] = Map.empty
237-
def addSession(channel: String, session: String): Unit =
241+
def addSession(channel: String, session: String): Unit = {
242+
Console.out.println(s"Adding session $session to channel $channel")
238243
channels += channel -> channels.get(channel).map(_ + session).getOrElse(Set(session))
239-
def removeSession(channel: String, session: String): Unit =
244+
}
245+
def removeSession(channel: String, session: String): Unit = {
246+
Console.out.println(s"Removing session $session from channel $channel")
240247
channels += channel -> channels.get(channel).map(_ - session).getOrElse(Set.empty)
248+
}
241249
def lookupClients(channel: String): Option[Set[String]] = channels
242250
.get(channel)
243251
.map(sessions => sessions.flatMap(session => WsSessions.lookupClients(session)).flatten)

core/src/main/resources/reference.conf

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,7 @@ credentials{
6868
password = "changeit"
6969
password = ${?CREDENTIALS_MAIL_PASSWORD}
7070
}
71-
}
71+
}
72+
73+
akka.http.server.websocket.periodic-keep-alive-max-idle = 1 second
74+
akka.http.server.websocket.periodic-keep-alive-mode = pong

core/src/main/scala/app/softnetwork/notification/persistence/typed/NotificationBehavior.scala

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,10 @@ trait NotificationBehavior[T <: Notification]
327327
Some(
328328
NotificationRecordedEvent.Wrapped.Push(n.copyWithAck(notificationAck).asInstanceOf[Push])
329329
)
330+
case n: Ws =>
331+
Some(
332+
NotificationRecordedEvent.Wrapped.Ws(n.copyWithAck(notificationAck).asInstanceOf[Ws])
333+
)
330334
case _ => None
331335
}) match {
332336
case Some(event) =>
@@ -380,7 +384,7 @@ trait NotificationBehavior[T <: Notification]
380384
persistenceId,
381385
entityId,
382386
status.name,
383-
to.mkString(", ")
387+
recipientsAsString()
384388
)
385389
Some((send(notification), 1))
386390
}
@@ -391,11 +395,11 @@ trait NotificationBehavior[T <: Notification]
391395
Some((NotificationAck(notification.ackUuid, notification.results, Instant.now()), 1))
392396
} else {
393397
log.info(
394-
"Sending {}#{} in {} status to {} recipients",
398+
s"Sending {}#{} in {} status to {} recipients with $nbTries tries",
395399
persistenceId,
396400
entityId,
397401
status.name,
398-
to.mkString(", ")
402+
recipientsAsString()
399403
)
400404
Some((send(notification), 1))
401405
}
@@ -432,22 +436,28 @@ trait NotificationBehavior[T <: Notification]
432436
NotificationRecordedEvent.Wrapped.Push(updatedNotification.asInstanceOf[Push])
433437
)
434438
)
439+
case _: Ws =>
440+
Some(
441+
NotificationRecordedEvent(
442+
NotificationRecordedEvent.Wrapped.Ws(updatedNotification.asInstanceOf[Ws])
443+
)
444+
)
435445
case _ => None
436446
}
437447

438448
val events: List[ExternalSchedulerEvent] =
439449
List(event).flatten ++ {
440450
if (
441451
(updatedNotification.status.isSent || updatedNotification.status.isDelivered)
442-
&& updatedNotification.removeOnSuccess.getOrElse(false)
452+
&& updatedNotification.removeOnSuccess().getOrElse(false)
443453
) {
444454
List(NotificationRemovedEvent(entityId))
445455
} else if (
446456
!updatedNotification.status.isSent
447457
&& !updatedNotification.status.isDelivered
448458
&& updatedNotification.maxTries > 0
449459
&& updatedNotification.nbTries > updatedNotification.maxTries
450-
&& updatedNotification.removeAfterMaxTries.getOrElse(false)
460+
&& updatedNotification.removeAfterMaxTries().getOrElse(false)
451461
) {
452462
List(NotificationRemovedEvent(entityId))
453463
} else {

core/src/main/scala/app/softnetwork/notification/service/NotificationService.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import de.heikoseeberger.akkahttpjson4s.Json4sSupport
2121
import org.json4s.jackson.Serialization
2222
import org.json4s.{jackson, Formats}
2323

24-
import scala.concurrent.ExecutionContext
25-
2624
trait NotificationService[SD <: SessionData with SessionDataDecorator[SD]]
2725
extends Directives
2826
with DefaultComplete
@@ -114,10 +112,14 @@ trait NotificationService[SD <: SessionData with SessionDataDecorator[SD]]
114112
WsSessions.addClient(sessionId, clientId) // add client to session
115113
WsClients.add(clientId, actorRef) // add actor to client
116114
channel match {
117-
case Some(ch) => WsChannels.addSession(ch, sessionId) // add session to channel
118-
case _ =>
115+
case Some(ch) =>
116+
WsChannels.addSession(ch, sessionId) // add session to channel
117+
Console.out.println(
118+
s"Client $clientId connected to channel $channel with session $sessionId"
119+
)
120+
case _ =>
121+
Console.out.println(s"Client $clientId connected with session $sessionId")
119122
}
120-
Console.out.println(s"Client $clientId connected")
121123
actorRef
122124
}
123125

0 commit comments

Comments
 (0)