diff --git a/CHANGELOG.md b/CHANGELOG.md index 06a13beece..dc9ddbec38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Implement energy limit flex options and adapt optimization [#1572](https://github.com/ie3-institute/simona/issues/1572) - Introducing `onePU` as default quantity [#1607](https://github.com/ie3-institute/simona/issues/1607) - Introducing energy demand for warm water heating [#856](https://github.com/ie3-institute/simona/issues/856) +- Added external primary service worker [#1545](https://github.com/ie3-institute/simona/issues/1545) ### Changed - Upgraded `scala2` to `scala3` [#53](https://github.com/ie3-institute/simona/issues/53) diff --git a/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorker.scala b/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorker.scala new file mode 100644 index 0000000000..87acb49948 --- /dev/null +++ b/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorker.scala @@ -0,0 +1,225 @@ +/* + * © 2024. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.service.primary + +import edu.ie3.simona.agent.participant.ParticipantAgent +import edu.ie3.simona.agent.participant.ParticipantAgent.{ + DataProvision, + PrimaryRegistrationSuccessfulMessage, +} +import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection +import edu.ie3.simona.api.ontology.DataMessageFromExt +import edu.ie3.simona.api.ontology.primary.{ + PrimaryDataMessageFromExt, + ProvidePrimaryData, +} +import edu.ie3.simona.exceptions.WeatherServiceException.InvalidRegistrationRequestException +import edu.ie3.simona.exceptions.{InitializationException, ServiceException} +import edu.ie3.simona.ontology.messages.ServiceMessage.{ + PrimaryServiceRegistrationMessage, + ServiceRegistrationMessage, + ServiceResponseMessage, +} +import edu.ie3.simona.service.Data.PrimaryData +import edu.ie3.simona.service.Data.PrimaryData.RichValue +import edu.ie3.simona.service.ServiceStateData.{ + InitializeServiceStateData, + ServiceBaseStateData, +} +import edu.ie3.simona.service.{ExtDataSupport, ServiceStateData, SimonaService} +import org.apache.pekko.actor.typed.ActorRef +import org.apache.pekko.actor.typed.scaladsl.ActorContext +import org.slf4j.Logger + +import java.util.UUID +import scala.jdk.CollectionConverters.MapHasAsScala +import scala.jdk.OptionConverters.RichOptional +import scala.util.{Failure, Success, Try} + +object ExtPrimaryServiceWorker extends SimonaService with ExtDataSupport { + + override type S = ExtPrimaryDataStateData + + final case class ExtPrimaryDataStateData( + extPrimaryData: ExtPrimaryDataConnection, + uuidToActorRef: Map[UUID, ActorRef[ParticipantAgent.Request]] = + Map.empty, // subscribers in SIMONA + extPrimaryDataMessage: Option[PrimaryDataMessageFromExt] = None, + ) extends ServiceBaseStateData + + case class InitExtPrimaryData( + extPrimaryData: ExtPrimaryDataConnection + ) extends InitializeServiceStateData + + override def init( + initServiceData: ServiceStateData.InitializeServiceStateData + )(using log: Logger): Try[(ExtPrimaryDataStateData, Option[Long])] = + initServiceData match { + case InitExtPrimaryData(extPrimaryData) => + val primaryDataInitializedStateData = ExtPrimaryDataStateData( + extPrimaryData + ) + Success( + primaryDataInitializedStateData, + None, + ) + + case invalidData => + Failure( + new InitializationException( + s"Provided init data '${invalidData.getClass.getSimpleName}' for ExtPrimaryService are invalid!" + ) + ) + } + + override protected def handleRegistrationRequest( + registrationMessage: ServiceRegistrationMessage + )(using + serviceStateData: ExtPrimaryDataStateData, + ctx: ActorContext[Message], + ): Try[ExtPrimaryDataStateData] = registrationMessage match { + case PrimaryServiceRegistrationMessage( + requestingActor, + modelUuid, + ) => + Success(handleRegistrationRequest(requestingActor, modelUuid)) + case invalidMessage => + Failure( + InvalidRegistrationRequestException( + s"A primary service provider is not able to handle registration request '$invalidMessage'." + ) + ) + } + + private def handleRegistrationRequest( + agentToBeRegistered: ActorRef[ParticipantAgent.Request], + agentUUID: UUID, + )(using + serviceStateData: ExtPrimaryDataStateData, + ctx: ActorContext[Message], + ): ExtPrimaryDataStateData = { + serviceStateData.uuidToActorRef.get(agentUUID) match { + case None => + // checks if a value class was specified for the agent + val valueClass = serviceStateData.extPrimaryData + .getValueClass(agentUUID) + .toScala + .getOrElse( + throw InvalidRegistrationRequestException( + s"A primary service provider is not able to handle registration request, because there was no value class specified for the agent with id: '$agentUUID'." + ) + ) + + agentToBeRegistered ! PrimaryRegistrationSuccessfulMessage( + ctx.self, + 0L, + PrimaryData.getPrimaryDataExtra(valueClass), + ) + ctx.log.info(s"Successful registration for $agentUUID") + + serviceStateData.copy(uuidToActorRef = + serviceStateData.uuidToActorRef + (agentUUID -> agentToBeRegistered) + ) + + case Some(_) => + // actor is already registered, do nothing + ctx.log.warn( + "Sending actor {} is already registered", + agentToBeRegistered, + ) + serviceStateData + } + } + + override protected def announceInformation( + tick: Long + )(using + serviceStateData: ExtPrimaryDataStateData, + ctx: ActorContext[Message], + ): (ExtPrimaryDataStateData, Option[Long]) = { // We got activated for this tick, so we expect incoming primary data + serviceStateData.extPrimaryDataMessage.getOrElse( + throw ServiceException( + "ExtPrimaryDataService was triggered without ExtPrimaryDataMessage available" + ) + ) match { + case providedPrimaryData: ProvidePrimaryData => + processDataAndAnnounce(tick, providedPrimaryData) + } + } + + private def processDataAndAnnounce( + tick: Long, + primaryDataMessage: ProvidePrimaryData, + )(using + serviceStateData: ExtPrimaryDataStateData, + ctx: ActorContext[Message], + ): ( + ExtPrimaryDataStateData, + Option[Long], + ) = { + ctx.log.debug( + s"Got activation to distribute primaryData = $primaryDataMessage" + ) + + val uuidToAgent = serviceStateData.uuidToActorRef + val maybeNextTick = primaryDataMessage.maybeNextTick.toScala.map(Long2long) + + primaryDataMessage.primaryData.asScala.foreach { case (agentUuid, data) => + data.toPrimaryData match { + case Success(primaryData) => + uuidToAgent.get(agentUuid) match { + case Some(agentRef) => + agentRef ! DataProvision( + tick, + ctx.self, + primaryData, + maybeNextTick, + ) + + case None => + ctx.log.warn( + "A corresponding actor ref for UUID {} could not be found", + agentUuid, + ) + } + + case Failure(exception) => + /* Processing of data failed */ + ctx.log.warn( + "Unable to convert received value to primary data. Skipped that data." + + "\nException: {}", + exception, + ) + } + } + + ( + serviceStateData.copy(extPrimaryDataMessage = None), + None, + ) + } + + override protected def handleDataMessage( + extMsg: DataMessageFromExt + )(using + serviceStateData: ExtPrimaryDataStateData + ): ExtPrimaryDataStateData = { + extMsg match { + case extPrimaryDataMessage: PrimaryDataMessageFromExt => + serviceStateData.copy( + extPrimaryDataMessage = Some(extPrimaryDataMessage) + ) + } + } + + // unused by this service + override protected def handleDataResponseMessage( + extResponseMsg: ServiceResponseMessage + )(implicit + serviceStateData: ExtPrimaryDataStateData + ): ExtPrimaryDataStateData = serviceStateData +} diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala index 3c53b74f5a..6d54cfb8fa 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala @@ -29,6 +29,7 @@ import edu.ie3.datamodel.io.source.{ import edu.ie3.datamodel.models.value.Value import edu.ie3.simona.agent.participant.ParticipantAgent import edu.ie3.simona.agent.participant.ParticipantAgent.RegistrationFailedMessage +import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection import edu.ie3.simona.config.ConfigParams.{SqlParams, TimeStampedCsvParams} import edu.ie3.simona.config.InputConfig.Primary as PrimaryConfig import edu.ie3.simona.exceptions.InitializationException @@ -78,43 +79,51 @@ object PrimaryServiceProxy { type Message = ServiceMessage | Activation /** State data with needed information to initialize this primary service - * provider proxy + * provider proxy. * * @param primaryConfig - * Configuration for the primary source + * Configuration for the primary source. * @param simulationStart - * Simulation time of the first instant in simulation + * Simulation time of the first instant in simulation. + * @param extSimulationData + * Seq: external primary data connections to service references. */ final case class InitPrimaryServiceProxyStateData( primaryConfig: PrimaryConfig, simulationStart: ZonedDateTime, + extSimulationData: Seq[ + (ExtPrimaryDataConnection, ActorRef[ServiceMessage]) + ], ) extends InitializeServiceStateData /** Holding the state of an initialized proxy. * * @param modelToTimeSeries - * Mapping from models' to time series unique identifiers + * Mapping from models' to time series unique identifiers. * @param timeSeriesToSourceRef - * Mapping from time series identifier to [[SourceRef]] + * Mapping from time series identifier to [[SourceRef]]. * @param simulationStart - * Simulation time of the first instant in simulation + * Simulation time of the first instant in simulation. * @param primaryConfig - * The configuration for the sources + * The configuration for the sources. + * @param modelToExtWorker + * Map: participant uuid to external primary service worker. */ final case class PrimaryServiceStateData( modelToTimeSeries: Map[UUID, UUID], timeSeriesToSourceRef: Map[UUID, SourceRef], simulationStart: ZonedDateTime, primaryConfig: PrimaryConfig, + modelToExtWorker: Map[UUID, ActorRef[ServiceMessage]] = Map.empty, ) extends ServiceStateData /** Giving reference to the target time series and source worker. * * @param metaInformation - * Meta information (including column scheme) of the time series + * Meta information (including column scheme) of the time series. * @param worker * Optional reference to an already existing worker providing information - * on that time series + * on that time series. */ final case class SourceRef( metaInformation: IndividualTimeSeriesMetaInformation, @@ -139,7 +148,7 @@ object PrimaryServiceProxy { /** Handle all messages, when the actor isn't initialized, yet. * * @return - * How receiving should be handled with gained insight of myself + * How receiving should be handled with gained insight of myself. */ private def uninitialized( initStateData: InitPrimaryServiceProxyStateData @@ -153,6 +162,7 @@ object PrimaryServiceProxy { prepareStateData( initStateData.primaryConfig, initStateData.simulationStart, + initStateData.extSimulationData, )(using ctx.log) match { case Success(stateData) => scheduler ! Completion(ctx.self) @@ -173,18 +183,21 @@ object PrimaryServiceProxy { /** Prepare the needed state data by building a * [[edu.ie3.datamodel.io.source.TimeSeriesMappingSource]], obtain its - * information and compile them to state data + * information and compile them to state data. * * @param primaryConfig - * Configuration for the primary source + * Configuration for the primary source. * @param simulationStart - * Simulation time of first instant in simulation + * Simulation time of first instant in simulation. * @return - * State data, containing the known model and time series identifiers + * State data, containing the known model and time series identifiers. */ private[service] def prepareStateData( primaryConfig: PrimaryConfig, simulationStart: ZonedDateTime, + extSimulationData: Seq[ + (ExtPrimaryDataConnection, ActorRef[ServiceMessage]) + ], )(using log: Logger): Try[PrimaryServiceStateData] = { val sourceOption = Seq( primaryConfig.sqlParams, @@ -232,11 +245,19 @@ object PrimaryServiceProxy { } } .toMap + + // create the model to ref map + val modelToExtWorker = extSimulationData.flatMap { + case (connection, ref) => + connection.getPrimaryDataAssets.asScala.map(id => id -> ref) + }.toMap + PrimaryServiceStateData( modelToTimeSeries, timeSeriesToSourceRef, simulationStart, primaryConfig, + modelToExtWorker, ) } } @@ -300,9 +321,9 @@ object PrimaryServiceProxy { * needed, new workers are spun off. * * @param stateData - * Representing the current state of the agent + * Representing the current state of the agent. * @return - * Message handling routine + * Message handling routine. */ private[service] def onMessage(stateData: PrimaryServiceStateData)(using scheduler: ActorRef[SchedulerMessage] @@ -312,26 +333,50 @@ object PrimaryServiceProxy { PrimaryServiceRegistrationMessage(requestingActor, modelUuid), ) => /* Try to register for this model */ - stateData.modelToTimeSeries.get(modelUuid) match { - case Some(timeSeriesUuid) => - /* There is a time series apparent for this model, try to get a worker for it */ - val updatedStateData = handleCoveredModel( - modelUuid, - timeSeriesUuid, - stateData, - requestingActor, - )(using scheduler, ctx) + stateData.modelToExtWorker.get(modelUuid) match { + case Some(_) => + /* There is external data apparent for this model */ + stateData.modelToExtWorker.get(modelUuid) match { + case Some(ref) => + ref ! PrimaryServiceRegistrationMessage( + requestingActor, + modelUuid, + ) + case None => + ctx.log.warn( + s"Could not forward the registration message for model with uuid `$modelUuid`" + ) + } - onMessage(updatedStateData) + Behaviors.same case None => ctx.log.debug( - s"There is no time series apparent for the model with uuid '{}'.", + s"There is no external data apparent for the model with uuid '{}'.", modelUuid, ) - requestingActor ! RegistrationFailedMessage(ctx.self) - Behaviors.same + stateData.modelToTimeSeries.get(modelUuid) match { + case Some(timeSeriesUuid) => + /* There is a time series apparent for this model, try to get a worker for it */ + val updatedStateData = handleCoveredModel( + modelUuid, + timeSeriesUuid, + stateData, + requestingActor, + )(using scheduler, ctx) + + onMessage(updatedStateData) + + case None => + ctx.log.debug( + s"There is no time series apparent for the model with uuid '{}'.", + modelUuid, + ) + requestingActor ! RegistrationFailedMessage(ctx.self) + + Behaviors.same + } } case (ctx, unknown) => ctx.log.error( @@ -342,14 +387,14 @@ object PrimaryServiceProxy { /** Handle the registration request for a covered model. First, try to get an * already existing worker for this time series, otherwise spin-off a new - * one, remember it and forward the request + * one, remember it and forward the request. * * @param modelUuid - * Unique identifier of the model + * Unique identifier of the model. * @param timeSeriesUuid - * Unique identifier of the equivalent time series + * Unique identifier of the equivalent time series. * @param stateData - * Current state data of the actor + * Current state data of the actor. */ protected[service] def handleCoveredModel( modelUuid: UUID, @@ -401,16 +446,16 @@ object PrimaryServiceProxy { } /** Instantiate a new [[PrimaryServiceWorker]] and send initialization - * information + * information. * * @param metaInformation - * Meta information (including column scheme) of the time series + * Meta information (including column scheme) of the time series. * @param simulationStart - * The time of the simulation start + * The time of the simulation start. * @param primaryConfig - * Configuration for the primary config + * Configuration for the primary config. * @return - * The [[ActorRef]] to the worker + * The [[ActorRef]] to the worker. */ protected[service] def initializeWorker( metaInformation: IndividualTimeSeriesMetaInformation, @@ -447,12 +492,12 @@ object PrimaryServiceProxy { } /** Build a primary source worker and type it to the foreseen value class to - * come + * come. * * @param timeSeriesUuid - * Uuid of the time series the actor processes + * Uuid of the time series the actor processes. * @return - * The [[ActorRef]] to the spun off actor + * The [[ActorRef]] to the spun off actor. */ private[service] def classToWorkerRef( timeSeriesUuid: String @@ -465,15 +510,16 @@ object PrimaryServiceProxy { timeSeriesUuid, ) - /** Building proper init data for the worker + /** Building proper init data for the worker. * * @param metaInformation - * Meta information (including column scheme) of the time series + * Meta information (including column scheme) of the time series. * @param simulationStart - * The time of the simulation start + * The time of the simulation start. * @param primaryConfig - * Configuration for the primary config + * Configuration for the primary config. * @return + * A try for the init state data. */ private[service] def toInitData[V <: Value]( metaInformation: IndividualTimeSeriesMetaInformation, @@ -538,13 +584,13 @@ object PrimaryServiceProxy { /** Register the worker within the state data. * * @param stateData - * Current state information + * Current state information. * @param timeSeriesUuid - * Unique identifier of the time series, the worker takes care of + * Unique identifier of the time series, the worker takes care of. * @param workerRef - * [[ActorRef]] to the new worker actor + * [[ActorRef]] to the new worker actor. * @return - * The updated state data, that holds reference to the worker + * The updated state data, that holds reference to the worker. */ private def updateStateData( stateData: PrimaryServiceStateData, diff --git a/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala b/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala index 9dfae566ee..74dd8e24e4 100644 --- a/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala +++ b/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala @@ -27,6 +27,8 @@ import edu.ie3.simona.service.em.ExtEmDataService import edu.ie3.simona.service.em.ExtEmDataService.InitExtEmData import edu.ie3.simona.service.ev.ExtEvDataService import edu.ie3.simona.service.ev.ExtEvDataService.InitExtEvData +import edu.ie3.simona.service.primary.ExtPrimaryServiceWorker +import edu.ie3.simona.service.primary.ExtPrimaryServiceWorker.InitExtPrimaryData import edu.ie3.simona.util.SimonaConstants.PRE_INIT_TICK import org.apache.pekko.actor.typed.ActorRef import org.apache.pekko.actor.typed.scaladsl.ActorContext @@ -154,6 +156,20 @@ object ExtSimSetup { val updatedSetupData = connections.foldLeft(extSimSetupData) { case (setupData, connection) => connection match { + case extPrimaryDataConnection: ExtPrimaryDataConnection => + val serviceRef = context.spawn( + ExtPrimaryServiceWorker(scheduler), + "ExtPrimaryDataService_$index", + ) + + setupService( + extPrimaryDataConnection, + serviceRef, + InitExtPrimaryData.apply, + ) + + extSimSetupData.update(extPrimaryDataConnection, serviceRef) + case extEmDataConnection: ExtEmDataConnection => if setupData.emDataService.nonEmpty then { throw ServiceException( diff --git a/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala b/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala index 767a77bc28..e22ce831bb 100644 --- a/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala +++ b/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala @@ -147,6 +147,7 @@ class SimonaStandaloneSetup( InitPrimaryServiceProxyStateData( simonaConfig.simona.input.primary, simulationStart, + extSimSetupData.primaryDataServices, ), ), "primaryServiceProxyAgent", diff --git a/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorkerSpec.scala b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorkerSpec.scala new file mode 100644 index 0000000000..7463c9f018 --- /dev/null +++ b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorkerSpec.scala @@ -0,0 +1,223 @@ +/* + * © 2021. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.service.primary + +import com.typesafe.scalalogging.LazyLogging +import edu.ie3.datamodel.models.value.{PValue, Value} +import edu.ie3.simona.agent.participant.ParticipantAgent.{ + DataProvision, + PrimaryRegistrationSuccessfulMessage, + RegistrationSuccessfulMessage, +} +import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection +import edu.ie3.simona.api.ontology.ScheduleDataServiceMessage +import edu.ie3.simona.api.ontology.primary.ProvidePrimaryData +import edu.ie3.simona.api.ontology.simulation.ControlResponseMessageFromExt +import edu.ie3.simona.ontology.messages.SchedulerMessage.{ + Completion, + ScheduleActivation, +} +import edu.ie3.simona.ontology.messages.ServiceMessage.{ + Create, + PrimaryServiceRegistrationMessage, + SecondaryServiceRegistrationMessage, +} +import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage} +import edu.ie3.simona.scheduler.ScheduleLock +import edu.ie3.simona.service.Data.PrimaryData.{ActivePower, ActivePowerExtra} +import edu.ie3.simona.service.primary.ExtPrimaryServiceWorker.InitExtPrimaryData +import edu.ie3.simona.test.common.TestSpawnerTyped +import edu.ie3.simona.util.SimonaConstants.INIT_SIM_TICK +import edu.ie3.util.quantities.QuantityUtils.asKiloWatt +import org.apache.pekko.actor.testkit.typed.scaladsl.{ + ScalaTestWithActorTestKit, + TestProbe, +} +import org.apache.pekko.actor.typed.scaladsl.adapter.TypedActorRefOps +import org.scalatest.PrivateMethodTester +import org.scalatest.matchers.should +import org.scalatest.wordspec.AnyWordSpecLike +import squants.energy.Kilowatts + +import java.util.{Optional, UUID} +import scala.jdk.CollectionConverters.* + +class ExtPrimaryServiceWorkerSpec + extends ScalaTestWithActorTestKit + with AnyWordSpecLike + with should.Matchers + with PrivateMethodTester + with LazyLogging + with TestSpawnerTyped { + + private val scheduler = TestProbe[SchedulerMessage]("scheduler") + private val extSimAdapter = + TestProbe[ControlResponseMessageFromExt]("extSimAdapter") + + private val systemParticipant = TestProbe[Any]("dummySystemParticipant") + + private val validUuid = + UUID.fromString("b73a7e3f-9045-40cd-b518-c11a9a6a1025") + private val invalidUuid = + UUID.fromString("46be1e57-e4ed-4ef7-95f1-b2b321cb2047") + + "An uninitialized external primary data service" must { + + "send correct completion message after initialisation" in { + val extPrimaryDataConnection = new ExtPrimaryDataConnection( + Map(validUuid -> classOf[PValue]).asJava + ) + + val primaryDataService = spawn(ExtPrimaryServiceWorker(scheduler.ref)) + extPrimaryDataConnection.setActorRefs( + primaryDataService, + extSimAdapter.ref, + ) + + val key = + ScheduleLock.singleKey(TSpawner, scheduler.ref, INIT_SIM_TICK) + scheduler + .expectMessageType[ScheduleActivation] // lock activation scheduled + + extPrimaryDataConnection.setActorRefs( + primaryDataService, + extSimAdapter.ref, + ) + + primaryDataService ! Create( + InitExtPrimaryData(extPrimaryDataConnection), + key, + ) + + scheduler.expectMessage( + ScheduleActivation(primaryDataService, INIT_SIM_TICK, Some(key)) + ) + + primaryDataService ! Activation(INIT_SIM_TICK) + scheduler.expectMessage(Completion(primaryDataService)) + } + + "refuse registration for wrong registration request" in { + val schedulerProbe = TestProbe[SchedulerMessage]("schedulerProbe") + + val extPrimaryDataConnection = new ExtPrimaryDataConnection( + Map(validUuid -> classOf[PValue]).asJava + ) + + // we need to create another service, since we want to continue using the other in later tests + val service = spawn(ExtPrimaryServiceWorker(schedulerProbe.ref)) + extPrimaryDataConnection.setActorRefs(service, extSimAdapter.ref) + + val key = + ScheduleLock.singleKey(TSpawner, schedulerProbe.ref, INIT_SIM_TICK) + + service ! Create( + InitExtPrimaryData(extPrimaryDataConnection), + key, + ) + + service ! Activation(INIT_SIM_TICK) + + service ! PrimaryServiceRegistrationMessage( + systemParticipant.ref, + UUID.randomUUID(), + ) + + val deathWatch = createTestProbe("deathWatch") + deathWatch.expectTerminated(service.ref) + } + + "refuse registration for unknown participant uuid" in { + val schedulerProbe = TestProbe[SchedulerMessage]("schedulerProbe") + + val extPrimaryDataConnection = new ExtPrimaryDataConnection( + Map(validUuid -> classOf[PValue]).asJava + ) + + // we need to create another service, since we want to continue using the other in later tests + val service = spawn(ExtPrimaryServiceWorker(schedulerProbe.ref)) + extPrimaryDataConnection.setActorRefs(service, extSimAdapter.ref) + + val key = + ScheduleLock.singleKey(TSpawner, schedulerProbe.ref, INIT_SIM_TICK) + + service ! Create( + InitExtPrimaryData(extPrimaryDataConnection), + key, + ) + + service ! Activation(INIT_SIM_TICK) + + service ! SecondaryServiceRegistrationMessage( + systemParticipant.ref, + invalidUuid, + ) + + val deathWatch = createTestProbe("deathWatch") + deathWatch.expectTerminated(service.ref) + } + } + + "An external primary service actor" should { + + val extPrimaryDataConnection = new ExtPrimaryDataConnection( + Map(validUuid -> classOf[PValue]).asJava + ) + + val serviceRef = spawn(ExtPrimaryServiceWorker(scheduler.ref)) + extPrimaryDataConnection.setActorRefs(serviceRef, extSimAdapter.ref) + + "init the service actor" in { + val key = ScheduleLock.singleKey(TSpawner, scheduler.ref, INIT_SIM_TICK) + scheduler + .expectMessageType[ScheduleActivation] // lock activation scheduled + + serviceRef ! Create(InitExtPrimaryData(extPrimaryDataConnection), key) + + val activationMsg = scheduler.expectMessageType[ScheduleActivation] + activationMsg.tick shouldBe INIT_SIM_TICK + activationMsg.unlockKey shouldBe Some(key) + + serviceRef ! Activation(INIT_SIM_TICK) + scheduler.expectMessage(Completion(serviceRef, None)) + } + + "correctly register a forwarded request" in { + serviceRef ! PrimaryServiceRegistrationMessage( + systemParticipant.ref, + validUuid, + ) + + systemParticipant.expectMessage( + PrimaryRegistrationSuccessfulMessage(serviceRef, 0L, ActivePowerExtra) + ) + } + + "announce primary data correctly" in { + extPrimaryDataConnection.sendExtMsg( + new ProvidePrimaryData( + 0L, + Map(validUuid -> new PValue(10.asKiloWatt)).asJava, + Optional.of(900L), + ) + ) + + extSimAdapter.expectMessage(new ScheduleDataServiceMessage(serviceRef)) + serviceRef ! Activation(0) + + systemParticipant.expectMessage( + DataProvision( + 0L, + serviceRef, + ActivePower(Kilowatts(10)), + Some(900L), + ) + ) + } + + } +} diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala index 68bf753c94..1b9438b9c2 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala @@ -11,6 +11,8 @@ import edu.ie3.datamodel.io.naming.FileNamingStrategy import edu.ie3.datamodel.io.naming.timeseries.ColumnScheme import edu.ie3.datamodel.io.source.TimeSeriesMappingSource import edu.ie3.datamodel.io.source.csv.CsvTimeSeriesMappingSource +import edu.ie3.datamodel.models.value.{PValue, SValue, Value} +import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection import edu.ie3.datamodel.models.value.SValue import edu.ie3.simona.agent.participant.ParticipantAgent import edu.ie3.simona.agent.participant.ParticipantAgent.RegistrationFailedMessage @@ -30,7 +32,11 @@ import edu.ie3.simona.ontology.messages.ServiceMessage.{ PrimaryServiceRegistrationMessage, WorkerRegistrationMessage, } -import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage} +import edu.ie3.simona.ontology.messages.{ + Activation, + SchedulerMessage, + ServiceMessage, +} import edu.ie3.simona.scheduler.ScheduleLock import edu.ie3.simona.service.primary.PrimaryServiceProxy.{ InitPrimaryServiceProxyStateData, @@ -66,6 +72,7 @@ import org.slf4j.{Logger, LoggerFactory} import java.nio.file.{Path, Paths} import java.time.ZonedDateTime import java.util.UUID +import scala.jdk.CollectionConverters.* import scala.language.implicitConversions import scala.util.{Failure, Success} @@ -138,11 +145,26 @@ class PrimaryServiceProxySpec when(m.self).thenReturn(service.ref) m } + private val validExtPrimaryDataService = spawn( + ExtPrimaryServiceWorker(scheduler.ref) + ) + + private val extEntityId = + UUID.fromString("07bbe1aa-1f39-4dfb-b41b-339dec816ec4") + + private val valueMap: Map[UUID, Class[? <: Value]] = Map( + extEntityId -> classOf[PValue] + ) + + private val extPrimaryDataConnection = new ExtPrimaryDataConnection( + valueMap.asJava + ) val initStateData: InitPrimaryServiceProxyStateData = InitPrimaryServiceProxyStateData( validPrimaryConfig, simulationStart, + Seq.empty, ) val proxy: ActorRef[PrimaryServiceProxy.Message] = testKit.spawn(PrimaryServiceProxy(scheduler.ref, initStateData)) @@ -159,6 +181,7 @@ class PrimaryServiceProxySpec PrimaryServiceProxy.prepareStateData( maliciousConfig, simulationStart, + Seq.empty, ) match { case Success(emptyStateData) => emptyStateData.modelToTimeSeries shouldBe Map.empty @@ -182,6 +205,7 @@ class PrimaryServiceProxySpec PrimaryServiceProxy.prepareStateData( maliciousConfig, simulationStart, + Seq.empty, ) match { case Success(_) => fail("Building state data with missing config should fail") @@ -195,6 +219,7 @@ class PrimaryServiceProxySpec PrimaryServiceProxy.prepareStateData( validPrimaryConfig, simulationStart, + Seq.empty, ) match { case Success( PrimaryServiceStateData( @@ -202,6 +227,7 @@ class PrimaryServiceProxySpec timeSeriesToSourceRef, simulationStart, primaryConfig, + _, ) ) => modelToTimeSeries shouldBe Map( @@ -239,6 +265,28 @@ class PrimaryServiceProxySpec ) } } + + "build proxy correctly when there is an external simulation" in { + PrimaryServiceProxy.prepareStateData( + validPrimaryConfig, + simulationStart, + Seq((extPrimaryDataConnection, validExtPrimaryDataService)), + ) match { + case Success( + PrimaryServiceStateData( + _, + _, + _, + _, + extSubscribersToService, + ) + ) => + extSubscribersToService shouldBe Map( + extEntityId -> validExtPrimaryDataService + ) + } + } + } "Sending initialization information to an uninitialized actor" should { @@ -405,6 +453,41 @@ class PrimaryServiceProxySpec key shouldBe None } } + + "forwarding the registration to correct ext worker" in { + val extWorker1 = TestProbe[ServiceMessage]("extWorker1") + val extWorker2 = TestProbe[ServiceMessage]("extWorker2") + + val participant1 = TestProbe[ParticipantAgent.Message]("Participant1").ref + val participant2 = TestProbe[ParticipantAgent.Message]("Participant2").ref + + val extEntityUuid1 = UUID.randomUUID() + val extEntityUuid2 = UUID.randomUUID() + + val stateData = PrimaryServiceStateData( + Map.empty, + Map.empty, + simulationStart, + validPrimaryConfig, + Map(extEntityUuid1 -> extWorker1.ref, extEntityUuid2 -> extWorker2.ref), + ) + + val behavior = BehaviorTestKit(PrimaryServiceProxy.onMessage(stateData)) + + behavior.run( + PrimaryServiceRegistrationMessage(participant1, extEntityUuid1) + ) + extWorker1.expectMessage( + PrimaryServiceRegistrationMessage(participant1, extEntityUuid1) + ) + + behavior.run( + PrimaryServiceRegistrationMessage(participant2, extEntityUuid2) + ) + extWorker2.expectMessage( + PrimaryServiceRegistrationMessage(participant2, extEntityUuid2) + ) + } } private val dummyWorker = @@ -436,6 +519,7 @@ class PrimaryServiceProxySpec timeSeriesToSourceRef, simulationStart, primaryConfig, + _, ) => modelToTimeSeries shouldBe proxyStateData.modelToTimeSeries timeSeriesToSourceRef shouldBe Map( diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySqlIT.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySqlIT.scala index 3534b6ae11..a4d6d24720 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySqlIT.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySqlIT.scala @@ -94,6 +94,7 @@ class PrimaryServiceProxySqlIT sqlParams = Some(sqlParams), ), simulationStart, + Seq.empty, ) testKit.spawn(