Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Implement energy limit flex options and adapt optimization [#1572](https://github.com/ie3-institute/simona/issues/1572)
- Introducing `onePU` as default quantity [#1607](https://github.com/ie3-institute/simona/issues/1607)
- Introducing energy demand for warm water heating [#856](https://github.com/ie3-institute/simona/issues/856)
- Added external primary service worker [#1545](https://github.com/ie3-institute/simona/issues/1545)

### Changed
- Upgraded `scala2` to `scala3` [#53](https://github.com/ie3-institute/simona/issues/53)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* © 2024. TU Dortmund University,
* Institute of Energy Systems, Energy Efficiency and Energy Economics,
* Research group Distribution grid planning and operation
*/

package edu.ie3.simona.service.primary

import edu.ie3.simona.agent.participant.ParticipantAgent
import edu.ie3.simona.agent.participant.ParticipantAgent.{
DataProvision,
PrimaryRegistrationSuccessfulMessage,
}
import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection
import edu.ie3.simona.api.ontology.DataMessageFromExt
import edu.ie3.simona.api.ontology.primary.{
PrimaryDataMessageFromExt,
ProvidePrimaryData,
}
import edu.ie3.simona.exceptions.WeatherServiceException.InvalidRegistrationRequestException
import edu.ie3.simona.exceptions.{InitializationException, ServiceException}
import edu.ie3.simona.ontology.messages.ServiceMessage.{
PrimaryServiceRegistrationMessage,
ServiceRegistrationMessage,
ServiceResponseMessage,
}
import edu.ie3.simona.service.Data.PrimaryData
import edu.ie3.simona.service.Data.PrimaryData.RichValue
import edu.ie3.simona.service.ServiceStateData.{
InitializeServiceStateData,
ServiceBaseStateData,
}
import edu.ie3.simona.service.{ExtDataSupport, ServiceStateData, SimonaService}
import org.apache.pekko.actor.typed.ActorRef
import org.apache.pekko.actor.typed.scaladsl.ActorContext
import org.slf4j.Logger

import java.util.UUID
import scala.jdk.CollectionConverters.MapHasAsScala
import scala.jdk.OptionConverters.RichOptional
import scala.util.{Failure, Success, Try}

object ExtPrimaryServiceWorker extends SimonaService with ExtDataSupport {

override type S = ExtPrimaryDataStateData

final case class ExtPrimaryDataStateData(
extPrimaryData: ExtPrimaryDataConnection,
uuidToActorRef: Map[UUID, ActorRef[ParticipantAgent.Request]] =
Map.empty, // subscribers in SIMONA
extPrimaryDataMessage: Option[PrimaryDataMessageFromExt] = None,
) extends ServiceBaseStateData

case class InitExtPrimaryData(
extPrimaryData: ExtPrimaryDataConnection
) extends InitializeServiceStateData

override def init(
initServiceData: ServiceStateData.InitializeServiceStateData
)(using log: Logger): Try[(ExtPrimaryDataStateData, Option[Long])] =
initServiceData match {
case InitExtPrimaryData(extPrimaryData) =>
val primaryDataInitializedStateData = ExtPrimaryDataStateData(
extPrimaryData
)
Success(
primaryDataInitializedStateData,
None,
)

case invalidData =>
Failure(
new InitializationException(
s"Provided init data '${invalidData.getClass.getSimpleName}' for ExtPrimaryService are invalid!"
)
)
}

override protected def handleRegistrationRequest(
registrationMessage: ServiceRegistrationMessage
)(using
serviceStateData: ExtPrimaryDataStateData,
ctx: ActorContext[Message],
): Try[ExtPrimaryDataStateData] = registrationMessage match {
case PrimaryServiceRegistrationMessage(
requestingActor,
modelUuid,
) =>
Success(handleRegistrationRequest(requestingActor, modelUuid))
case invalidMessage =>
Failure(
InvalidRegistrationRequestException(
s"A primary service provider is not able to handle registration request '$invalidMessage'."
)
)
}

private def handleRegistrationRequest(
agentToBeRegistered: ActorRef[ParticipantAgent.Request],
agentUUID: UUID,
)(using
serviceStateData: ExtPrimaryDataStateData,
ctx: ActorContext[Message],
): ExtPrimaryDataStateData = {
serviceStateData.uuidToActorRef.get(agentUUID) match {
case None =>
// checks if a value class was specified for the agent
val valueClass = serviceStateData.extPrimaryData
.getValueClass(agentUUID)
.toScala
.getOrElse(
throw InvalidRegistrationRequestException(
s"A primary service provider is not able to handle registration request, because there was no value class specified for the agent with id: '$agentUUID'."
)
)

agentToBeRegistered ! PrimaryRegistrationSuccessfulMessage(
ctx.self,
0L,
PrimaryData.getPrimaryDataExtra(valueClass),
)
ctx.log.info(s"Successful registration for $agentUUID")

serviceStateData.copy(uuidToActorRef =
serviceStateData.uuidToActorRef + (agentUUID -> agentToBeRegistered)
)

case Some(_) =>
// actor is already registered, do nothing
ctx.log.warn(
"Sending actor {} is already registered",
agentToBeRegistered,
)
serviceStateData
}
}

override protected def announceInformation(
tick: Long
)(using
serviceStateData: ExtPrimaryDataStateData,
ctx: ActorContext[Message],
): (ExtPrimaryDataStateData, Option[Long]) = { // We got activated for this tick, so we expect incoming primary data
serviceStateData.extPrimaryDataMessage.getOrElse(
throw ServiceException(
"ExtPrimaryDataService was triggered without ExtPrimaryDataMessage available"
)
) match {
case providedPrimaryData: ProvidePrimaryData =>
processDataAndAnnounce(tick, providedPrimaryData)
}
}

private def processDataAndAnnounce(
tick: Long,
primaryDataMessage: ProvidePrimaryData,
)(using
serviceStateData: ExtPrimaryDataStateData,
ctx: ActorContext[Message],
): (
ExtPrimaryDataStateData,
Option[Long],
) = {
ctx.log.debug(
s"Got activation to distribute primaryData = $primaryDataMessage"
)

val uuidToAgent = serviceStateData.uuidToActorRef
val maybeNextTick = primaryDataMessage.maybeNextTick.toScala.map(Long2long)

primaryDataMessage.primaryData.asScala.foreach { case (agentUuid, data) =>
data.toPrimaryData match {
case Success(primaryData) =>
uuidToAgent.get(agentUuid) match {
case Some(agentRef) =>
agentRef ! DataProvision(
tick,
ctx.self,
primaryData,
maybeNextTick,
)

case None =>
ctx.log.warn(
"A corresponding actor ref for UUID {} could not be found",
agentUuid,
)
}

case Failure(exception) =>
/* Processing of data failed */
ctx.log.warn(
"Unable to convert received value to primary data. Skipped that data." +
"\nException: {}",
exception,
)
}
}

(
serviceStateData.copy(extPrimaryDataMessage = None),
None,
)
}

override protected def handleDataMessage(
extMsg: DataMessageFromExt
)(using
serviceStateData: ExtPrimaryDataStateData
): ExtPrimaryDataStateData = {
extMsg match {
case extPrimaryDataMessage: PrimaryDataMessageFromExt =>
serviceStateData.copy(
extPrimaryDataMessage = Some(extPrimaryDataMessage)
)
}
}

// unused by this service
override protected def handleDataResponseMessage(
extResponseMsg: ServiceResponseMessage
)(implicit
serviceStateData: ExtPrimaryDataStateData
): ExtPrimaryDataStateData = serviceStateData
}
Loading