From 6c820516fde2d8a28d2d69349b1ddea1abf21ce9 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 29 Mar 2026 14:35:41 +0200 Subject: [PATCH] add inline projection feature --- composer.json | 2 +- composer.lock | 127 +++++++++--------- .../DoctrineProjectionStateStore.php | 79 +++++++++++ src/Subscription/Projection/Projection.php | 38 ++++++ .../Projection/ProjectionStateStore.php | 12 ++ .../Subscriber/ProfileInlineProjection.php | 29 ++++ .../Subscription/SubscriptionTest.php | 121 +++++++++++++++++ 7 files changed, 344 insertions(+), 64 deletions(-) create mode 100644 src/Subscription/Projection/DoctrineProjectionStateStore.php create mode 100644 src/Subscription/Projection/Projection.php create mode 100644 src/Subscription/Projection/ProjectionStateStore.php create mode 100644 tests/Integration/Subscription/Subscriber/ProfileInlineProjection.php diff --git a/composer.json b/composer.json index 89b479ea..224c7215 100644 --- a/composer.json +++ b/composer.json @@ -22,7 +22,7 @@ "php": "~8.2.0 || ~8.3.0 || ~8.4.0 || ~8.5.0", "doctrine/dbal": "^4.0.0", "doctrine/migrations": "^3.3.2", - "patchlevel/hydrator": "^1.8.0", + "patchlevel/hydrator": "^1.19.0", "patchlevel/worker": "^1.4.0", "psr/cache": "^2.0.0 || ^3.0.0", "psr/clock": "^1.0", diff --git a/composer.lock b/composer.lock index fb48f038..e9438518 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "eaaa35929b4073dbe5946b8486ca1a64", + "content-hash": "533e45fe8486bd6c5ee24aa6c5701614", "packages": [ { "name": "brick/math", @@ -68,16 +68,16 @@ }, { "name": "doctrine/dbal", - "version": "4.4.2", + "version": "4.4.3", "source": { "type": "git", "url": "https://github.com/doctrine/dbal.git", - "reference": "476f7f0fa6ea4aa5364926db7fabdf6049075722" + "reference": "61e730f1658814821a85f2402c945f3883407dec" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/dbal/zipball/476f7f0fa6ea4aa5364926db7fabdf6049075722", - "reference": "476f7f0fa6ea4aa5364926db7fabdf6049075722", + "url": "https://api.github.com/repos/doctrine/dbal/zipball/61e730f1658814821a85f2402c945f3883407dec", + "reference": "61e730f1658814821a85f2402c945f3883407dec", "shasum": "" }, "require": { @@ -154,7 +154,7 @@ ], "support": { "issues": "https://github.com/doctrine/dbal/issues", - "source": "https://github.com/doctrine/dbal/tree/4.4.2" + "source": "https://github.com/doctrine/dbal/tree/4.4.3" }, "funding": [ { @@ -170,7 +170,7 @@ "type": "tidelift" } ], - "time": "2026-02-26T12:12:19+00:00" + "time": "2026-03-20T08:52:12+00:00" }, { "name": "doctrine/deprecations", @@ -416,16 +416,16 @@ }, { "name": "patchlevel/hydrator", - "version": "1.16.0", + "version": "1.19.0", "source": { "type": "git", "url": "https://github.com/patchlevel/hydrator.git", - "reference": "27b29f806f366194752cbaaa241e426bed2d942f" + "reference": "5b79db6c2089cd3ddb5f08666179d85bda3160d7" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/patchlevel/hydrator/zipball/27b29f806f366194752cbaaa241e426bed2d942f", - "reference": "27b29f806f366194752cbaaa241e426bed2d942f", + "url": "https://api.github.com/repos/patchlevel/hydrator/zipball/5b79db6c2089cd3ddb5f08666179d85bda3160d7", + "reference": "5b79db6c2089cd3ddb5f08666179d85bda3160d7", "shasum": "" }, "require": { @@ -474,9 +474,9 @@ ], "support": { "issues": "https://github.com/patchlevel/hydrator/issues", - "source": "https://github.com/patchlevel/hydrator/tree/1.16.0" + "source": "https://github.com/patchlevel/hydrator/tree/1.19.0" }, - "time": "2026-02-17T11:56:35+00:00" + "time": "2026-03-29T12:04:47+00:00" }, { "name": "patchlevel/worker", @@ -995,16 +995,16 @@ }, { "name": "symfony/console", - "version": "v8.0.6", + "version": "v8.0.7", "source": { "type": "git", "url": "https://github.com/symfony/console.git", - "reference": "488285876e807a4777f074041d8bb508623419fa" + "reference": "15ed9008a4ebe2d6a78e4937f74e0c13ef2e618a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/console/zipball/488285876e807a4777f074041d8bb508623419fa", - "reference": "488285876e807a4777f074041d8bb508623419fa", + "url": "https://api.github.com/repos/symfony/console/zipball/15ed9008a4ebe2d6a78e4937f74e0c13ef2e618a", + "reference": "15ed9008a4ebe2d6a78e4937f74e0c13ef2e618a", "shasum": "" }, "require": { @@ -1061,7 +1061,7 @@ "terminal" ], "support": { - "source": "https://github.com/symfony/console/tree/v8.0.6" + "source": "https://github.com/symfony/console/tree/v8.0.7" }, "funding": [ { @@ -1081,7 +1081,7 @@ "type": "tidelift" } ], - "time": "2026-02-25T16:59:43+00:00" + "time": "2026-03-06T14:06:22+00:00" }, { "name": "symfony/deprecation-contracts", @@ -1959,16 +1959,16 @@ }, { "name": "symfony/type-info", - "version": "v8.0.6", + "version": "v8.0.7", "source": { "type": "git", "url": "https://github.com/symfony/type-info.git", - "reference": "785992c06d07306f963ded3439036f5da9b292fe" + "reference": "3c7de103dd6cb68be24e155838a64ef4a70ae195" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/type-info/zipball/785992c06d07306f963ded3439036f5da9b292fe", - "reference": "785992c06d07306f963ded3439036f5da9b292fe", + "url": "https://api.github.com/repos/symfony/type-info/zipball/3c7de103dd6cb68be24e155838a64ef4a70ae195", + "reference": "3c7de103dd6cb68be24e155838a64ef4a70ae195", "shasum": "" }, "require": { @@ -2017,7 +2017,7 @@ "type" ], "support": { - "source": "https://github.com/symfony/type-info/tree/v8.0.6" + "source": "https://github.com/symfony/type-info/tree/v8.0.7" }, "funding": [ { @@ -2037,7 +2037,7 @@ "type": "tidelift" } ], - "time": "2026-02-20T07:51:53+00:00" + "time": "2026-03-04T13:55:34+00:00" }, { "name": "symfony/var-exporter", @@ -3609,16 +3609,16 @@ }, { "name": "league/commonmark", - "version": "2.8.0", + "version": "2.8.2", "source": { "type": "git", "url": "https://github.com/thephpleague/commonmark.git", - "reference": "4efa10c1e56488e658d10adf7b7b7dcd19940bfb" + "reference": "59fb075d2101740c337c7216e3f32b36c204218b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/thephpleague/commonmark/zipball/4efa10c1e56488e658d10adf7b7b7dcd19940bfb", - "reference": "4efa10c1e56488e658d10adf7b7b7dcd19940bfb", + "url": "https://api.github.com/repos/thephpleague/commonmark/zipball/59fb075d2101740c337c7216e3f32b36c204218b", + "reference": "59fb075d2101740c337c7216e3f32b36c204218b", "shasum": "" }, "require": { @@ -3643,9 +3643,9 @@ "phpstan/phpstan": "^1.8.2", "phpunit/phpunit": "^9.5.21 || ^10.5.9 || ^11.0.0", "scrutinizer/ocular": "^1.8.1", - "symfony/finder": "^5.3 | ^6.0 | ^7.0", - "symfony/process": "^5.4 | ^6.0 | ^7.0", - "symfony/yaml": "^2.3 | ^3.0 | ^4.0 | ^5.0 | ^6.0 | ^7.0", + "symfony/finder": "^5.3 | ^6.0 | ^7.0 || ^8.0", + "symfony/process": "^5.4 | ^6.0 | ^7.0 || ^8.0", + "symfony/yaml": "^2.3 | ^3.0 | ^4.0 | ^5.0 | ^6.0 | ^7.0 || ^8.0", "unleashedtech/php-coding-standard": "^3.1.1", "vimeo/psalm": "^4.24.0 || ^5.0.0 || ^6.0.0" }, @@ -3712,7 +3712,7 @@ "type": "tidelift" } ], - "time": "2025-11-26T21:48:24+00:00" + "time": "2026-03-19T13:16:38+00:00" }, { "name": "league/config", @@ -4398,16 +4398,16 @@ }, { "name": "phpat/phpat", - "version": "0.12.3", + "version": "0.12.4", "source": { "type": "git", "url": "https://github.com/carlosas/phpat.git", - "reference": "2412a8959254a076e751498cbba8cf29406e0cf4" + "reference": "5319264270c335f548451209bb0f32b55aa59924" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/carlosas/phpat/zipball/2412a8959254a076e751498cbba8cf29406e0cf4", - "reference": "2412a8959254a076e751498cbba8cf29406e0cf4", + "url": "https://api.github.com/repos/carlosas/phpat/zipball/5319264270c335f548451209bb0f32b55aa59924", + "reference": "5319264270c335f548451209bb0f32b55aa59924", "shasum": "" }, "require": { @@ -4449,9 +4449,9 @@ "description": "PHP Architecture Tester", "support": { "issues": "https://github.com/carlosas/phpat/issues", - "source": "https://github.com/carlosas/phpat/tree/0.12.3" + "source": "https://github.com/carlosas/phpat/tree/0.12.4" }, - "time": "2026-02-20T11:15:22+00:00" + "time": "2026-03-17T16:47:43+00:00" }, { "name": "phpbench/container", @@ -4506,16 +4506,16 @@ }, { "name": "phpbench/phpbench", - "version": "1.4.3", + "version": "1.6.1", "source": { "type": "git", "url": "https://github.com/phpbench/phpbench.git", - "reference": "b641dde59d969ea42eed70a39f9b51950bc96878" + "reference": "661c8c6abbc7734986cf7bc6062c237fbb450461" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpbench/phpbench/zipball/b641dde59d969ea42eed70a39f9b51950bc96878", - "reference": "b641dde59d969ea42eed70a39f9b51950bc96878", + "url": "https://api.github.com/repos/phpbench/phpbench/zipball/661c8c6abbc7734986cf7bc6062c237fbb450461", + "reference": "661c8c6abbc7734986cf7bc6062c237fbb450461", "shasum": "" }, "require": { @@ -4526,7 +4526,7 @@ "ext-reflection": "*", "ext-spl": "*", "ext-tokenizer": "*", - "php": "^8.1", + "php": "^8.2", "phpbench/container": "^2.2", "psr/log": "^1.1 || ^2.0 || ^3.0", "seld/jsonlint": "^1.1", @@ -4546,8 +4546,9 @@ "phpstan/extension-installer": "^1.1", "phpstan/phpstan": "^1.0", "phpstan/phpstan-phpunit": "^1.0", - "phpunit/phpunit": "^10.4 || ^11.0", + "phpunit/phpunit": "^11.5", "rector/rector": "^1.2", + "sebastian/exporter": "^6.3.2", "symfony/error-handler": "^6.1 || ^7.0 || ^8.0", "symfony/var-dumper": "^6.1 || ^7.0 || ^8.0" }, @@ -4592,7 +4593,7 @@ ], "support": { "issues": "https://github.com/phpbench/phpbench/issues", - "source": "https://github.com/phpbench/phpbench/tree/1.4.3" + "source": "https://github.com/phpbench/phpbench/tree/1.6.1" }, "funding": [ { @@ -4600,7 +4601,7 @@ "type": "github" } ], - "time": "2025-11-06T19:07:31+00:00" + "time": "2026-03-22T10:27:20+00:00" }, { "name": "phpstan/phpdoc-parser", @@ -4651,11 +4652,11 @@ }, { "name": "phpstan/phpstan", - "version": "2.1.40", + "version": "2.1.44", "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/9b2c7aeb83a75d8680ea5e7c9b7fca88052b766b", - "reference": "9b2c7aeb83a75d8680ea5e7c9b7fca88052b766b", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/4a88c083c668b2c364a425c9b3171b2d9ea5d218", + "reference": "4a88c083c668b2c364a425c9b3171b2d9ea5d218", "shasum": "" }, "require": { @@ -4700,7 +4701,7 @@ "type": "github" } ], - "time": "2026-02-23T15:04:35+00:00" + "time": "2026-03-25T17:34:21+00:00" }, { "name": "phpstan/phpstan-phpunit", @@ -6884,16 +6885,16 @@ }, { "name": "symfony/messenger", - "version": "v8.0.6", + "version": "v8.0.7", "source": { "type": "git", "url": "https://github.com/symfony/messenger.git", - "reference": "4be925bf0155d6435d2cdfa63d5ffd277c44ac10" + "reference": "6ba5f08c156cfc95911dbb0da01a9bc390a70fd1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/messenger/zipball/4be925bf0155d6435d2cdfa63d5ffd277c44ac10", - "reference": "4be925bf0155d6435d2cdfa63d5ffd277c44ac10", + "url": "https://api.github.com/repos/symfony/messenger/zipball/6ba5f08c156cfc95911dbb0da01a9bc390a70fd1", + "reference": "6ba5f08c156cfc95911dbb0da01a9bc390a70fd1", "shasum": "" }, "require": { @@ -6950,7 +6951,7 @@ "description": "Helps applications send and receive messages to/from other applications or via message queues", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/messenger/tree/v8.0.6" + "source": "https://github.com/symfony/messenger/tree/v8.0.7" }, "funding": [ { @@ -6970,7 +6971,7 @@ "type": "tidelift" } ], - "time": "2026-02-25T16:59:43+00:00" + "time": "2026-03-04T13:55:34+00:00" }, { "name": "symfony/options-resolver", @@ -7634,16 +7635,16 @@ }, { "name": "webmozart/assert", - "version": "2.1.5", + "version": "2.1.6", "source": { "type": "git", "url": "https://github.com/webmozarts/assert.git", - "reference": "79155f94852fa27e2f73b459f6503f5e87e2c188" + "reference": "ff31ad6efc62e66e518fbab1cde3453d389bcdc8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/webmozarts/assert/zipball/79155f94852fa27e2f73b459f6503f5e87e2c188", - "reference": "79155f94852fa27e2f73b459f6503f5e87e2c188", + "url": "https://api.github.com/repos/webmozarts/assert/zipball/ff31ad6efc62e66e518fbab1cde3453d389bcdc8", + "reference": "ff31ad6efc62e66e518fbab1cde3453d389bcdc8", "shasum": "" }, "require": { @@ -7690,9 +7691,9 @@ ], "support": { "issues": "https://github.com/webmozarts/assert/issues", - "source": "https://github.com/webmozarts/assert/tree/2.1.5" + "source": "https://github.com/webmozarts/assert/tree/2.1.6" }, - "time": "2026-02-18T14:09:36+00:00" + "time": "2026-02-27T10:28:38+00:00" }, { "name": "webmozart/glob", diff --git a/src/Subscription/Projection/DoctrineProjectionStateStore.php b/src/Subscription/Projection/DoctrineProjectionStateStore.php new file mode 100644 index 00000000..f4bf2fea --- /dev/null +++ b/src/Subscription/Projection/DoctrineProjectionStateStore.php @@ -0,0 +1,79 @@ +subscriberId($projection); + $data = $this->hydrator->extract($projection); + + $this->connection->insert($this->tableName, [ + 'id' => $subscriberId, + 'state' => json_encode($data), + ]); + } + + public function load(Projection $projection): void + { + $subscriberId = $this->subscriberId($projection); + + $data = $this->connection->fetchAssociative( + 'SELECT * FROM ' . $this->tableName . ' WHERE id = ?', + [$subscriberId], + ); + + if (!$data) { + return; + } + + $this->hydrator->hydrate($projection::class, json_decode($data['state'], true), [HydratorWithContext::OBJECT_TO_POPULATE => $projection]); + } + + public function configureSchema(Schema $schema, Connection $connection): void + { + if (!DoctrineHelper::sameDatabase($this->connection, $connection)) { + return; + } + + $table = $schema->createTable($this->tableName); + + $table->addColumn('id', Types::STRING) + ->setLength(255) + ->setNotnull(true); + $table->addColumn('state', Types::JSON) + ->setNotnull(true); + + $table->setPrimaryKey(['id']); + } + + public function subscriberId(Projection $projection): string + { + return $this->subscriberMetadataFactory->metadata($projection::class)->id; + } +} diff --git a/src/Subscription/Projection/Projection.php b/src/Subscription/Projection/Projection.php new file mode 100644 index 00000000..d97d7b3f --- /dev/null +++ b/src/Subscription/Projection/Projection.php @@ -0,0 +1,38 @@ +store->load($this); + } + + public function beginBatch(): void + { + // do nothing + } + + public function commitBatch(): void + { + $this->store->store($this); + } + + public function rollbackBatch(): void + { + $this->store->load($this); + } + + public function forceCommit(): bool + { + return false; + } +} diff --git a/src/Subscription/Projection/ProjectionStateStore.php b/src/Subscription/Projection/ProjectionStateStore.php new file mode 100644 index 00000000..80d0f754 --- /dev/null +++ b/src/Subscription/Projection/ProjectionStateStore.php @@ -0,0 +1,12 @@ + */ + public array $profiles = []; + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->profiles[$profileCreated->profileId->toString()] = $profileCreated->name; + } + + public function findById(ProfileId $id): string|null + { + return $this->profiles[$id->toString()] ?? null; + } +} diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 4e1684b2..37c8b4a2 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -31,6 +31,7 @@ use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\StoreMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; +use Patchlevel\EventSourcing\Subscription\Projection\DoctrineProjectionStateStore; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Subscription\RunMode; use Patchlevel\EventSourcing\Subscription\Status; @@ -43,6 +44,7 @@ use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerWithSelfRecoverySubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\LookupSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\MigrateAggregateToStreamStoreSubscriber; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileInlineProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileNewProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProcessor; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProjection; @@ -1632,6 +1634,125 @@ class { self::assertEquals(RunMode::FromNow, $subscriptions[0]->runMode()); } + public function testInlineProjections(): void + { + $store = new DoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $stateStore = new DoctrineProjectionStateStore($this->connection); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + $stateStore, + ]), + ); + + $schemaDirector->create(); + + $subscriberRepository = new MetadataSubscriberAccessorRepository([ + new ProfileInlineProjection( + $stateStore, + ), + ]); + + $engine = new DefaultSubscriptionEngine( + new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), + $subscriptionStore, + $subscriberRepository, + ); + + self::assertEquals( + [ + new Subscription( + 'profile_inline', + 'projector', + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $engine->setup(); + + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_inline', + 'projector', + RunMode::FromBeginning, + Status::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $profileId = ProfileId::fromString('019d3991-e575-73b2-a18c-7da3fd7f5d70'); + $profile = Profile::create($profileId, 'John'); + $repository->save($profile); + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_inline', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $this->connection->fetchAssociative( + 'SELECT * FROM projection_state WHERE id = ?', + ['profile_inline'], + ); + + self::assertEquals([ + 'id' => 'profile_inline', + 'state' => '{"profiles":{"019d3991-e575-73b2-a18c-7da3fd7f5d70":"John"}}', + ], $result); + + $projection = new ProfileInlineProjection( + $stateStore, + ); + + self::assertEquals(['019d3991-e575-73b2-a18c-7da3fd7f5d70' => 'John'], $projection->profiles); + } + /** @param list $subscriptions */ private static function findSubscription(array $subscriptions, string $id): Subscription {