diff --git a/model/src/main/java/io/spine/examples/shareaware/market/SharePriceMovement.java b/model/src/main/java/io/spine/examples/shareaware/market/SharePriceMovement.java
new file mode 100644
index 00000000..396d1c19
--- /dev/null
+++ b/model/src/main/java/io/spine/examples/shareaware/market/SharePriceMovement.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.market;
+
+import com.google.errorprone.annotations.Immutable;
+import com.google.protobuf.Duration;
+import com.google.protobuf.Timestamp;
+import io.spine.annotation.GeneratedMixin;
+import io.spine.base.EntityState;
+import io.spine.examples.shareaware.ShareId;
+import io.spine.examples.shareaware.SharePriceMovementId;
+
+/**
+ * Common interface for projections that display the movements of the share price.
+ */
+@Immutable
+@GeneratedMixin
+public interface SharePriceMovement extends EntityState {
+
+ /**
+ * Returns the ID of the {@code SharePriceMovement} projection.
+ */
+ SharePriceMovementId getId();
+
+ /**
+ * Returns the time when the projection was created.
+ */
+ default Timestamp whenCreated() {
+ return getId().getWhenCreated();
+ }
+
+ /**
+ * Returns the activity period of the projection.
+ *
+ *
The period when it is collecting data about the share price movements.
+ */
+ default Duration activityTime() {
+ return getId().getActivityTime();
+ }
+
+ /**
+ * Returns the ID of the share which price movements the projection displays.
+ */
+ default ShareId share() {
+ return getId().getShare();
+ }
+}
diff --git a/model/src/main/java/io/spine/examples/shareaware/market/WithShares.java b/model/src/main/java/io/spine/examples/shareaware/market/WithShares.java
new file mode 100644
index 00000000..df1c0b90
--- /dev/null
+++ b/model/src/main/java/io/spine/examples/shareaware/market/WithShares.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.market;
+
+import com.google.errorprone.annotations.Immutable;
+import io.spine.annotation.GeneratedMixin;
+import io.spine.base.EventMessage;
+import io.spine.examples.shareaware.ShareId;
+import io.spine.examples.shareaware.share.Share;
+
+import java.util.List;
+
+import static io.spine.util.Exceptions.newIllegalArgumentException;
+
+/**
+ * Common interface for signals which operate with a list of shares.
+ */
+@Immutable
+@GeneratedMixin
+public interface WithShares extends EventMessage {
+
+ /**
+ * Returns the list of shares.
+ */
+ List getShareList();
+
+ /**
+ * Finds the share with provided ID from the shares list.
+ *
+ * @throws IllegalArgumentException when the share with provided ID is not found in the list
+ */
+ default Share find(ShareId id) {
+ List shares = getShareList();
+ var optionalShare = shares
+ .stream()
+ .filter(share -> share.getId().equals(id))
+ .findAny();
+ if (optionalShare.isEmpty()) {
+ throw newIllegalArgumentException(
+ "Cannot find the share with the provided ID `%s` in the list of shares `%s`.",
+ id, shares);
+ }
+ return optionalShare.get();
+ }
+}
diff --git a/model/src/main/proto/spine_examples/shareaware/identifiers.proto b/model/src/main/proto/spine_examples/shareaware/identifiers.proto
index 034ecc16..5bac51f7 100644
--- a/model/src/main/proto/spine_examples/shareaware/identifiers.proto
+++ b/model/src/main/proto/spine_examples/shareaware/identifiers.proto
@@ -36,6 +36,8 @@ option java_outer_classname = "IdentifiersProto";
option java_multiple_files = true;
import "spine/core/user_id.proto";
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
// Identifies a watchlist.
message WatchlistId {
@@ -119,3 +121,16 @@ message ReplenishmentOperationId {
SaleId sale = 2;
}
}
+
+// Identifies the share price movement view.
+message SharePriceMovementId {
+
+ // The ID of share, for which the movements are collected.
+ ShareId share = 1;
+
+ // The duration during which the share price movements are collected.
+ google.protobuf.Duration activity_time = 2 [(required) = true];
+
+ // The time when the share price movement view was created.
+ google.protobuf.Timestamp when_created = 3 [(required) = true];
+}
diff --git a/model/src/main/proto/spine_examples/shareaware/market/events.proto b/model/src/main/proto/spine_examples/shareaware/market/events.proto
index db9dbb5b..f1c7a564 100644
--- a/model/src/main/proto/spine_examples/shareaware/market/events.proto
+++ b/model/src/main/proto/spine_examples/shareaware/market/events.proto
@@ -38,6 +38,7 @@ option java_multiple_files = true;
import "spine_examples/shareaware/identifiers.proto";
import "spine/money/money.proto";
import "spine_examples/shareaware/share.proto";
+import "google/protobuf/timestamp.proto";
// Shares have been obtained from the market.
message SharesObtained {
@@ -90,10 +91,14 @@ message MarketClosed {
// Shares on market have been updated.
message MarketSharesUpdated {
+ option (is).java_type = "io.spine.examples.shareaware.market.WithShares";
// The ID of the shares market.
MarketId market = 1;
// Updated shares.
repeated Share share = 2 [(required) = true];
+
+ // Time when shares were updated.
+ google.protobuf.Timestamp when_updated = 3 [(required) = true];
}
diff --git a/model/src/main/proto/spine_examples/shareaware/market/share_price_movement.proto b/model/src/main/proto/spine_examples/shareaware/market/share_price_movement.proto
new file mode 100644
index 00000000..d6116b72
--- /dev/null
+++ b/model/src/main/proto/spine_examples/shareaware/market/share_price_movement.proto
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+syntax = "proto3";
+
+package spine_examples.shareaware.market;
+
+import "spine/options.proto";
+
+option (type_url_prefix) = "type.shareaware.spine.io";
+option java_package = "io.spine.examples.shareaware.market";
+option java_outer_classname = "SharesPriceMovementPerMinuteProto";
+option java_multiple_files = true;
+
+import "spine_examples/shareaware/identifiers.proto";
+import "google/protobuf/timestamp.proto";
+import "spine/money/money.proto";
+
+// Displays the share price movement per minute on a timeline.
+message SharePriceMovementPerMinute {
+ option (entity) = {kind: PROJECTION};
+ option (is).java_type = "io.spine.examples.shareaware.market.SharePriceMovement";
+
+ // The ID of the share price movement.
+ SharePriceMovementId id = 1;
+
+ // The ID of the share, which price movement to display.
+ ShareId share = 2 [(required) = true, (column) = true];
+
+ // The list of the price-to-time points.
+ repeated PriceAtTime point = 3;
+}
+
+// The share price at a particular time.
+message PriceAtTime {
+
+ // The share price.
+ spine.money.Money price = 1 [(required) = true];
+
+ // Point in time when the share price was set.
+ google.protobuf.Timestamp when = 2 [(required) = true];
+}
diff --git a/server/build.gradle.kts b/server/build.gradle.kts
index 1dbf7ea1..13b601ad 100644
--- a/server/build.gradle.kts
+++ b/server/build.gradle.kts
@@ -46,6 +46,7 @@ spine {
dependencies {
implementation(project(":model"))
testImplementation(project(":model", "test"))
+ testImplementation(project(":testutil-server"))
}
application {
diff --git a/server/src/main/java/io/spine/examples/shareaware/server/TradingContext.java b/server/src/main/java/io/spine/examples/shareaware/server/TradingContext.java
index 521df2f5..9297d0fc 100644
--- a/server/src/main/java/io/spine/examples/shareaware/server/TradingContext.java
+++ b/server/src/main/java/io/spine/examples/shareaware/server/TradingContext.java
@@ -32,6 +32,7 @@
import io.spine.examples.shareaware.server.investment.SharesSaleRepository;
import io.spine.examples.shareaware.server.market.AvailableMarketSharesRepository;
import io.spine.examples.shareaware.server.market.MarketProcess;
+import io.spine.examples.shareaware.server.market.SharePriceMovementPerMinuteRepository;
import io.spine.examples.shareaware.server.paymentgateway.PaymentGatewayProcess;
import io.spine.examples.shareaware.server.wallet.WalletAggregate;
import io.spine.examples.shareaware.server.wallet.WalletBalanceRepository;
@@ -75,6 +76,7 @@ public static BoundedContextBuilder newBuilder() {
.add(new SharesPurchaseRepository())
.add(new SharesSaleRepository())
.add(new InvestmentViewRepository())
- .add(new AvailableMarketSharesRepository());
+ .add(new AvailableMarketSharesRepository())
+ .add(new SharePriceMovementPerMinuteRepository());
}
}
diff --git a/server/src/main/java/io/spine/examples/shareaware/server/market/MarketDataProvider.java b/server/src/main/java/io/spine/examples/shareaware/server/market/MarketDataProvider.java
index e598af1e..41cdc5ae 100644
--- a/server/src/main/java/io/spine/examples/shareaware/server/market/MarketDataProvider.java
+++ b/server/src/main/java/io/spine/examples/shareaware/server/market/MarketDataProvider.java
@@ -39,6 +39,7 @@
import java.util.function.Consumer;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static io.spine.base.Time.currentTime;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -159,6 +160,7 @@ private MarketSharesUpdated emitEvent() {
.newBuilder()
.setMarket(MarketProcess.ID)
.addAllShare(updatedShares)
+ .setWhenUpdated(currentTime())
.vBuild();
marketContext.emittedEvent(event, actor);
return event;
diff --git a/server/src/main/java/io/spine/examples/shareaware/server/market/SharePriceMovementPerMinuteProjection.java b/server/src/main/java/io/spine/examples/shareaware/server/market/SharePriceMovementPerMinuteProjection.java
new file mode 100644
index 00000000..4a596869
--- /dev/null
+++ b/server/src/main/java/io/spine/examples/shareaware/server/market/SharePriceMovementPerMinuteProjection.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.server.market;
+
+import io.spine.core.External;
+import io.spine.core.Subscribe;
+import io.spine.examples.shareaware.SharePriceMovementId;
+import io.spine.examples.shareaware.market.PriceAtTime;
+import io.spine.examples.shareaware.market.SharePriceMovementPerMinute;
+import io.spine.examples.shareaware.market.event.MarketSharesUpdated;
+import io.spine.server.projection.Projection;
+
+/**
+ * The view of the share price movements per minute.
+ */
+final class SharePriceMovementPerMinuteProjection
+ extends Projection {
+
+ @Subscribe
+ void on(@External MarketSharesUpdated e) {
+ var shareId = builder()
+ .getId()
+ .getShare();
+ var price = e.find(shareId)
+ .getPrice();
+ var priceAtTime = PriceAtTime
+ .newBuilder()
+ .setPrice(price)
+ .setWhen(e.getWhenUpdated())
+ .vBuild();
+ builder().setShare(shareId)
+ .addPoint(priceAtTime);
+ }
+}
diff --git a/server/src/main/java/io/spine/examples/shareaware/server/market/SharePriceMovementPerMinuteRepository.java b/server/src/main/java/io/spine/examples/shareaware/server/market/SharePriceMovementPerMinuteRepository.java
new file mode 100644
index 00000000..5d46f39d
--- /dev/null
+++ b/server/src/main/java/io/spine/examples/shareaware/server/market/SharePriceMovementPerMinuteRepository.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.server.market;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
+import com.google.protobuf.Duration;
+import com.google.protobuf.Timestamp;
+import io.spine.examples.shareaware.SharePriceMovementId;
+import io.spine.examples.shareaware.market.SharePriceMovementPerMinute;
+import io.spine.examples.shareaware.market.event.MarketSharesUpdated;
+import io.spine.server.projection.ProjectionRepository;
+import io.spine.server.route.EventRouting;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static io.spine.base.Time.currentTime;
+
+/**
+ * Manages instances of the {@link SharePriceMovementPerMinuteProjection}.
+ */
+public final class SharePriceMovementPerMinuteRepository
+ extends ProjectionRepository {
+
+ private static final int SECONDS_IN_MINUTE = 60;
+
+ private static final Duration PROJECTION_ACTIVE_TIME = Duration
+ .newBuilder()
+ .setSeconds(SECONDS_IN_MINUTE)
+ .build();
+
+ @Override
+ @OverridingMethodsMustInvokeSuper
+ protected void setupEventRouting(EventRouting routing) {
+ super.setupEventRouting(routing);
+ routing.route(MarketSharesUpdated.class, (event, context) -> toSharePriceMovements(event));
+ }
+
+ /**
+ * Routes the {@code MarketSharesUpdated} event to the {@code SharePriceMovementPerMinute} projections.
+ */
+ private static ImmutableSet
+ toSharePriceMovements(MarketSharesUpdated event) {
+ return event.getShareList()
+ .stream()
+ .map(share -> {
+ var whenCreated = roundDownToNearestMinute(currentTime());
+ return SharePriceMovementId
+ .newBuilder()
+ .setShare(share.getId())
+ .setActivityTime(PROJECTION_ACTIVE_TIME)
+ .setWhenCreated(whenCreated)
+ .vBuild();
+ })
+ .collect(toImmutableSet());
+ }
+
+ /**
+ * Rounds the provided {@code Timestamp} down to the nearest minute.
+ */
+ private static Timestamp roundDownToNearestMinute(Timestamp timestamp) {
+ var seconds = timestamp.getSeconds() - timestamp.getSeconds() % SECONDS_IN_MINUTE;
+ return Timestamp
+ .newBuilder()
+ .setSeconds(seconds)
+ .setNanos(0)
+ .build();
+ }
+}
diff --git a/server/src/test/java/io/spine/examples/shareaware/server/ProjectionReader.java b/server/src/test/java/io/spine/examples/shareaware/server/ProjectionReader.java
new file mode 100644
index 00000000..70a8c3b1
--- /dev/null
+++ b/server/src/test/java/io/spine/examples/shareaware/server/ProjectionReader.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.server;
+
+import com.google.common.collect.ImmutableList;
+import io.spine.base.EntityState;
+import io.spine.client.ActorRequestFactory;
+import io.spine.client.Filter;
+import io.spine.client.Query;
+import io.spine.client.QueryResponse;
+import io.spine.core.ActorContext;
+import io.spine.grpc.MemoizingObserver;
+import io.spine.server.stand.Stand;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.spine.protobuf.AnyPacker.unpack;
+
+/**
+ * Reader for projections in bounded context.
+ *
+ * @param type of the {@code Projection} to read
+ */
+public final class ProjectionReader {
+
+ private final Stand stand;
+ private final Class stateClass;
+
+ public ProjectionReader(Stand stand, Class stateClass) {
+ this.stand = checkNotNull(stand);
+ this.stateClass = checkNotNull(stateClass);
+ }
+
+ /**
+ * Reads projections that match the filter on behalf of the actor from the context.
+ */
+ public ImmutableList read(ActorContext ctx, Filter... filters) {
+ checkNotNull(ctx);
+ var queryFactory = ActorRequestFactory
+ .fromContext(ctx)
+ .query();
+ var query = queryFactory
+ .select(stateClass)
+ .where(filters)
+ .build();
+ return executeAndUnpackResponse(query);
+ }
+
+ private ImmutableList executeAndUnpackResponse(Query query) {
+ var observer = new MemoizingObserver();
+ stand.execute(query, observer);
+ var response = observer.firstResponse();
+ var result = response
+ .getMessageList()
+ .stream()
+ .map(state -> unpack(state.getState(), stateClass))
+ .collect(toImmutableList());
+ return result;
+ }
+}
diff --git a/server/src/test/java/io/spine/examples/shareaware/server/e2e/SharePurchaseTest.java b/server/src/test/java/io/spine/examples/shareaware/server/e2e/SharePurchaseTest.java
index 1b62db65..50a9ea35 100644
--- a/server/src/test/java/io/spine/examples/shareaware/server/e2e/SharePurchaseTest.java
+++ b/server/src/test/java/io/spine/examples/shareaware/server/e2e/SharePurchaseTest.java
@@ -32,6 +32,7 @@
import org.junit.jupiter.api.Test;
import static com.google.common.truth.extensions.proto.ProtoTruth.assertThat;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static io.spine.examples.shareaware.given.GivenMoney.usd;
import static io.spine.examples.shareaware.server.e2e.given.SharePurchaseTestEnv.balanceAfterPurchase;
import static io.spine.examples.shareaware.server.e2e.given.SharePurchaseTestEnv.investmentAfterPurchase;
@@ -56,6 +57,8 @@ void test() {
var channel = openChannel();
var user = new E2EUser(channel);
+ // Wait for the market to release shares.
+ sleepUninterruptibly(marketPeriod());
var shares = user.looksAtAvailableShares();
var tesla = pickTesla(shares);
diff --git a/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/E2EUser.java b/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/E2EUser.java
index caa2e2f5..dacbae06 100644
--- a/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/E2EUser.java
+++ b/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/E2EUser.java
@@ -60,6 +60,7 @@
import static io.spine.examples.shareaware.server.e2e.given.SharePurchaseTestEnv.zeroWalletBalance;
import static io.spine.examples.shareaware.server.given.GivenWallet.createWallet;
import static io.spine.util.Exceptions.illegalStateWithCauseOf;
+import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
@@ -113,17 +114,16 @@ public WalletId walletId() {
* Describes the user's action to look at available shares on the market.
*/
public List looksAtAvailableShares() {
- var shares = availableMarketShares
- .state()
- .getShareList();
- return shares;
+ var marketShares = requireNonNull(availableMarketShares.state());
+ return marketShares.getShareList();
}
/**
* Describes the user's action to look at the investment.
*/
public InvestmentView looksAtInvestment() {
- return investment.state();
+ var investmentView = requireNonNull(investment.state());
+ return investmentView;
}
/**
@@ -132,8 +132,7 @@ public InvestmentView looksAtInvestment() {
public WalletBalance replenishesWalletFor(Money amount) {
var replenishWallet = replenishWallet(walletId, amount);
- post(replenishWallet);
- var balanceAfterReplenishment = wallet.state();
+ var balanceAfterReplenishment = wallet.onceUpdatedAfter(replenishWallet);
var expectedBalance = walletBalanceWith(usd(500), walletId);
assertThat(balanceAfterReplenishment).isEqualTo(expectedBalance);
return balanceAfterReplenishment;
@@ -157,8 +156,8 @@ public EitherOf2 purchase(Share share, int how
var insufficientFunds = retrieveValueFrom(subscriptionOutcome);
return EitherOf2.withB(insufficientFunds);
}
- post(purchaseShares);
- return EitherOf2.withA(wallet.state());
+ WalletBalance walletAfterPurchase = wallet.onceUpdatedAfter(purchaseShares);
+ return EitherOf2.withA(walletAfterPurchase);
}
/**
@@ -176,8 +175,8 @@ public WalletBalance withdrawsAllMoney(WalletBalance balance) {
*/
private WalletBalance withdrawsMoney(Money amount) {
var withdrawMoney = withdrawMoneyFrom(walletId, amount);
- post(withdrawMoney);
- return wallet.state();
+ WalletBalance walletAfterWithdraw = wallet.onceUpdatedAfter(withdrawMoney);
+ return walletAfterWithdraw;
}
/**
@@ -198,9 +197,10 @@ private SubscriptionOutcome subscribeToEvent(Class S retrieveValueFrom(SubscriptionOutcome changedState) {
try {
+ S value = changedState.future()
+ .get(10, SECONDS);
cancel(changedState.subscription());
- return changedState.future()
- .get(10, SECONDS);
+ return value;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw illegalStateWithCauseOf(e);
}
@@ -216,8 +216,7 @@ private void cancel(Subscription subscription) {
private void createWalletForUser() {
var createWallet = createWallet(walletId);
- post(createWallet);
- var initialBalance = wallet.state();
+ var initialBalance = wallet.onceUpdatedAfter(createWallet);
assertThat(initialBalance).isEqualTo(zeroWalletBalance(walletId));
}
}
diff --git a/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/EntitySubscription.java b/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/EntitySubscription.java
index 57fa1d1d..8d7e0577 100644
--- a/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/EntitySubscription.java
+++ b/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/EntitySubscription.java
@@ -26,52 +26,43 @@
package io.spine.examples.shareaware.server.e2e.given;
+import io.spine.base.CommandMessage;
import io.spine.base.EntityState;
import io.spine.client.Client;
import io.spine.core.UserId;
+import io.spine.examples.shareaware.testing.server.e2e.AsyncObserver;
+import io.spine.examples.shareaware.testing.server.e2e.StateRouter;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-import static io.spine.util.Exceptions.illegalStateWithCauseOf;
+import java.util.function.Consumer;
/**
- * Subscription for the {@code EntityState} changes.
+ * Configures the {@code AsyncObserver} with how to send a command
+ * and how to observe the entity state changes using {@code Spine} Client API.
*/
-public class EntitySubscription {
-
- private final ObservedEntity entity = new ObservedEntity<>();
+class EntitySubscription extends AsyncObserver {
EntitySubscription(Class entityType, Client client, UserId user) {
- client.onBehalfOf(user)
- .subscribeTo(entityType)
- .observe(entity::setState)
- .post();
+ super(subscribeAndObserve(entityType, client, user), command(client, user));
}
/**
- * Provides the current state of the subscribed entity.
+ * Returns callback that defines how to observe an entity state using {@code Spine} Client API.
*/
- public S state() {
- return entity.state();
+ @SuppressWarnings("ResultOfMethodCallIgnored") // It's fine as this callback calls once in the constructor.
+ private static
+ StateRouter subscribeAndObserve(Class entityType, Client client, UserId user) {
+ return recipient -> client.onBehalfOf(user)
+ .subscribeTo(entityType)
+ .observe(recipient)
+ .post();
}
- private static final class ObservedEntity {
- private CompletableFuture future = new CompletableFuture<>();
-
- private void setState(S value) {
- if(future.isDone()) {
- future = new CompletableFuture<>();
- }
- future.complete(value);
- }
-
- private S state() {
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- throw illegalStateWithCauseOf(e);
- }
- }
+ /**
+ * Returns callback which defines how to send a command using {@code Spine} Client API.
+ */
+ private static Consumer command(Client client, UserId user) {
+ return commandMessage -> client.onBehalfOf(user)
+ .command(commandMessage)
+ .postAndForget();
}
}
diff --git a/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/WithServer.java b/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/WithServer.java
index 1bd4db1b..8afb7384 100644
--- a/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/WithServer.java
+++ b/server/src/test/java/io/spine/examples/shareaware/server/e2e/given/WithServer.java
@@ -53,6 +53,7 @@ public abstract class WithServer {
private Server server;
private final Collection channels = new ArrayList<>();
private static final MarketDataProvider provider = MarketDataProvider.instance();
+ private final Duration marketPeriod = Duration.ofSeconds(1);
/**
* Starts the server and runs the {@link MarketDataProvider}.
@@ -63,7 +64,7 @@ void startAndConnect() throws IOException {
.add(TradingContext.newBuilder())
.build();
server.start();
- provider.runWith(Duration.ofSeconds(1));
+ provider.runWith(marketPeriod);
}
/**
@@ -87,6 +88,13 @@ protected ManagedChannel openChannel() {
return channel;
}
+ /**
+ * Returns the period with which the market is updating shares.
+ */
+ protected Duration marketPeriod() {
+ return marketPeriod;
+ }
+
private static void closeChannel(ManagedChannel channel) {
channel.shutdown();
try {
diff --git a/server/src/test/java/io/spine/examples/shareaware/server/given/GivenShare.java b/server/src/test/java/io/spine/examples/shareaware/server/given/GivenShare.java
index e46e1a53..01a7d0f4 100644
--- a/server/src/test/java/io/spine/examples/shareaware/server/given/GivenShare.java
+++ b/server/src/test/java/io/spine/examples/shareaware/server/given/GivenShare.java
@@ -28,6 +28,7 @@
import io.spine.examples.shareaware.share.Share;
import io.spine.examples.shareaware.ShareId;
+import io.spine.money.Money;
import static io.spine.examples.shareaware.given.GivenMoney.*;
@@ -46,21 +47,27 @@ private GivenShare() {
}
public static Share tesla() {
- return Share
- .newBuilder()
- .setId(teslaId)
- .setPrice(usd(20))
- .setCompanyName("Tesla")
- .setCompanyLogo("testURL")
- .vBuild();
+ return tesla(usd(20));
+ }
+
+ public static Share tesla(Money price) {
+ return share(teslaId, price, "Tesla");
}
public static Share apple() {
+ return apple(usd(20));
+ }
+
+ public static Share apple(Money price) {
+ return share(appleId, price, "Apple");
+ }
+
+ private static Share share(ShareId id, Money price, String companyName) {
return Share
.newBuilder()
- .setId(appleId)
- .setPrice(usd(20))
- .setCompanyName("Apple")
+ .setId(id)
+ .setPrice(price)
+ .setCompanyName(companyName)
.setCompanyLogo("testURL")
.vBuild();
}
diff --git a/server/src/test/java/io/spine/examples/shareaware/server/market/SharePriceMovementProjectionTest.java b/server/src/test/java/io/spine/examples/shareaware/server/market/SharePriceMovementProjectionTest.java
new file mode 100644
index 00000000..e9091382
--- /dev/null
+++ b/server/src/test/java/io/spine/examples/shareaware/server/market/SharePriceMovementProjectionTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.server.market;
+
+import io.spine.examples.shareaware.market.SharePriceMovementPerMinute;
+import io.spine.examples.shareaware.server.ProjectionReader;
+import io.spine.server.BoundedContextBuilder;
+import io.spine.server.integration.ThirdPartyContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.extensions.proto.ProtoTruth.assertThat;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static io.spine.client.Filters.eq;
+import static io.spine.examples.shareaware.given.GivenMoney.usd;
+import static io.spine.examples.shareaware.server.given.GivenShare.tesla;
+import static io.spine.examples.shareaware.server.market.given.MarketTestEnv.marketSharesUpdated;
+import static io.spine.examples.shareaware.server.market.given.SharesPriceMovementTestEnv.projectionActivityTime;
+import static io.spine.examples.shareaware.server.market.given.SharesPriceMovementTestEnv.ShareFieldInProjection;
+import static io.spine.examples.shareaware.server.market.given.SharesPriceMovementTestEnv.actorContext;
+import static io.spine.examples.shareaware.server.market.given.SharesPriceMovementTestEnv.sharePriceMovementPerMinute;
+import static io.spine.testing.core.given.GivenUserId.newUuid;
+import static java.time.Duration.ofSeconds;
+
+@DisplayName("`SharePriceMovement` projection should")
+final class SharePriceMovementProjectionTest {
+
+ private ThirdPartyContext marketData;
+
+ private ProjectionReader reader;
+
+ @BeforeEach
+ void setUp() {
+ var repository = new SharePriceMovementPerMinuteRepository();
+ var context = BoundedContextBuilder
+ .assumingTests()
+ .add(repository)
+ .build();
+ marketData = ThirdPartyContext.singleTenant("MarketData");
+ reader = new ProjectionReader<>(context.stand(), SharePriceMovementPerMinute.class);
+ }
+
+ @Test
+ @DisplayName("accept the events only for the activity time")
+ void createProjections() {
+ var shareId = tesla().getId();
+
+ marketData.emittedEvent(marketSharesUpdated(), newUuid());
+ sleepUninterruptibly(ofSeconds(60));
+ var projectionsAfterFirstEmit = reader.read(
+ actorContext(),
+ eq(ShareFieldInProjection, shareId)
+ );
+ assertThat(projectionsAfterFirstEmit.size()).isEqualTo(1);
+
+ marketData.emittedEvent(marketSharesUpdated(), newUuid());
+ sleepUninterruptibly(ofSeconds(projectionActivityTime));
+ var projectionsAfterSecondEmit = reader.read(
+ actorContext(),
+ eq(ShareFieldInProjection, shareId)
+ );
+ assertThat(projectionsAfterSecondEmit.size()).isEqualTo(2);
+
+ assertThat(projectionsAfterSecondEmit.get(0))
+ .isNotEqualTo(projectionsAfterSecondEmit.get(1));
+ assertThat(projectionsAfterSecondEmit.get(0)
+ .getPointCount())
+ .isEqualTo(1);
+ assertThat(projectionsAfterSecondEmit.get(1)
+ .getPointCount())
+ .isEqualTo(1);
+ }
+
+ @Test
+ @DisplayName("construct the `PriceAtTime` from the `MarketSharesUpdate` event")
+ void state() {
+ var shareId = tesla().getId();
+ var shareWithLowerPrice = tesla(usd(10));
+ var shareWithHigherPrice = tesla(usd(20));
+ var eventWithLowerPrice = marketSharesUpdated(shareWithLowerPrice);
+ var eventWithHigherPrice = marketSharesUpdated(shareWithHigherPrice);
+
+ marketData.emittedEvent(eventWithLowerPrice, newUuid());
+ marketData.emittedEvent(eventWithHigherPrice, newUuid());
+ sleepUninterruptibly(ofSeconds(projectionActivityTime));
+ var projection = reader
+ .read(actorContext(), eq(ShareFieldInProjection, shareId))
+ .get(0);
+ var expectedProjection =
+ sharePriceMovementPerMinute(shareId, eventWithLowerPrice, eventWithHigherPrice);
+
+ assertThat(projection)
+ .comparingExpectedFieldsOnly()
+ .isEqualTo(expectedProjection);
+ }
+}
diff --git a/server/src/test/java/io/spine/examples/shareaware/server/market/given/MarketTestEnv.java b/server/src/test/java/io/spine/examples/shareaware/server/market/given/MarketTestEnv.java
index 7bffd1c6..441dab17 100644
--- a/server/src/test/java/io/spine/examples/shareaware/server/market/given/MarketTestEnv.java
+++ b/server/src/test/java/io/spine/examples/shareaware/server/market/given/MarketTestEnv.java
@@ -26,12 +26,12 @@
package io.spine.examples.shareaware.server.market.given;
-import io.spine.core.UserId;
+import io.spine.base.Time;
import io.spine.examples.shareaware.MarketId;
import io.spine.examples.shareaware.PurchaseId;
import io.spine.examples.shareaware.SaleId;
-import io.spine.examples.shareaware.share.Share;
import io.spine.examples.shareaware.ShareId;
+import io.spine.examples.shareaware.given.GivenMoney;
import io.spine.examples.shareaware.market.AvailableMarketShares;
import io.spine.examples.shareaware.market.Market;
import io.spine.examples.shareaware.market.command.CloseMarket;
@@ -41,13 +41,15 @@
import io.spine.examples.shareaware.market.event.MarketClosed;
import io.spine.examples.shareaware.market.event.MarketOpened;
import io.spine.examples.shareaware.market.event.MarketSharesUpdated;
-import io.spine.examples.shareaware.market.rejection.Rejections.SharesCannotBeSoldOnMarket;
import io.spine.examples.shareaware.market.rejection.Rejections.SharesCannotBeObtained;
-import io.spine.examples.shareaware.given.GivenMoney;
+import io.spine.examples.shareaware.market.rejection.Rejections.SharesCannotBeSoldOnMarket;
import io.spine.examples.shareaware.server.market.MarketProcess;
+import io.spine.examples.shareaware.share.Share;
+
+import java.util.Arrays;
-import static io.spine.base.Identifier.*;
-import static io.spine.examples.shareaware.server.given.GivenShare.*;
+import static io.spine.examples.shareaware.server.given.GivenShare.apple;
+import static io.spine.examples.shareaware.server.given.GivenShare.tesla;
public final class MarketTestEnv {
@@ -139,12 +141,17 @@ public static SellSharesOnMarket sellSharesOnMarket() {
}
public static MarketSharesUpdated marketSharesUpdated() {
- return MarketSharesUpdated
+ return marketSharesUpdated(tesla(), apple());
+ }
+
+ public static MarketSharesUpdated marketSharesUpdated(Share... shares) {
+ var builder = MarketSharesUpdated
.newBuilder()
.setMarket(MarketProcess.ID)
- .addShare(tesla())
- .addShare(apple())
- .vBuild();
+ .setWhenUpdated(Time.currentTime());
+ Arrays.stream(shares)
+ .forEach(builder::addShare);
+ return builder.vBuild();
}
public static AvailableMarketShares
diff --git a/server/src/test/java/io/spine/examples/shareaware/server/market/given/SharesPriceMovementTestEnv.java b/server/src/test/java/io/spine/examples/shareaware/server/market/given/SharesPriceMovementTestEnv.java
new file mode 100644
index 00000000..280605c7
--- /dev/null
+++ b/server/src/test/java/io/spine/examples/shareaware/server/market/given/SharesPriceMovementTestEnv.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.server.market.given;
+
+import com.google.protobuf.Duration;
+import com.google.protobuf.Timestamp;
+import io.spine.base.EntityColumn;
+import io.spine.core.ActorContext;
+import io.spine.core.TenantId;
+import io.spine.examples.shareaware.ShareId;
+import io.spine.examples.shareaware.SharePriceMovementId;
+import io.spine.examples.shareaware.market.PriceAtTime;
+import io.spine.examples.shareaware.market.SharePriceMovementPerMinute;
+import io.spine.examples.shareaware.market.event.MarketSharesUpdated;
+import io.spine.money.Money;
+
+import static io.spine.base.Time.currentTime;
+import static io.spine.testing.core.given.GivenUserId.newUuid;
+
+public final class SharesPriceMovementTestEnv {
+
+ public static final long projectionActivityTime = 60;
+
+ public static final EntityColumn ShareFieldInProjection =
+ SharePriceMovementPerMinute.Column.share();
+
+ /**
+ * Prevents instantiation of this class.
+ */
+ private SharesPriceMovementTestEnv() {
+ }
+
+ public static SharePriceMovementPerMinute sharePriceMovementPerMinute(
+ ShareId shareId,
+ MarketSharesUpdated firstEvent,
+ MarketSharesUpdated secondEvent
+ ) {
+ Money firstPrice = firstEvent.find(shareId)
+ .getPrice();
+ Money secondPrice = secondEvent.find(shareId)
+ .getPrice();
+ PriceAtTime firstPriceAtTime = priceAtTime(firstPrice, firstEvent.getWhenUpdated());
+ PriceAtTime secondPriceAtTime = priceAtTime(secondPrice, secondEvent.getWhenUpdated());
+ Duration activityTime = Duration
+ .newBuilder()
+ .setSeconds(projectionActivityTime)
+ .build();
+ SharePriceMovementId sharePriceMovementId = sharePriceMovementId(shareId, activityTime);
+ return SharePriceMovementPerMinute
+ .newBuilder()
+ .setId(sharePriceMovementId)
+ .addPoint(firstPriceAtTime)
+ .addPoint(secondPriceAtTime)
+ .setShare(shareId)
+ .buildPartial();
+ }
+
+ public static ActorContext actorContext() {
+ TenantId tenantId = TenantId
+ .newBuilder()
+ .setValue("SharePriceMovementTest")
+ .build();
+ return ActorContext
+ .newBuilder()
+ .setActor(newUuid())
+ .setTenantId(tenantId)
+ .setTimestamp(currentTime())
+ .vBuild();
+ }
+
+ private static PriceAtTime priceAtTime(Money price, Timestamp time) {
+ return PriceAtTime
+ .newBuilder()
+ .setPrice(price)
+ .setWhen(time)
+ .vBuild();
+ }
+
+ private static SharePriceMovementId sharePriceMovementId(
+ ShareId share,
+ Duration activityTime
+ ) {
+ return SharePriceMovementId
+ .newBuilder()
+ .setShare(share)
+ .setActivityTime(activityTime)
+ .buildPartial();
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 8bfed053..e8e68fb1 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -28,3 +28,4 @@ rootProject.name = "ShareAware"
include("model")
include("server")
include("client")
+include("testutil-server")
diff --git a/testutil-server/build.gradle.kts b/testutil-server/build.gradle.kts
new file mode 100644
index 00000000..aa01dde6
--- /dev/null
+++ b/testutil-server/build.gradle.kts
@@ -0,0 +1,9 @@
+import io.spine.examples.shareaware.dependency.Truth
+
+repositories {
+ mavenCentral()
+}
+
+dependencies {
+ testImplementation(Truth.lib)
+}
diff --git a/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/AsyncObserver.java b/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/AsyncObserver.java
new file mode 100644
index 00000000..d3192ce9
--- /dev/null
+++ b/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/AsyncObserver.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.testing.server.e2e;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static io.spine.util.Exceptions.illegalStateWithCauseOf;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Observer for entity state changes.
+ *
+ * Allows to observe the asynchronous mutations of the entity state.
+ *
+ * @param
+ * the state to observe
+ * @param
+ * the type of the command, the execution of which should lead to changes in the state
+ */
+public class AsyncObserver {
+
+ private final Consumer howToCommand;
+
+ private CompletableFuture statePack = new CompletableFuture<>();
+
+ private @Nullable S state = null;
+
+ private final AtomicReference observationState =
+ new AtomicReference<>(null);
+
+ /**
+ * Creates the new instance of the {@code AsyncObserver}.
+ *
+ * @param howToObserve
+ * a callback that defines how to observe and route the state {@link S}.
+ * @param howToCommand
+ * a callback that defines how to send a command {@link C}.
+ */
+ public AsyncObserver(
+ StateRouter howToObserve,
+ Consumer howToCommand) {
+ this.howToCommand = howToCommand;
+ howToObserve.route(value -> {
+ state = value;
+ if (statePack.isDone()) {
+ statePack = new CompletableFuture<>();
+ }
+ statePack.complete(value);
+ observationState.set(ObservationState.UPDATE_PACKED);
+ });
+ }
+
+ /**
+ * Posts a command and waits for an update of the entity state that
+ * should occur as a consequence of the posted command.
+ *
+ * If an update of the entity state is not received within 10 seconds,
+ * a {@code TimeoutException} is thrown.
+ */
+ public S onceUpdatedAfter(C command) {
+ howToCommand.accept(command);
+ return waitForUpdate();
+ }
+
+ /**
+ * Returns the current state of the entity if it exists, null otherwise.
+ */
+ public @Nullable S state() {
+ return state;
+ }
+
+ /**
+ * Waits for an update of the entity state to arrive and return this state.
+ *
+ *
An update of the entity state should be received within 10 seconds,
+ * otherwise a {@code TimeoutException} will be thrown.
+ */
+ private S waitForUpdate() {
+ checkForUpdate();
+ S updatedEntity = unpackState();
+ observationState.set(ObservationState.UPDATE_UNPACKED);
+ return updatedEntity;
+ }
+
+ /**
+ * Checks for the update of the entity state to arrive
+ * at the moment when this method is called.
+ *
+ *
If the update of the entity state has not arrived
+ * this method forces the waiting for update.
+ */
+ private void checkForUpdate() {
+ if (observationState.get() != null &&
+ observationState.get() != ObservationState.UPDATE_PACKED) {
+ statePack = new CompletableFuture<>();
+ }
+ }
+
+ /**
+ * Unpack the entity state from the {@link #statePack}.
+ *
+ *
If the entity state cannot be unpacked within 10 seconds,
+ * a {@code TimeoutException} will be thrown.
+ */
+ private S unpackState() {
+ try {
+ return statePack.whenComplete((value, error) -> {
+ if (error != null) {
+ throw illegalStateWithCauseOf(error);
+ }
+ })
+ .orTimeout(10, SECONDS)
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw illegalStateWithCauseOf(e);
+ }
+ }
+
+ /**
+ * Represents the states of the entity observation.
+ *
+ *
The {@code AsyncObserver} can observe asynchronous changes of the entity's state,
+ * and these states were introduced to restrict the order of the observing operations.
+ */
+ private enum ObservationState {
+
+ // The update entity state has arrived asynchronously
+ // and packed for synchronization with the main thread.
+ UPDATE_PACKED,
+
+ // The update of the entity state has been unpacked and ready for usage in the main thread.
+ UPDATE_UNPACKED
+ }
+}
diff --git a/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/StateRecipient.java b/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/StateRecipient.java
new file mode 100644
index 00000000..a9d80314
--- /dev/null
+++ b/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/StateRecipient.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.testing.server.e2e;
+
+import java.util.function.Consumer;
+
+/**
+ * Represents a recipient of state updates.
+ *
+ *
This interface is designed to provide a standardized way of accepting
+ * and handling state updates from various routers or observers.
+ *
+ * @param the type of the state to be received
+ */
+public interface StateRecipient extends Consumer {
+
+ /**
+ * Receives the updated state.
+ *
+ *
This method is used to receive state updates
+ * and define how the recipient should handle it.
+ *
+ * @param state the updated state to be received and processed
+ */
+ void receive(S state);
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implNote Inheritors can choose to override this method or directly implement
+ * the {@link #receive} method to handle the received state.
+ *
+ * @param state the updated state to be received and processed
+ */
+ @Override
+ default void accept(S state) {
+ receive(state);
+ };
+}
diff --git a/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/StateRouter.java b/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/StateRouter.java
new file mode 100644
index 00000000..88bbb9cd
--- /dev/null
+++ b/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/StateRouter.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.testing.server.e2e;
+
+import java.util.function.Consumer;
+
+/**
+ * Represents a state router to deliver the state to the {@code StateRecipient}.
+ *
+ *
A state router acts as a bridge between the subject that produces the state
+ * and the recipients that are interested in receiving and processing this state.
+ *
+ * @param the type of the state to be routed to the {@code StateRecipient}
+ */
+public interface StateRouter extends Consumer> {
+
+ /**
+ * Routes the state to the specified recipient.
+ *
+ * @param recipient the recipient to which the state should be routed.
+ */
+ void route(StateRecipient recipient);
+
+ /**
+ * Accepts a {@code StateRecipient} and routes the state to it.
+ *
+ * @param recipient the recipient to which the state should be routed.
+ * @implNote Inheritors can choose to override this method or directly implement
+ * the {@link #route} method to route the state.
+ */
+ @Override
+ default void accept(StateRecipient recipient) {
+ route(recipient);
+ }
+}
diff --git a/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/package-info.java b/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/package-info.java
new file mode 100644
index 00000000..42852b3e
--- /dev/null
+++ b/testutil-server/src/main/java/io/spine/examples/shareaware/testing/server/e2e/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * This package contains the utilities for the end-to-end testing.
+ */
+@CheckReturnValue
+@ParametersAreNonnullByDefault
+package io.spine.examples.shareaware.testing.server.e2e;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/testutil-server/src/test/java/io/spine/examples/shareaware/testing/server/e2e/AsyncObserverTest.java b/testutil-server/src/test/java/io/spine/examples/shareaware/testing/server/e2e/AsyncObserverTest.java
new file mode 100644
index 00000000..c0e1025f
--- /dev/null
+++ b/testutil-server/src/test/java/io/spine/examples/shareaware/testing/server/e2e/AsyncObserverTest.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.testing.server.e2e;
+
+import io.spine.examples.shareaware.testing.server.e2e.given.AsyncStateMutator;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.TimeoutException;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.time.Duration.ofSeconds;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@DisplayName("`AsyncObserver` should")
+class AsyncObserverTest {
+
+ @Test
+ @DisplayName("observe mutation of the state with the slow response from the mutator")
+ void observeMutationWithDelay() {
+ var stateMutator = new AsyncStateMutator("state", ofSeconds(5));
+ var observer = new AsyncObserver<>(stateMutator.mutationRouter(),
+ stateMutator::mutateState);
+
+ var actualStateAfterFirstMutation = observer.onceUpdatedAfter(true);
+ var stateAfterFirstMutation = stateMutator.state();
+ assertThat(actualStateAfterFirstMutation).isEqualTo(stateAfterFirstMutation);
+ assertThat(observer.state()).isEqualTo(stateAfterFirstMutation);
+
+ var actualStateAfterSecondMutation = observer.onceUpdatedAfter(true);
+ var stateAfterSecondMutation = stateMutator.state();
+ assertThat(actualStateAfterSecondMutation).isEqualTo(stateAfterSecondMutation);
+ assertThat(observer.state()).isEqualTo(stateAfterSecondMutation);
+
+ stateMutator.stopMutatorThread();
+ }
+
+ @Test
+ @DisplayName("observe mutation of the state with the fast response from the mutator")
+ void observeMutationWithoutDelay() {
+ var stateMutator = new AsyncStateMutator("state", ofSeconds(0));
+ var observer = new AsyncObserver<>(stateMutator.mutationRouter(),
+ stateMutator::mutateState);
+
+ var actualStateAfterFirstMutation = observer.onceUpdatedAfter(true);
+ var stateAfterFirstMutation = stateMutator.state();
+ assertThat(actualStateAfterFirstMutation).isEqualTo(stateAfterFirstMutation);
+ assertThat(observer.state()).isEqualTo(stateAfterFirstMutation);
+
+ stateMutator.stopMutatorThread();
+ }
+
+ @Test
+ @DisplayName("throw the `IllegalArgumentException` with cause of `TimeoutException`" +
+ "when the state mutation were not received in 10 seconds")
+ void throwExceptionWithoutMutation() {
+ var stateMutator = new AsyncStateMutator("", ofSeconds(0));
+ var observer = new AsyncObserver<>(stateMutator.mutationRouter(),
+ stateMutator::mutateState);
+
+ IllegalStateException exception =
+ assertThrows(IllegalStateException.class,
+ () -> observer.onceUpdatedAfter(false));
+ assertThat(exception.getCause()
+ .getClass()).isEqualTo(TimeoutException.class);
+
+ stateMutator.stopMutatorThread();
+ }
+}
diff --git a/testutil-server/src/test/java/io/spine/examples/shareaware/testing/server/e2e/given/AsyncStateMutator.java b/testutil-server/src/test/java/io/spine/examples/shareaware/testing/server/e2e/given/AsyncStateMutator.java
new file mode 100644
index 00000000..06bc1516
--- /dev/null
+++ b/testutil-server/src/test/java/io/spine/examples/shareaware/testing/server/e2e/given/AsyncStateMutator.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.examples.shareaware.testing.server.e2e.given;
+
+import io.spine.examples.shareaware.testing.server.e2e.StateRecipient;
+import io.spine.examples.shareaware.testing.server.e2e.StateRouter;
+import io.spine.util.Exceptions;
+
+import java.security.SecureRandom;
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
+/**
+ * State mutator that working in a separate thread.
+ */
+public class AsyncStateMutator {
+
+ private final StateRouter mutationRouter;
+
+ private final AtomicBoolean isMutated = new AtomicBoolean(false);
+
+ private final AtomicReference state = new AtomicReference<>();
+
+ private final ExecutorService thread = newSingleThreadExecutor();
+
+ private final Random random = new SecureRandom();
+
+ private final Object lock = new Object();
+
+ private volatile boolean keepRunning = true;
+
+ public AsyncStateMutator(String state, Duration delay) {
+ this.state.set(state);
+ mutationRouter = recipient ->
+ thread.execute(() -> {
+ while (keepRunning) {
+ waitForMutate();
+ routeState(delay, recipient);
+ }
+ });
+ }
+
+ /**
+ * Waits for the state mutation to occur.
+ *
+ * If the state mutation does not occur within 10 seconds, a
+ * {@code TimeoutException} is thrown.
+ */
+ private void waitForMutate() {
+ synchronized (lock) {
+ while (!isMutated.get() && keepRunning) {
+ try {
+ lock.wait(10000);
+ } catch (InterruptedException e) {
+ throw Exceptions.illegalStateWithCauseOf(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Routes the modified state to receiver.
+ *
+ * @param delay
+ * the duration to delay before routing the state
+ * @param recipient
+ * the recipient to route the modified state to
+ */
+ private void routeState(Duration delay, StateRecipient recipient) {
+ if (isMutated.get()) {
+ sleepUninterruptibly(delay);
+ recipient.receive(this.state.get());
+ isMutated.set(false);
+ }
+ }
+
+ /**
+ * Mutate the state by adding the random number to it.
+ */
+ public synchronized void mutateState(Boolean isMutating) {
+ if (isMutating) {
+ int additionalState = random.nextInt(100);
+ state.set(state.get() + additionalState);
+ }
+ isMutated.set(isMutating);
+ if (isMutating) {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Stops the thread where the {@code AsyncStateMutator} is executing.
+ */
+ public void stopMutatorThread() {
+ keepRunning = false;
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ thread.shutdown();
+ }
+
+ /**
+ * Returns the current state.
+ */
+ public String state() {
+ return state.get();
+ }
+
+ /**
+ * Returns the callback that routes the mutated state to the recipient.
+ */
+ public StateRouter mutationRouter() {
+ return mutationRouter;
+ }
+}
diff --git a/testutil-server/src/test/java/io/spine/examples/shareaware/testing/server/e2e/given/package-info.java b/testutil-server/src/test/java/io/spine/examples/shareaware/testing/server/e2e/given/package-info.java
new file mode 100644
index 00000000..35c01773
--- /dev/null
+++ b/testutil-server/src/test/java/io/spine/examples/shareaware/testing/server/e2e/given/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2023, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Provides test environment for the end-to-end testing.
+ */
+@CheckReturnValue
+@ParametersAreNonnullByDefault
+package io.spine.examples.shareaware.testing.server.e2e.given;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+import javax.annotation.ParametersAreNonnullByDefault;