From c45e652c76616e3aea172c08f7c5ec3474523ed4 Mon Sep 17 00:00:00 2001 From: staudtMarius Date: Mon, 22 Sep 2025 14:51:29 +0200 Subject: [PATCH 1/5] Added external primary service worker --- CHANGELOG.md | 1 + .../primary/ExtPrimaryDataService.scala | 244 ++++++++++++++++++ .../service/primary/PrimaryServiceProxy.scala | 95 +++++-- .../ie3/simona/sim/setup/ExtSimSetup.scala | 21 +- .../sim/setup/SimonaStandaloneSetup.scala | 1 + .../primary/ExtPrimaryDataServiceSpec.scala | 140 ++++++++++ .../primary/PrimaryServiceProxySpec.scala | 45 ++++ .../primary/PrimaryServiceProxySqlIT.scala | 1 + 8 files changed, 528 insertions(+), 20 deletions(-) create mode 100644 src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala create mode 100644 src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala diff --git a/CHANGELOG.md b/CHANGELOG.md index 908cd30e6e..3959ba3719 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Implement weather forecast provision by WeatherService [#1512](https://github.com/ie3-institute/simona/issues/1512) - Introduce optimizing EM strategy [#1500](https://github.com/ie3-institute/simona/issues/1500) - Introduce `DomesticHotWaterStorage` [#1481](https://github.com/ie3-institute/simona/issues/1481) +- Added external primary service worker [#1545](https://github.com/ie3-institute/simona/issues/1545) ### Changed - Upgraded `scala2` to `scala3` [#53](https://github.com/ie3-institute/simona/issues/53) diff --git a/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala b/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala new file mode 100644 index 0000000000..6771615c6c --- /dev/null +++ b/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala @@ -0,0 +1,244 @@ +/* + * © 2024. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.service.primary + +import edu.ie3.simona.agent.participant.ParticipantAgent +import edu.ie3.simona.agent.participant.ParticipantAgent.{ + DataProvision, + PrimaryRegistrationSuccessfulMessage, +} +import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection +import edu.ie3.simona.api.ontology.DataMessageFromExt +import edu.ie3.simona.api.ontology.primary.{ + PrimaryDataMessageFromExt, + ProvidePrimaryData, +} +import edu.ie3.simona.exceptions.WeatherServiceException.InvalidRegistrationRequestException +import edu.ie3.simona.exceptions.{InitializationException, ServiceException} +import edu.ie3.simona.ontology.messages.ServiceMessage.{ + PrimaryServiceRegistrationMessage, + ServiceRegistrationMessage, + ServiceResponseMessage, +} +import edu.ie3.simona.service.Data.PrimaryData +import edu.ie3.simona.service.Data.PrimaryData.RichValue +import edu.ie3.simona.service.ServiceStateData.{ + InitializeServiceStateData, + ServiceBaseStateData, +} +import edu.ie3.simona.service.{ExtDataSupport, ServiceStateData, SimonaService} +import org.apache.pekko.actor.typed.ActorRef +import org.apache.pekko.actor.typed.scaladsl.ActorContext +import org.slf4j.Logger + +import java.util.UUID +import scala.jdk.CollectionConverters.MapHasAsScala +import scala.jdk.OptionConverters.RichOptional +import scala.util.{Failure, Success, Try} + +object ExtPrimaryDataService extends SimonaService with ExtDataSupport { + + override type S = ExtPrimaryDataStateData + + final case class ExtPrimaryDataStateData( + extPrimaryData: ExtPrimaryDataConnection, + subscribers: List[UUID] = List.empty, + uuidToActorRef: Map[UUID, ActorRef[ParticipantAgent.Request]] = + Map.empty, // subscribers in SIMONA + extPrimaryDataMessage: Option[PrimaryDataMessageFromExt] = None, + maybeNextTick: Option[Long] = None, + ) extends ServiceBaseStateData + + case class InitExtPrimaryData( + extPrimaryData: ExtPrimaryDataConnection + ) extends InitializeServiceStateData + + override def init( + initServiceData: ServiceStateData.InitializeServiceStateData + )(using log: Logger): Try[(ExtPrimaryDataStateData, Option[Long])] = + initServiceData match { + case InitExtPrimaryData(extPrimaryData) => + val primaryDataInitializedStateData = ExtPrimaryDataStateData( + extPrimaryData + ) + Success( + primaryDataInitializedStateData, + None, + ) + + case invalidData => + Failure( + new InitializationException( + s"Provided init data '${invalidData.getClass.getSimpleName}' for ExtPrimaryService are invalid!" + ) + ) + } + + override protected def handleRegistrationRequest( + registrationMessage: ServiceRegistrationMessage + )(using + serviceStateData: ExtPrimaryDataStateData, + ctx: ActorContext[Message], + ): Try[ExtPrimaryDataStateData] = registrationMessage match { + case PrimaryServiceRegistrationMessage( + requestingActor, + modelUuid, + ) => + Success(handleRegistrationRequest(requestingActor, modelUuid)) + case invalidMessage => + Failure( + InvalidRegistrationRequestException( + s"A primary service provider is not able to handle registration request '$invalidMessage'." + ) + ) + } + + private def handleRegistrationRequest( + agentToBeRegistered: ActorRef[ParticipantAgent.Request], + agentUUID: UUID, + )(using + serviceStateData: ExtPrimaryDataStateData, + ctx: ActorContext[Message], + ): ExtPrimaryDataStateData = { + serviceStateData.uuidToActorRef.get(agentUUID) match { + case None => + // checks if a value class was specified for the agent + val valueClass = serviceStateData.extPrimaryData + .getValueClass(agentUUID) + .toScala + .getOrElse( + throw InvalidRegistrationRequestException( + s"A primary service provider is not able to handle registration request, because there was no value class specified for the agent with id: '$agentUUID'." + ) + ) + + agentToBeRegistered ! PrimaryRegistrationSuccessfulMessage( + ctx.self, + 0L, + PrimaryData.getPrimaryDataExtra(valueClass), + ) + ctx.log.info(s"Successful registration for $agentUUID") + + serviceStateData.copy( + subscribers = serviceStateData.subscribers :+ agentUUID, + uuidToActorRef = + serviceStateData.uuidToActorRef + (agentUUID -> agentToBeRegistered), + ) + + case Some(_) => + // actor is already registered, do nothing + ctx.log.warn( + "Sending actor {} is already registered", + agentToBeRegistered, + ) + serviceStateData + } + } + + /** Send out the information to all registered recipients + * + * @param tick + * current tick data should be announced for + * @param serviceStateData + * the current state data of this service + * @return + * the service stata data that should be used in the next state (normally + * with updated values) together with the completion message that is send + * in response to the trigger that was sent to start this announcement + */ + override protected def announceInformation( + tick: Long + )(using + serviceStateData: ExtPrimaryDataStateData, + ctx: ActorContext[Message], + ): (ExtPrimaryDataStateData, Option[Long]) = { // We got activated for this tick, so we expect incoming primary data + serviceStateData.extPrimaryDataMessage.getOrElse( + throw ServiceException( + "ExtPrimaryDataService was triggered without ExtPrimaryDataMessage available" + ) + ) match { + case providedPrimaryData: ProvidePrimaryData => + processDataAndAnnounce(tick, providedPrimaryData) + } + } + + private def processDataAndAnnounce( + tick: Long, + primaryDataMessage: ProvidePrimaryData, + )(using + serviceStateData: ExtPrimaryDataStateData, + ctx: ActorContext[Message], + ): ( + ExtPrimaryDataStateData, + Option[Long], + ) = { + ctx.log.debug( + s"Got activation to distribute primaryData = $primaryDataMessage" + ) + val actorToPrimaryData = primaryDataMessage.primaryData.asScala.flatMap { + case (agent, primaryDataPerAgent) => + serviceStateData.uuidToActorRef + .get(agent) + .map((_, primaryDataPerAgent)) + .orElse { + ctx.log.warn( + "A corresponding actor ref for UUID {} could not be found", + agent, + ) + None + } + } + + val maybeNextTick = primaryDataMessage.maybeNextTick.toScala.map(Long2long) + + // Distribute Primary Data + if actorToPrimaryData.nonEmpty then { + actorToPrimaryData.foreach { case (actor, value) => + value.toPrimaryData match { + case Success(primaryData) => + actor ! DataProvision( + tick, + ctx.self, + primaryData, + maybeNextTick, + ) + case Failure(exception) => + /* Processing of data failed */ + ctx.log.warn( + "Unable to convert received value to primary data. Skipped that data." + + "\nException: {}", + exception, + ) + } + } + } + + ( + serviceStateData.copy(extPrimaryDataMessage = None), + None, + ) + } + + override protected def handleDataMessage( + extMsg: DataMessageFromExt + )(using + serviceStateData: ExtPrimaryDataStateData + ): ExtPrimaryDataStateData = { + extMsg match { + case extPrimaryDataMessage: PrimaryDataMessageFromExt => + serviceStateData.copy( + extPrimaryDataMessage = Some(extPrimaryDataMessage) + ) + } + } + + override protected def handleDataResponseMessage( + extResponseMsg: ServiceResponseMessage + )(implicit + serviceStateData: ExtPrimaryDataStateData + ): ExtPrimaryDataStateData = serviceStateData +} diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala index 3c53b74f5a..8c7a50c9b1 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala @@ -27,11 +27,17 @@ import edu.ie3.datamodel.io.source.{ TimeSeriesMetaInformationSource, } import edu.ie3.datamodel.models.value.Value +import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection import edu.ie3.simona.agent.participant.ParticipantAgent import edu.ie3.simona.agent.participant.ParticipantAgent.RegistrationFailedMessage import edu.ie3.simona.config.ConfigParams.{SqlParams, TimeStampedCsvParams} import edu.ie3.simona.config.InputConfig.Primary as PrimaryConfig import edu.ie3.simona.exceptions.InitializationException +import edu.ie3.simona.exceptions.{ + InitializationException, + InvalidConfigParameterException, +} +import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage} import edu.ie3.simona.ontology.messages.SchedulerMessage.{ Completion, ScheduleActivation, @@ -88,6 +94,9 @@ object PrimaryServiceProxy { final case class InitPrimaryServiceProxyStateData( primaryConfig: PrimaryConfig, simulationStart: ZonedDateTime, + extSimulationData: Seq[ + (ExtPrimaryDataConnection, ActorRef[ServiceMessage]) + ], ) extends InitializeServiceStateData /** Holding the state of an initialized proxy. @@ -106,6 +115,7 @@ object PrimaryServiceProxy { timeSeriesToSourceRef: Map[UUID, SourceRef], simulationStart: ZonedDateTime, primaryConfig: PrimaryConfig, + extSubscribersToService: Map[UUID, ActorRef[ServiceMessage]] = Map.empty, ) extends ServiceStateData /** Giving reference to the target time series and source worker. @@ -153,6 +163,7 @@ object PrimaryServiceProxy { prepareStateData( initStateData.primaryConfig, initStateData.simulationStart, + initStateData.extSimulationData, )(using ctx.log) match { case Success(stateData) => scheduler ! Completion(ctx.self) @@ -185,6 +196,9 @@ object PrimaryServiceProxy { private[service] def prepareStateData( primaryConfig: PrimaryConfig, simulationStart: ZonedDateTime, + extSimulationData: Seq[ + (ExtPrimaryDataConnection, ActorRef[ServiceMessage]) + ], )(using log: Logger): Try[PrimaryServiceStateData] = { val sourceOption = Seq( primaryConfig.sqlParams, @@ -232,12 +246,28 @@ object PrimaryServiceProxy { } } .toMap - PrimaryServiceStateData( - modelToTimeSeries, - timeSeriesToSourceRef, - simulationStart, - primaryConfig, - ) + if extSimulationData.nonEmpty then { + val extSubscribersToService = extSimulationData.flatMap { + case (connection, ref) => + connection.getPrimaryDataAssets.asScala.map(id => id -> ref) + } + + // Ask ExtPrimaryDataService which UUIDs should be substituted + PrimaryServiceStateData( + modelToTimeSeries, + timeSeriesToSourceRef, + simulationStart, + primaryConfig, + extSubscribersToService.toMap, + ) + } else { + PrimaryServiceStateData( + modelToTimeSeries, + timeSeriesToSourceRef, + simulationStart, + primaryConfig, + ) + } } } @@ -312,26 +342,40 @@ object PrimaryServiceProxy { PrimaryServiceRegistrationMessage(requestingActor, modelUuid), ) => /* Try to register for this model */ - stateData.modelToTimeSeries.get(modelUuid) match { - case Some(timeSeriesUuid) => - /* There is a time series apparent for this model, try to get a worker for it */ - val updatedStateData = handleCoveredModel( - modelUuid, - timeSeriesUuid, - stateData, - requestingActor, - )(using scheduler, ctx) + stateData.extSubscribersToService.get(modelUuid) match { + case Some(_) => + /* There is external data apparent for this model */ + handleExternalModel(modelUuid, stateData, requestingActor) - onMessage(updatedStateData) + Behaviors.same case None => ctx.log.debug( - s"There is no time series apparent for the model with uuid '{}'.", + s"There is no external data apparent for the model with uuid '{}'.", modelUuid, ) - requestingActor ! RegistrationFailedMessage(ctx.self) - Behaviors.same + stateData.modelToTimeSeries.get(modelUuid) match { + case Some(timeSeriesUuid) => + /* There is a time series apparent for this model, try to get a worker for it */ + val updatedStateData = handleCoveredModel( + modelUuid, + timeSeriesUuid, + stateData, + requestingActor, + )(using scheduler, ctx) + + onMessage(updatedStateData) + + case None => + ctx.log.debug( + s"There is no time series apparent for the model with uuid '{}'.", + modelUuid, + ) + requestingActor ! RegistrationFailedMessage(ctx.self) + + Behaviors.same + } } case (ctx, unknown) => ctx.log.error( @@ -400,6 +444,19 @@ object PrimaryServiceProxy { } } + protected def handleExternalModel( + modelUuid: UUID, + stateData: PrimaryServiceStateData, + requestingActor: ActorRef[ParticipantAgent.Request], + ): Unit = { + stateData.extSubscribersToService.foreach { case (_, ref) => + ref ! PrimaryServiceRegistrationMessage( + requestingActor, + modelUuid, + ) + } + } + /** Instantiate a new [[PrimaryServiceWorker]] and send initialization * information * diff --git a/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala b/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala index 4563b5e88e..1a60111584 100644 --- a/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala +++ b/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala @@ -21,6 +21,8 @@ import edu.ie3.simona.scheduler.ScheduleLock import edu.ie3.simona.service.ServiceStateData.InitializeServiceStateData import edu.ie3.simona.service.ev.ExtEvDataService import edu.ie3.simona.service.ev.ExtEvDataService.InitExtEvData +import edu.ie3.simona.service.primary.ExtPrimaryDataService +import edu.ie3.simona.service.primary.ExtPrimaryDataService.InitExtPrimaryData import edu.ie3.simona.util.SimonaConstants.PRE_INIT_TICK import org.apache.pekko.actor.typed.ActorRef import org.apache.pekko.actor.typed.scaladsl.ActorContext @@ -79,7 +81,7 @@ object ExtSimSetup { ) // setup data services that belong to this external simulation - val updatedSetupData = connect(extSimulation, extSimSetupData) + val updatedSetupData = connect(extSimulation, extSimSetupData, index) // starting external simulation new Thread(extSimulation, s"External simulation $index") @@ -104,6 +106,8 @@ object ExtSimSetup { * To connect. * @param extSimSetupData * That contains information about all external simulations. + * @param index + * Index of the external link interface. * @param context * The actor context of this actor system. * @param scheduler @@ -116,6 +120,7 @@ object ExtSimSetup { private[setup] def connect( extSimulation: ExtSimulation, extSimSetupData: ExtSimSetupData, + index: Int, )(using context: ActorContext[?], scheduler: ActorRef[SchedulerMessage], @@ -134,6 +139,20 @@ object ExtSimSetup { val updatedSetupData = connections.foldLeft(extSimSetupData) { case (setupData, connection) => connection match { + case extPrimaryDataConnection: ExtPrimaryDataConnection => + val serviceRef = context.spawn( + ExtPrimaryDataService(scheduler), + "ExtPrimaryDataService_$index", + ) + + setupService( + extPrimaryDataConnection, + serviceRef, + InitExtPrimaryData.apply, + ) + + extSimSetupData.update(extPrimaryDataConnection, serviceRef) + case extEvDataConnection: ExtEvDataConnection => if setupData.evDataConnection.nonEmpty then { throw ServiceException( diff --git a/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala b/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala index 854094e44d..19d144eb59 100644 --- a/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala +++ b/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala @@ -156,6 +156,7 @@ class SimonaStandaloneSetup( InitPrimaryServiceProxyStateData( simonaConfig.simona.input.primary, simulationStart, + extSimSetupData.extPrimaryDataServices, ), ), "primaryServiceProxyAgent", diff --git a/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala new file mode 100644 index 0000000000..eb42b3a9ca --- /dev/null +++ b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala @@ -0,0 +1,140 @@ +/* + * © 2021. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.service.primary + +import com.typesafe.scalalogging.LazyLogging +import edu.ie3.datamodel.models.value.Value +import edu.ie3.simona.agent.participant.ParticipantAgent.{ + DataProvision, + RegistrationSuccessfulMessage, +} +import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection +import edu.ie3.simona.api.ontology.simulation.ControlResponseMessageFromExt +import edu.ie3.simona.ontology.messages.SchedulerMessage.{ + Completion, + ScheduleActivation, +} +import edu.ie3.simona.ontology.messages.ServiceMessage.{ + Create, + PrimaryServiceRegistrationMessage, + SecondaryServiceRegistrationMessage, +} +import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage} +import edu.ie3.simona.scheduler.ScheduleLock +import edu.ie3.simona.service.primary.ExtPrimaryDataService.InitExtPrimaryData +import edu.ie3.simona.test.common.TestSpawnerTyped +import edu.ie3.simona.util.Coordinate +import edu.ie3.simona.util.SimonaConstants.INIT_SIM_TICK +import org.apache.pekko.actor.testkit.typed.scaladsl.{ + ScalaTestWithActorTestKit, + TestProbe, +} +import org.apache.pekko.actor.typed.scaladsl.adapter.TypedActorRefOps +import org.scalatest.PrivateMethodTester +import org.scalatest.matchers.should +import org.scalatest.wordspec.AnyWordSpecLike + +import java.util.UUID +import scala.jdk.CollectionConverters.* + +class ExtPrimaryDataServiceSpec + extends ScalaTestWithActorTestKit + with AnyWordSpecLike + with should.Matchers + with PrivateMethodTester + with LazyLogging + with TestSpawnerTyped { + + private val scheduler = TestProbe[SchedulerMessage]("scheduler") + private val extSimAdapter = + TestProbe[ControlResponseMessageFromExt]("extSimAdapter") + + private val extPrimaryDataConnection = new ExtPrimaryDataConnection( + Map.empty[UUID, Class[? <: Value]].asJava + ) + + "An uninitialized external primary data service" must { + + "send correct completion message after initialisation" in { + val primaryDataService = spawn(ExtPrimaryDataService(scheduler.ref)) + + val key = + ScheduleLock.singleKey(TSpawner, scheduler.ref, INIT_SIM_TICK) + scheduler + .expectMessageType[ScheduleActivation] // lock activation scheduled + + extPrimaryDataConnection.setActorRefs( + primaryDataService, + extSimAdapter.ref, + ) + + primaryDataService ! Create( + InitExtPrimaryData(extPrimaryDataConnection), + key, + ) + + scheduler.expectMessage( + ScheduleActivation(primaryDataService, INIT_SIM_TICK, Some(key)) + ) + + primaryDataService ! Activation(INIT_SIM_TICK) + scheduler.expectMessage(Completion(primaryDataService)) + } + } + + "An external primary service actor" should { + val primaryDataService = spawn(ExtPrimaryDataService(scheduler.ref)) + val systemParticipant = TestProbe[Any]("dummySystemParticipant") + + "refuse registration for wrong registration request" in { + val schedulerProbe = TestProbe[SchedulerMessage]("schedulerProbe") + + // we need to create another service, since we want to continue using the other in later tests + val service = spawn(ExtPrimaryDataService(schedulerProbe.ref)) + + val key = + ScheduleLock.singleKey(TSpawner, schedulerProbe.ref, INIT_SIM_TICK) + + primaryDataService ! Create( + InitExtPrimaryData(extPrimaryDataConnection), + key, + ) + + service ! Activation(INIT_SIM_TICK) + + service ! SecondaryServiceRegistrationMessage( + systemParticipant.ref, + Coordinate(51.4843281, 7.4116482), + ) + + val deathWatch = createTestProbe("deathWatch") + deathWatch.expectTerminated(service.ref) + } + + "correctly register a forwarded request" ignore { + primaryDataService ! PrimaryServiceRegistrationMessage( + systemParticipant.ref, + UUID.randomUUID(), + ) + + /* Wait for request approval */ + systemParticipant.expectMessage( + RegistrationSuccessfulMessage( + primaryDataService, + 0L, + ) + ) + + /* We cannot directly check, if the requesting actor is among the subscribers, therefore we ask the actor to + * provide data to all subscribed actors and check, if the subscribed probe gets one */ + primaryDataService ! Activation(0) + scheduler.expectMessage(Completion(primaryDataService)) + + systemParticipant.expectMessageType[DataProvision] + } + } +} diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala index 68bf753c94..94ceacd42c 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala @@ -11,6 +11,8 @@ import edu.ie3.datamodel.io.naming.FileNamingStrategy import edu.ie3.datamodel.io.naming.timeseries.ColumnScheme import edu.ie3.datamodel.io.source.TimeSeriesMappingSource import edu.ie3.datamodel.io.source.csv.CsvTimeSeriesMappingSource +import edu.ie3.datamodel.models.value.{PValue, SValue, Value} +import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection import edu.ie3.datamodel.models.value.SValue import edu.ie3.simona.agent.participant.ParticipantAgent import edu.ie3.simona.agent.participant.ParticipantAgent.RegistrationFailedMessage @@ -66,6 +68,7 @@ import org.slf4j.{Logger, LoggerFactory} import java.nio.file.{Path, Paths} import java.time.ZonedDateTime import java.util.UUID +import scala.jdk.CollectionConverters.* import scala.language.implicitConversions import scala.util.{Failure, Success} @@ -138,11 +141,26 @@ class PrimaryServiceProxySpec when(m.self).thenReturn(service.ref) m } + private val validExtPrimaryDataService = spawn( + ExtPrimaryDataService(scheduler.ref) + ) + + private val extEntityId = + UUID.fromString("07bbe1aa-1f39-4dfb-b41b-339dec816ec4") + + private val valueMap: Map[UUID, Class[? <: Value]] = Map( + extEntityId -> classOf[PValue] + ) + + private val extPrimaryDataConnection = new ExtPrimaryDataConnection( + valueMap.asJava + ) val initStateData: InitPrimaryServiceProxyStateData = InitPrimaryServiceProxyStateData( validPrimaryConfig, simulationStart, + Seq.empty, ) val proxy: ActorRef[PrimaryServiceProxy.Message] = testKit.spawn(PrimaryServiceProxy(scheduler.ref, initStateData)) @@ -159,6 +177,7 @@ class PrimaryServiceProxySpec PrimaryServiceProxy.prepareStateData( maliciousConfig, simulationStart, + Seq.empty, ) match { case Success(emptyStateData) => emptyStateData.modelToTimeSeries shouldBe Map.empty @@ -182,6 +201,7 @@ class PrimaryServiceProxySpec PrimaryServiceProxy.prepareStateData( maliciousConfig, simulationStart, + Seq.empty, ) match { case Success(_) => fail("Building state data with missing config should fail") @@ -195,6 +215,7 @@ class PrimaryServiceProxySpec PrimaryServiceProxy.prepareStateData( validPrimaryConfig, simulationStart, + Seq.empty, ) match { case Success( PrimaryServiceStateData( @@ -202,6 +223,7 @@ class PrimaryServiceProxySpec timeSeriesToSourceRef, simulationStart, primaryConfig, + _, ) ) => modelToTimeSeries shouldBe Map( @@ -239,6 +261,28 @@ class PrimaryServiceProxySpec ) } } + + "build proxy correctly when there is an external simulation" in { + PrimaryServiceProxy.prepareStateData( + validPrimaryConfig, + simulationStart, + Seq((extPrimaryDataConnection, validExtPrimaryDataService)), + ) match { + case Success( + PrimaryServiceStateData( + _, + _, + _, + _, + extSubscribersToService, + ) + ) => + extSubscribersToService shouldBe Map( + extEntityId -> validExtPrimaryDataService + ) + } + } + } "Sending initialization information to an uninitialized actor" should { @@ -436,6 +480,7 @@ class PrimaryServiceProxySpec timeSeriesToSourceRef, simulationStart, primaryConfig, + _, ) => modelToTimeSeries shouldBe proxyStateData.modelToTimeSeries timeSeriesToSourceRef shouldBe Map( diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySqlIT.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySqlIT.scala index 3534b6ae11..a4d6d24720 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySqlIT.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySqlIT.scala @@ -94,6 +94,7 @@ class PrimaryServiceProxySqlIT sqlParams = Some(sqlParams), ), simulationStart, + Seq.empty, ) testKit.spawn( From 00b2fc3bfe95cd28043f0bbf2ea4a98ac998b038 Mon Sep 17 00:00:00 2001 From: staudtMarius Date: Tue, 23 Sep 2025 14:57:09 +0200 Subject: [PATCH 2/5] Increase test coverage. --- .../primary/ExtPrimaryDataService.scala | 79 ++++------- .../primary/ExtPrimaryDataServiceSpec.scala | 131 ++++++++++++++---- 2 files changed, 137 insertions(+), 73 deletions(-) diff --git a/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala b/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala index 6771615c6c..d4fe67f992 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala @@ -46,11 +46,9 @@ object ExtPrimaryDataService extends SimonaService with ExtDataSupport { final case class ExtPrimaryDataStateData( extPrimaryData: ExtPrimaryDataConnection, - subscribers: List[UUID] = List.empty, uuidToActorRef: Map[UUID, ActorRef[ParticipantAgent.Request]] = Map.empty, // subscribers in SIMONA extPrimaryDataMessage: Option[PrimaryDataMessageFromExt] = None, - maybeNextTick: Option[Long] = None, ) extends ServiceBaseStateData case class InitExtPrimaryData( @@ -123,10 +121,8 @@ object ExtPrimaryDataService extends SimonaService with ExtDataSupport { ) ctx.log.info(s"Successful registration for $agentUUID") - serviceStateData.copy( - subscribers = serviceStateData.subscribers :+ agentUUID, - uuidToActorRef = - serviceStateData.uuidToActorRef + (agentUUID -> agentToBeRegistered), + serviceStateData.copy(uuidToActorRef = + serviceStateData.uuidToActorRef + (agentUUID -> agentToBeRegistered) ) case Some(_) => @@ -139,17 +135,6 @@ object ExtPrimaryDataService extends SimonaService with ExtDataSupport { } } - /** Send out the information to all registered recipients - * - * @param tick - * current tick data should be announced for - * @param serviceStateData - * the current state data of this service - * @return - * the service stata data that should be used in the next state (normally - * with updated values) together with the completion message that is send - * in response to the trigger that was sent to start this announcement - */ override protected def announceInformation( tick: Long )(using @@ -179,41 +164,36 @@ object ExtPrimaryDataService extends SimonaService with ExtDataSupport { ctx.log.debug( s"Got activation to distribute primaryData = $primaryDataMessage" ) - val actorToPrimaryData = primaryDataMessage.primaryData.asScala.flatMap { - case (agent, primaryDataPerAgent) => - serviceStateData.uuidToActorRef - .get(agent) - .map((_, primaryDataPerAgent)) - .orElse { - ctx.log.warn( - "A corresponding actor ref for UUID {} could not be found", - agent, - ) - None - } - } + val uuidToAgent = serviceStateData.uuidToActorRef val maybeNextTick = primaryDataMessage.maybeNextTick.toScala.map(Long2long) - // Distribute Primary Data - if actorToPrimaryData.nonEmpty then { - actorToPrimaryData.foreach { case (actor, value) => - value.toPrimaryData match { - case Success(primaryData) => - actor ! DataProvision( - tick, - ctx.self, - primaryData, - maybeNextTick, - ) - case Failure(exception) => - /* Processing of data failed */ - ctx.log.warn( - "Unable to convert received value to primary data. Skipped that data." + - "\nException: {}", - exception, - ) - } + primaryDataMessage.primaryData.asScala.foreach { case (agentUuid, data) => + data.toPrimaryData match { + case Success(primaryData) => + uuidToAgent.get(agentUuid) match { + case Some(agentRef) => + agentRef ! DataProvision( + tick, + ctx.self, + primaryData, + maybeNextTick, + ) + + case None => + ctx.log.warn( + "A corresponding actor ref for UUID {} could not be found", + agentUuid, + ) + } + + case Failure(exception) => + /* Processing of data failed */ + ctx.log.warn( + "Unable to convert received value to primary data. Skipped that data." + + "\nException: {}", + exception, + ) } } @@ -236,6 +216,7 @@ object ExtPrimaryDataService extends SimonaService with ExtDataSupport { } } + // unused by this service override protected def handleDataResponseMessage( extResponseMsg: ServiceResponseMessage )(implicit diff --git a/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala index eb42b3a9ca..888b3b473a 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala @@ -7,12 +7,15 @@ package edu.ie3.simona.service.primary import com.typesafe.scalalogging.LazyLogging -import edu.ie3.datamodel.models.value.Value +import edu.ie3.datamodel.models.value.{PValue, Value} import edu.ie3.simona.agent.participant.ParticipantAgent.{ DataProvision, + PrimaryRegistrationSuccessfulMessage, RegistrationSuccessfulMessage, } import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection +import edu.ie3.simona.api.ontology.ScheduleDataServiceMessage +import edu.ie3.simona.api.ontology.primary.ProvidePrimaryData import edu.ie3.simona.api.ontology.simulation.ControlResponseMessageFromExt import edu.ie3.simona.ontology.messages.SchedulerMessage.{ Completion, @@ -25,10 +28,11 @@ import edu.ie3.simona.ontology.messages.ServiceMessage.{ } import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage} import edu.ie3.simona.scheduler.ScheduleLock +import edu.ie3.simona.service.Data.PrimaryData.{ActivePower, ActivePowerExtra} import edu.ie3.simona.service.primary.ExtPrimaryDataService.InitExtPrimaryData import edu.ie3.simona.test.common.TestSpawnerTyped -import edu.ie3.simona.util.Coordinate import edu.ie3.simona.util.SimonaConstants.INIT_SIM_TICK +import edu.ie3.util.quantities.QuantityUtils.asKiloWatt import org.apache.pekko.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe, @@ -37,8 +41,9 @@ import org.apache.pekko.actor.typed.scaladsl.adapter.TypedActorRefOps import org.scalatest.PrivateMethodTester import org.scalatest.matchers.should import org.scalatest.wordspec.AnyWordSpecLike +import squants.energy.Kilowatts -import java.util.UUID +import java.util.{Optional, UUID} import scala.jdk.CollectionConverters.* class ExtPrimaryDataServiceSpec @@ -53,14 +58,25 @@ class ExtPrimaryDataServiceSpec private val extSimAdapter = TestProbe[ControlResponseMessageFromExt]("extSimAdapter") - private val extPrimaryDataConnection = new ExtPrimaryDataConnection( - Map.empty[UUID, Class[? <: Value]].asJava - ) + private val systemParticipant = TestProbe[Any]("dummySystemParticipant") + + private val validUuid = + UUID.fromString("b73a7e3f-9045-40cd-b518-c11a9a6a1025") + private val invalidUuid = + UUID.fromString("46be1e57-e4ed-4ef7-95f1-b2b321cb2047") "An uninitialized external primary data service" must { "send correct completion message after initialisation" in { + val extPrimaryDataConnection = new ExtPrimaryDataConnection( + Map(validUuid -> classOf[PValue]).asJava + ) + val primaryDataService = spawn(ExtPrimaryDataService(scheduler.ref)) + extPrimaryDataConnection.setActorRefs( + primaryDataService, + extSimAdapter.ref, + ) val key = ScheduleLock.singleKey(TSpawner, scheduler.ref, INIT_SIM_TICK) @@ -84,22 +100,52 @@ class ExtPrimaryDataServiceSpec primaryDataService ! Activation(INIT_SIM_TICK) scheduler.expectMessage(Completion(primaryDataService)) } - } - - "An external primary service actor" should { - val primaryDataService = spawn(ExtPrimaryDataService(scheduler.ref)) - val systemParticipant = TestProbe[Any]("dummySystemParticipant") "refuse registration for wrong registration request" in { val schedulerProbe = TestProbe[SchedulerMessage]("schedulerProbe") + val extPrimaryDataConnection = new ExtPrimaryDataConnection( + Map(validUuid -> classOf[PValue]).asJava + ) + // we need to create another service, since we want to continue using the other in later tests val service = spawn(ExtPrimaryDataService(schedulerProbe.ref)) + extPrimaryDataConnection.setActorRefs(service, extSimAdapter.ref) val key = ScheduleLock.singleKey(TSpawner, schedulerProbe.ref, INIT_SIM_TICK) - primaryDataService ! Create( + service ! Create( + InitExtPrimaryData(extPrimaryDataConnection), + key, + ) + + service ! Activation(INIT_SIM_TICK) + + service ! PrimaryServiceRegistrationMessage( + systemParticipant.ref, + UUID.randomUUID(), + ) + + val deathWatch = createTestProbe("deathWatch") + deathWatch.expectTerminated(service.ref) + } + + "refuse registration for unknown participant uuid" in { + val schedulerProbe = TestProbe[SchedulerMessage]("schedulerProbe") + + val extPrimaryDataConnection = new ExtPrimaryDataConnection( + Map(validUuid -> classOf[PValue]).asJava + ) + + // we need to create another service, since we want to continue using the other in later tests + val service = spawn(ExtPrimaryDataService(schedulerProbe.ref)) + extPrimaryDataConnection.setActorRefs(service, extSimAdapter.ref) + + val key = + ScheduleLock.singleKey(TSpawner, schedulerProbe.ref, INIT_SIM_TICK) + + service ! Create( InitExtPrimaryData(extPrimaryDataConnection), key, ) @@ -108,33 +154,70 @@ class ExtPrimaryDataServiceSpec service ! SecondaryServiceRegistrationMessage( systemParticipant.ref, - Coordinate(51.4843281, 7.4116482), + invalidUuid, ) val deathWatch = createTestProbe("deathWatch") deathWatch.expectTerminated(service.ref) } + } + + "An external primary service actor" should { + + val extPrimaryDataConnection = new ExtPrimaryDataConnection( + Map(validUuid -> classOf[PValue]).asJava + ) + + val serviceRef = spawn(ExtPrimaryDataService(scheduler.ref)) + extPrimaryDataConnection.setActorRefs(serviceRef, extSimAdapter.ref) + + "init the service actor" in { + val key = ScheduleLock.singleKey(TSpawner, scheduler.ref, INIT_SIM_TICK) + scheduler + .expectMessageType[ScheduleActivation] // lock activation scheduled - "correctly register a forwarded request" ignore { - primaryDataService ! PrimaryServiceRegistrationMessage( + serviceRef ! Create(InitExtPrimaryData(extPrimaryDataConnection), key) + + val activationMsg = scheduler.expectMessageType[ScheduleActivation] + activationMsg.tick shouldBe INIT_SIM_TICK + activationMsg.unlockKey shouldBe Some(key) + + serviceRef ! Activation(INIT_SIM_TICK) + scheduler.expectMessage(Completion(serviceRef, None)) + } + + "correctly register a forwarded request" in { + serviceRef ! PrimaryServiceRegistrationMessage( systemParticipant.ref, - UUID.randomUUID(), + validUuid, ) - /* Wait for request approval */ systemParticipant.expectMessage( - RegistrationSuccessfulMessage( - primaryDataService, + PrimaryRegistrationSuccessfulMessage(serviceRef, 0L, ActivePowerExtra) + ) + } + + "announce primary data correctly" in { + extPrimaryDataConnection.sendExtMsg( + new ProvidePrimaryData( 0L, + Map(validUuid -> new PValue(10.asKiloWatt)).asJava, + Optional.of(900L), ) ) - /* We cannot directly check, if the requesting actor is among the subscribers, therefore we ask the actor to - * provide data to all subscribed actors and check, if the subscribed probe gets one */ - primaryDataService ! Activation(0) - scheduler.expectMessage(Completion(primaryDataService)) + extSimAdapter.expectMessage(new ScheduleDataServiceMessage(serviceRef)) + serviceRef ! Activation(0) - systemParticipant.expectMessageType[DataProvision] + systemParticipant.expectMessage( + DataProvision( + 0L, + serviceRef, + ActivePower(Kilowatts(10)), + Some(900L), + ) + ) } + } } From cbc42f3e7f8d94daea3ea81f1e0c123f42205f24 Mon Sep 17 00:00:00 2001 From: staudtMarius Date: Mon, 13 Oct 2025 11:54:44 +0200 Subject: [PATCH 3/5] Including reviewer's comments. --- ...ce.scala => ExtPrimaryServiceWorker.scala} | 2 +- .../service/primary/PrimaryServiceProxy.scala | 147 ++++++++---------- .../ie3/simona/sim/setup/ExtSimSetup.scala | 6 +- .../primary/ExtPrimaryDataServiceSpec.scala | 10 +- .../primary/PrimaryServiceProxySpec.scala | 43 ++++- 5 files changed, 118 insertions(+), 90 deletions(-) rename src/main/scala/edu/ie3/simona/service/primary/{ExtPrimaryDataService.scala => ExtPrimaryServiceWorker.scala} (98%) diff --git a/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala b/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorker.scala similarity index 98% rename from src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala rename to src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorker.scala index d4fe67f992..87acb49948 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryDataService.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorker.scala @@ -40,7 +40,7 @@ import scala.jdk.CollectionConverters.MapHasAsScala import scala.jdk.OptionConverters.RichOptional import scala.util.{Failure, Success, Try} -object ExtPrimaryDataService extends SimonaService with ExtDataSupport { +object ExtPrimaryServiceWorker extends SimonaService with ExtDataSupport { override type S = ExtPrimaryDataStateData diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala index 8c7a50c9b1..6d54cfb8fa 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala @@ -27,17 +27,12 @@ import edu.ie3.datamodel.io.source.{ TimeSeriesMetaInformationSource, } import edu.ie3.datamodel.models.value.Value -import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection import edu.ie3.simona.agent.participant.ParticipantAgent import edu.ie3.simona.agent.participant.ParticipantAgent.RegistrationFailedMessage +import edu.ie3.simona.api.data.connection.ExtPrimaryDataConnection import edu.ie3.simona.config.ConfigParams.{SqlParams, TimeStampedCsvParams} import edu.ie3.simona.config.InputConfig.Primary as PrimaryConfig import edu.ie3.simona.exceptions.InitializationException -import edu.ie3.simona.exceptions.{ - InitializationException, - InvalidConfigParameterException, -} -import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage} import edu.ie3.simona.ontology.messages.SchedulerMessage.{ Completion, ScheduleActivation, @@ -84,12 +79,14 @@ object PrimaryServiceProxy { type Message = ServiceMessage | Activation /** State data with needed information to initialize this primary service - * provider proxy + * provider proxy. * * @param primaryConfig - * Configuration for the primary source + * Configuration for the primary source. * @param simulationStart - * Simulation time of the first instant in simulation + * Simulation time of the first instant in simulation. + * @param extSimulationData + * Seq: external primary data connections to service references. */ final case class InitPrimaryServiceProxyStateData( primaryConfig: PrimaryConfig, @@ -102,29 +99,31 @@ object PrimaryServiceProxy { /** Holding the state of an initialized proxy. * * @param modelToTimeSeries - * Mapping from models' to time series unique identifiers + * Mapping from models' to time series unique identifiers. * @param timeSeriesToSourceRef - * Mapping from time series identifier to [[SourceRef]] + * Mapping from time series identifier to [[SourceRef]]. * @param simulationStart - * Simulation time of the first instant in simulation + * Simulation time of the first instant in simulation. * @param primaryConfig - * The configuration for the sources + * The configuration for the sources. + * @param modelToExtWorker + * Map: participant uuid to external primary service worker. */ final case class PrimaryServiceStateData( modelToTimeSeries: Map[UUID, UUID], timeSeriesToSourceRef: Map[UUID, SourceRef], simulationStart: ZonedDateTime, primaryConfig: PrimaryConfig, - extSubscribersToService: Map[UUID, ActorRef[ServiceMessage]] = Map.empty, + modelToExtWorker: Map[UUID, ActorRef[ServiceMessage]] = Map.empty, ) extends ServiceStateData /** Giving reference to the target time series and source worker. * * @param metaInformation - * Meta information (including column scheme) of the time series + * Meta information (including column scheme) of the time series. * @param worker * Optional reference to an already existing worker providing information - * on that time series + * on that time series. */ final case class SourceRef( metaInformation: IndividualTimeSeriesMetaInformation, @@ -149,7 +148,7 @@ object PrimaryServiceProxy { /** Handle all messages, when the actor isn't initialized, yet. * * @return - * How receiving should be handled with gained insight of myself + * How receiving should be handled with gained insight of myself. */ private def uninitialized( initStateData: InitPrimaryServiceProxyStateData @@ -184,14 +183,14 @@ object PrimaryServiceProxy { /** Prepare the needed state data by building a * [[edu.ie3.datamodel.io.source.TimeSeriesMappingSource]], obtain its - * information and compile them to state data + * information and compile them to state data. * * @param primaryConfig - * Configuration for the primary source + * Configuration for the primary source. * @param simulationStart - * Simulation time of first instant in simulation + * Simulation time of first instant in simulation. * @return - * State data, containing the known model and time series identifiers + * State data, containing the known model and time series identifiers. */ private[service] def prepareStateData( primaryConfig: PrimaryConfig, @@ -246,28 +245,20 @@ object PrimaryServiceProxy { } } .toMap - if extSimulationData.nonEmpty then { - val extSubscribersToService = extSimulationData.flatMap { - case (connection, ref) => - connection.getPrimaryDataAssets.asScala.map(id => id -> ref) - } - // Ask ExtPrimaryDataService which UUIDs should be substituted - PrimaryServiceStateData( - modelToTimeSeries, - timeSeriesToSourceRef, - simulationStart, - primaryConfig, - extSubscribersToService.toMap, - ) - } else { - PrimaryServiceStateData( - modelToTimeSeries, - timeSeriesToSourceRef, - simulationStart, - primaryConfig, - ) - } + // create the model to ref map + val modelToExtWorker = extSimulationData.flatMap { + case (connection, ref) => + connection.getPrimaryDataAssets.asScala.map(id => id -> ref) + }.toMap + + PrimaryServiceStateData( + modelToTimeSeries, + timeSeriesToSourceRef, + simulationStart, + primaryConfig, + modelToExtWorker, + ) } } @@ -330,9 +321,9 @@ object PrimaryServiceProxy { * needed, new workers are spun off. * * @param stateData - * Representing the current state of the agent + * Representing the current state of the agent. * @return - * Message handling routine + * Message handling routine. */ private[service] def onMessage(stateData: PrimaryServiceStateData)(using scheduler: ActorRef[SchedulerMessage] @@ -342,10 +333,20 @@ object PrimaryServiceProxy { PrimaryServiceRegistrationMessage(requestingActor, modelUuid), ) => /* Try to register for this model */ - stateData.extSubscribersToService.get(modelUuid) match { + stateData.modelToExtWorker.get(modelUuid) match { case Some(_) => /* There is external data apparent for this model */ - handleExternalModel(modelUuid, stateData, requestingActor) + stateData.modelToExtWorker.get(modelUuid) match { + case Some(ref) => + ref ! PrimaryServiceRegistrationMessage( + requestingActor, + modelUuid, + ) + case None => + ctx.log.warn( + s"Could not forward the registration message for model with uuid `$modelUuid`" + ) + } Behaviors.same @@ -386,14 +387,14 @@ object PrimaryServiceProxy { /** Handle the registration request for a covered model. First, try to get an * already existing worker for this time series, otherwise spin-off a new - * one, remember it and forward the request + * one, remember it and forward the request. * * @param modelUuid - * Unique identifier of the model + * Unique identifier of the model. * @param timeSeriesUuid - * Unique identifier of the equivalent time series + * Unique identifier of the equivalent time series. * @param stateData - * Current state data of the actor + * Current state data of the actor. */ protected[service] def handleCoveredModel( modelUuid: UUID, @@ -444,30 +445,17 @@ object PrimaryServiceProxy { } } - protected def handleExternalModel( - modelUuid: UUID, - stateData: PrimaryServiceStateData, - requestingActor: ActorRef[ParticipantAgent.Request], - ): Unit = { - stateData.extSubscribersToService.foreach { case (_, ref) => - ref ! PrimaryServiceRegistrationMessage( - requestingActor, - modelUuid, - ) - } - } - /** Instantiate a new [[PrimaryServiceWorker]] and send initialization - * information + * information. * * @param metaInformation - * Meta information (including column scheme) of the time series + * Meta information (including column scheme) of the time series. * @param simulationStart - * The time of the simulation start + * The time of the simulation start. * @param primaryConfig - * Configuration for the primary config + * Configuration for the primary config. * @return - * The [[ActorRef]] to the worker + * The [[ActorRef]] to the worker. */ protected[service] def initializeWorker( metaInformation: IndividualTimeSeriesMetaInformation, @@ -504,12 +492,12 @@ object PrimaryServiceProxy { } /** Build a primary source worker and type it to the foreseen value class to - * come + * come. * * @param timeSeriesUuid - * Uuid of the time series the actor processes + * Uuid of the time series the actor processes. * @return - * The [[ActorRef]] to the spun off actor + * The [[ActorRef]] to the spun off actor. */ private[service] def classToWorkerRef( timeSeriesUuid: String @@ -522,15 +510,16 @@ object PrimaryServiceProxy { timeSeriesUuid, ) - /** Building proper init data for the worker + /** Building proper init data for the worker. * * @param metaInformation - * Meta information (including column scheme) of the time series + * Meta information (including column scheme) of the time series. * @param simulationStart - * The time of the simulation start + * The time of the simulation start. * @param primaryConfig - * Configuration for the primary config + * Configuration for the primary config. * @return + * A try for the init state data. */ private[service] def toInitData[V <: Value]( metaInformation: IndividualTimeSeriesMetaInformation, @@ -595,13 +584,13 @@ object PrimaryServiceProxy { /** Register the worker within the state data. * * @param stateData - * Current state information + * Current state information. * @param timeSeriesUuid - * Unique identifier of the time series, the worker takes care of + * Unique identifier of the time series, the worker takes care of. * @param workerRef - * [[ActorRef]] to the new worker actor + * [[ActorRef]] to the new worker actor. * @return - * The updated state data, that holds reference to the worker + * The updated state data, that holds reference to the worker. */ private def updateStateData( stateData: PrimaryServiceStateData, diff --git a/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala b/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala index 1a60111584..7085ca5bc1 100644 --- a/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala +++ b/src/main/scala/edu/ie3/simona/sim/setup/ExtSimSetup.scala @@ -21,8 +21,8 @@ import edu.ie3.simona.scheduler.ScheduleLock import edu.ie3.simona.service.ServiceStateData.InitializeServiceStateData import edu.ie3.simona.service.ev.ExtEvDataService import edu.ie3.simona.service.ev.ExtEvDataService.InitExtEvData -import edu.ie3.simona.service.primary.ExtPrimaryDataService -import edu.ie3.simona.service.primary.ExtPrimaryDataService.InitExtPrimaryData +import edu.ie3.simona.service.primary.ExtPrimaryServiceWorker +import edu.ie3.simona.service.primary.ExtPrimaryServiceWorker.InitExtPrimaryData import edu.ie3.simona.util.SimonaConstants.PRE_INIT_TICK import org.apache.pekko.actor.typed.ActorRef import org.apache.pekko.actor.typed.scaladsl.ActorContext @@ -141,7 +141,7 @@ object ExtSimSetup { connection match { case extPrimaryDataConnection: ExtPrimaryDataConnection => val serviceRef = context.spawn( - ExtPrimaryDataService(scheduler), + ExtPrimaryServiceWorker(scheduler), "ExtPrimaryDataService_$index", ) diff --git a/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala index 888b3b473a..dbc4747b7c 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala @@ -29,7 +29,7 @@ import edu.ie3.simona.ontology.messages.ServiceMessage.{ import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage} import edu.ie3.simona.scheduler.ScheduleLock import edu.ie3.simona.service.Data.PrimaryData.{ActivePower, ActivePowerExtra} -import edu.ie3.simona.service.primary.ExtPrimaryDataService.InitExtPrimaryData +import edu.ie3.simona.service.primary.ExtPrimaryServiceWorker.InitExtPrimaryData import edu.ie3.simona.test.common.TestSpawnerTyped import edu.ie3.simona.util.SimonaConstants.INIT_SIM_TICK import edu.ie3.util.quantities.QuantityUtils.asKiloWatt @@ -72,7 +72,7 @@ class ExtPrimaryDataServiceSpec Map(validUuid -> classOf[PValue]).asJava ) - val primaryDataService = spawn(ExtPrimaryDataService(scheduler.ref)) + val primaryDataService = spawn(ExtPrimaryServiceWorker(scheduler.ref)) extPrimaryDataConnection.setActorRefs( primaryDataService, extSimAdapter.ref, @@ -109,7 +109,7 @@ class ExtPrimaryDataServiceSpec ) // we need to create another service, since we want to continue using the other in later tests - val service = spawn(ExtPrimaryDataService(schedulerProbe.ref)) + val service = spawn(ExtPrimaryServiceWorker(schedulerProbe.ref)) extPrimaryDataConnection.setActorRefs(service, extSimAdapter.ref) val key = @@ -139,7 +139,7 @@ class ExtPrimaryDataServiceSpec ) // we need to create another service, since we want to continue using the other in later tests - val service = spawn(ExtPrimaryDataService(schedulerProbe.ref)) + val service = spawn(ExtPrimaryServiceWorker(schedulerProbe.ref)) extPrimaryDataConnection.setActorRefs(service, extSimAdapter.ref) val key = @@ -168,7 +168,7 @@ class ExtPrimaryDataServiceSpec Map(validUuid -> classOf[PValue]).asJava ) - val serviceRef = spawn(ExtPrimaryDataService(scheduler.ref)) + val serviceRef = spawn(ExtPrimaryServiceWorker(scheduler.ref)) extPrimaryDataConnection.setActorRefs(serviceRef, extSimAdapter.ref) "init the service actor" in { diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala index 94ceacd42c..1b9438b9c2 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala @@ -32,7 +32,11 @@ import edu.ie3.simona.ontology.messages.ServiceMessage.{ PrimaryServiceRegistrationMessage, WorkerRegistrationMessage, } -import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage} +import edu.ie3.simona.ontology.messages.{ + Activation, + SchedulerMessage, + ServiceMessage, +} import edu.ie3.simona.scheduler.ScheduleLock import edu.ie3.simona.service.primary.PrimaryServiceProxy.{ InitPrimaryServiceProxyStateData, @@ -142,7 +146,7 @@ class PrimaryServiceProxySpec m } private val validExtPrimaryDataService = spawn( - ExtPrimaryDataService(scheduler.ref) + ExtPrimaryServiceWorker(scheduler.ref) ) private val extEntityId = @@ -449,6 +453,41 @@ class PrimaryServiceProxySpec key shouldBe None } } + + "forwarding the registration to correct ext worker" in { + val extWorker1 = TestProbe[ServiceMessage]("extWorker1") + val extWorker2 = TestProbe[ServiceMessage]("extWorker2") + + val participant1 = TestProbe[ParticipantAgent.Message]("Participant1").ref + val participant2 = TestProbe[ParticipantAgent.Message]("Participant2").ref + + val extEntityUuid1 = UUID.randomUUID() + val extEntityUuid2 = UUID.randomUUID() + + val stateData = PrimaryServiceStateData( + Map.empty, + Map.empty, + simulationStart, + validPrimaryConfig, + Map(extEntityUuid1 -> extWorker1.ref, extEntityUuid2 -> extWorker2.ref), + ) + + val behavior = BehaviorTestKit(PrimaryServiceProxy.onMessage(stateData)) + + behavior.run( + PrimaryServiceRegistrationMessage(participant1, extEntityUuid1) + ) + extWorker1.expectMessage( + PrimaryServiceRegistrationMessage(participant1, extEntityUuid1) + ) + + behavior.run( + PrimaryServiceRegistrationMessage(participant2, extEntityUuid2) + ) + extWorker2.expectMessage( + PrimaryServiceRegistrationMessage(participant2, extEntityUuid2) + ) + } } private val dummyWorker = From 3b40a73a00b0ead322a23512b39cc74280c24f5a Mon Sep 17 00:00:00 2001 From: staudtMarius Date: Mon, 13 Oct 2025 11:56:38 +0200 Subject: [PATCH 4/5] fmt --- ...yDataServiceSpec.scala => ExtPrimaryServiceWorkerSpec.scala} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename src/test/scala/edu/ie3/simona/service/primary/{ExtPrimaryDataServiceSpec.scala => ExtPrimaryServiceWorkerSpec.scala} (99%) diff --git a/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorkerSpec.scala similarity index 99% rename from src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala rename to src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorkerSpec.scala index dbc4747b7c..7463c9f018 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryDataServiceSpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/ExtPrimaryServiceWorkerSpec.scala @@ -46,7 +46,7 @@ import squants.energy.Kilowatts import java.util.{Optional, UUID} import scala.jdk.CollectionConverters.* -class ExtPrimaryDataServiceSpec +class ExtPrimaryServiceWorkerSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with should.Matchers From 5a50773ee98b2a1e02e6da89c715793ca071b5fd Mon Sep 17 00:00:00 2001 From: staudtMarius Date: Tue, 28 Oct 2025 15:37:45 +0100 Subject: [PATCH 5/5] Fixing issues after merging dev. --- .../scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala b/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala index b1cad422bd..e22ce831bb 100644 --- a/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala +++ b/src/main/scala/edu/ie3/simona/sim/setup/SimonaStandaloneSetup.scala @@ -147,7 +147,7 @@ class SimonaStandaloneSetup( InitPrimaryServiceProxyStateData( simonaConfig.simona.input.primary, simulationStart, - extSimSetupData.extPrimaryDataServices, + extSimSetupData.primaryDataServices, ), ), "primaryServiceProxyAgent",