diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 30b55e2..2db3281 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -11,6 +11,7 @@ jobs: strategy: matrix: php-version: ["8.3", "8.4", "8.5"] + redis-image: ["redis:7-alpine", "redis:8-alpine", "valkey/valkey:9-alpine"] steps: - name: Checkout code @@ -18,7 +19,7 @@ jobs: - name: Build and Start Services # Pass the matrix value as an environment variable to Docker Compose - run: PHP_VERSION=${{ matrix.php-version }} docker compose up redis -d --build + run: PHP_VERSION=${{ matrix.php-version }} REDIS_IMAGE=${{ matrix.redis-image }} docker compose up redis -d --build - name: Unit Tests run: | @@ -26,4 +27,4 @@ jobs: - name: Stop Services if: always() - run: PHP_VERSION=${{ matrix.php-version }} docker compose down \ No newline at end of file + run: PHP_VERSION=${{ matrix.php-version }} REDIS_IMAGE=${{ matrix.redis-image }} docker compose down \ No newline at end of file diff --git a/Makefile b/Makefile index fa7db69..9d98e62 100644 --- a/Makefile +++ b/Makefile @@ -26,4 +26,4 @@ mago-format: # Run code formatting via mago. .PHONY: run-tests run-tests: # Run unit tests via PHPUnit. - docker compose run --rm php ./vendor/bin/phpunit --colors=always --configuration ./tests/phpunit.xml ./tests/Unit/ \ No newline at end of file + docker compose run --rm php ./vendor/bin/phpunit --colors=always --configuration ./tests/phpunit.xml ./tests/ \ No newline at end of file diff --git a/README.md b/README.md index 22e0a90..f778d1b 100644 --- a/README.md +++ b/README.md @@ -22,16 +22,28 @@ composer require mimatus/locksmith ## Roadmap - [x] Basic in-memory & Redis semaphore implementation +- [x] Redlock algorithm for Redis semaphore +- [x] Predis support for Redis semaphore +- [x] AMPHP Redis client support for Redis semaphore +- [x] First class support and tests for Redis 7 | Redis 8 | Valkey 9 - [ ] Feedback and API stabilization +- [ ] Redis Cluster support - [ ] Documentation improvements -- [ ] Redlock algorithm for Redis semaphore -- [ ] Predis support for Redis semaphore -- [ ] AMPHP Redis client support for Redis semaphore - [ ] MySQL/MariaDB/PostgreSQL semaphore implementation ## Usage +> [!NOTE] +> Project is still in early stages of development, so API is not stable yet and may change. Feedback is very welcome to help shape the API and make it more intuitive and easy to use. + ### In-Memory semaphore + +For single-process scenarios you can use in-memory semaphore implementation. It allows to limit concurrent access to resource within single process (e.g., number of concurrent HTTP requests, background jobs, or other tasks). + +It's suitable mainly for concurrent PHP - AMPHP, Swoole, ReactPHP, etc. + +It's not suitable for multi-process scenarios (e.g., multiple PHP-FPM workers, multiple servers) as each process/server will have its own instance of in-memory semaphore. For multi-process scenarios you should use Redis-based semaphore implementation. + ```php $locksmith = new Locksmith( @@ -41,11 +53,11 @@ $locksmith = new Locksmith( $resource = new Resource( namespace: 'test-resource', // Namespace/identifier for resource version: 1, // Optional resource version - ttlNanoseconds: 1_000_000_000, //How long should be resource locked ); $locked = $locksmith->locked( $resource, + lockTTLNs: 1_000_000_000, // How long should be resource locked maxLockWaitNs: 500_000_000, // How long to wait for lock acquisition - error if exceeded minSuspensionDelayNs: 10_000 // Minimum delay between retries when lock acquisition fails ); @@ -60,13 +72,26 @@ $locked(function (Closure $suspension): void { ``` ### Redis semaphore + +For distributed scenarios you can use Redis-based semaphore implementation. + +Supported Redis servers: +- Redis 7+ +- Valkey 9+ + +Supported Redis clients: +- PhpRedis +- Predis +- AMPHP Redis client + ```php $redis = new Redis(); $redis->connect('redis'); +$phpRedisCleint = new PhpRedisClient($redis); $semaphore = new RedisSemaphore( - redisClient: $redis, + redisClient: $phpRedisCleint, maxConcurrentLocks: 3, // Max concurrent locks ); @@ -76,11 +101,11 @@ $locksmith = new Locksmith(semaphore: $semaphore); $resource = new Resource( namespace: 'test-resource', // Namespace/identifier for resource version: 1, // Optional resouce version - ttlNanoseconds: 1_000_000_000, //How long should be resource locked ); $locked = $locksmith->locked( $resource, + lockTTLNs: 1_000_000_000, // How long should be resource locked maxLockWaitNs: 500_000_000, // How long to wait for lock acquisition - error if exceeded minSuspensionDelayNs: 10_000 // Minimum delay between retries when lock acquisition fails ); @@ -93,6 +118,53 @@ $locked(function (Closure $suspension): void { // Lock is released after callback execution ``` +### Distributed semaphore +Distributed semaphore allows to use multiple semaphore instances (e.g., multiple Redis instances) to achieve higher availability and fault tolerance. It uses quorum-based approach - single lock is successful only if the defined quorum of semaphores is reached. + +Implementation of distributed semaphore is based on [Redlock algorithm](https://redis.io/docs/latest/develop/clients/patterns/distributed-locks/#the-redlock-algorithm) with some adjustments to fit the `Semaphore` interface and allow cooperative suspension points. + +> [!NOTE] +> It's important to note that while distributed semaphore can be used Redis instances, it does not have first class support for Redis Cluster or Sentinel. First class support for Redis Cluster is on the roadmap, but in the meantime you can use distributed semaphore with multiple independent Redis instances as a workaround. + +```php +$semaphores = new SemaphoreCollection([ + new RedisSemaphore( + redisClient: $redisClient1, + maxConcurrentLocks: 3, + ), + new RedisSemaphore( + redisClient: $redisClient2, + maxConcurrentLocks: 3, + ), + new RedisSemaphore( + redisClient: $redisClient3, + maxConcurrentLocks: 3, + ), +]); + +$locksmith = new Locksmith( + semaphore: new DistributedSemaphore( + semaphores: $semaphores, + quorum: 2, + ), +); +$resource = new Resource( + namespace: 'test-resource', // Namespace/identifier for resource + version: 1, // Optional resource version +); +$locked = $locksmith->locked( + $resource, + lockTTLNs: 1_000_000_000, // How long should be resource locked + maxLockWaitNs: 500_000_000, // How long to wait for lock acquisition - error if exceeded + minSuspensionDelayNs: 10_000 // Minimum delay between retries when lock acquisition fails +); +$locked(function (Closure $suspension): void { + // Critical section - code executed under lock + + $suspension(); // Optional - cooperative suspension point to allow other lock acquisition attempts or allow lock TTL checks for long running processes +}); +// Lock is released after callback execution +``` ## Development ### Commits @@ -115,7 +187,7 @@ mago-analyze: Run static analysis via mago. mago-format: Run code formatting via mago. mago-lint-fix: Run linting with auto-fix via mago. mago-lint: Run linting via mago. -run-tests: Run unit tests via PHPUnit. +run-tests: Run tests via PHPUnit. ``` ## License diff --git a/composer.json b/composer.json index 9fb15c6..53da913 100644 --- a/composer.json +++ b/composer.json @@ -6,7 +6,9 @@ "php": ">=8.3" }, "suggest": { - "ext-redis": "To use this library with the PHP Redis extension." + "ext-redis": "To use this library with the PHP Redis extension.", + "predis/predis": "To use this library with the Predis client.", + "amphp/redis": "To use this library with the AMPHP Redis client." }, "autoload": { "psr-4": { @@ -20,6 +22,9 @@ }, "require-dev": { "ext-redis": "*", - "phpunit/phpunit": "^12.5" + "phpunit/phpunit": "^12.5", + "amphp/redis": "^2.0", + "predis/predis": "^3.3", + "revolt/event-loop": "^1.0" } } diff --git a/composer.lock b/composer.lock index 1122af3..a41e7b8 100644 --- a/composer.lock +++ b/composer.lock @@ -4,9 +4,1098 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "6a525cd42cbc85c10c1d7a10d546c274", + "content-hash": "347b9fd5107b5d2a529b4c76beca8030", "packages": [], "packages-dev": [ + { + "name": "amphp/amp", + "version": "v3.1.1", + "source": { + "type": "git", + "url": "https://github.com/amphp/amp.git", + "reference": "fa0ab33a6f47a82929c38d03ca47ebb71086a93f" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/amp/zipball/fa0ab33a6f47a82929c38d03ca47ebb71086a93f", + "reference": "fa0ab33a6f47a82929c38d03ca47ebb71086a93f", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "phpunit/phpunit": "^9", + "psalm/phar": "5.23.1" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php", + "src/Future/functions.php", + "src/Internal/functions.php" + ], + "psr-4": { + "Amp\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Bob Weinand", + "email": "bobwei9@hotmail.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Daniel Lowrey", + "email": "rdlowrey@php.net" + } + ], + "description": "A non-blocking concurrency framework for PHP applications.", + "homepage": "https://amphp.org/amp", + "keywords": [ + "async", + "asynchronous", + "awaitable", + "concurrency", + "event", + "event-loop", + "future", + "non-blocking", + "promise" + ], + "support": { + "issues": "https://github.com/amphp/amp/issues", + "source": "https://github.com/amphp/amp/tree/v3.1.1" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-08-27T21:42:00+00:00" + }, + { + "name": "amphp/byte-stream", + "version": "v2.1.2", + "source": { + "type": "git", + "url": "https://github.com/amphp/byte-stream.git", + "reference": "55a6bd071aec26fa2a3e002618c20c35e3df1b46" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/byte-stream/zipball/55a6bd071aec26fa2a3e002618c20c35e3df1b46", + "reference": "55a6bd071aec26fa2a3e002618c20c35e3df1b46", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/parser": "^1.1", + "amphp/pipeline": "^1", + "amphp/serialization": "^1", + "amphp/sync": "^2", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2.3" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "5.22.1" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php", + "src/Internal/functions.php" + ], + "psr-4": { + "Amp\\ByteStream\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "A stream abstraction to make working with non-blocking I/O simple.", + "homepage": "https://amphp.org/byte-stream", + "keywords": [ + "amp", + "amphp", + "async", + "io", + "non-blocking", + "stream" + ], + "support": { + "issues": "https://github.com/amphp/byte-stream/issues", + "source": "https://github.com/amphp/byte-stream/tree/v2.1.2" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-03-16T17:10:27+00:00" + }, + { + "name": "amphp/cache", + "version": "v2.0.1", + "source": { + "type": "git", + "url": "https://github.com/amphp/cache.git", + "reference": "46912e387e6aa94933b61ea1ead9cf7540b7797c" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/cache/zipball/46912e387e6aa94933b61ea1ead9cf7540b7797c", + "reference": "46912e387e6aa94933b61ea1ead9cf7540b7797c", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/serialization": "^1", + "amphp/sync": "^2", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.4" + }, + "type": "library", + "autoload": { + "psr-4": { + "Amp\\Cache\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Daniel Lowrey", + "email": "rdlowrey@php.net" + } + ], + "description": "A fiber-aware cache API based on Amp and Revolt.", + "homepage": "https://amphp.org/cache", + "support": { + "issues": "https://github.com/amphp/cache/issues", + "source": "https://github.com/amphp/cache/tree/v2.0.1" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-04-19T03:38:06+00:00" + }, + { + "name": "amphp/dns", + "version": "v2.4.0", + "source": { + "type": "git", + "url": "https://github.com/amphp/dns.git", + "reference": "78eb3db5fc69bf2fc0cb503c4fcba667bc223c71" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/dns/zipball/78eb3db5fc69bf2fc0cb503c4fcba667bc223c71", + "reference": "78eb3db5fc69bf2fc0cb503c4fcba667bc223c71", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/byte-stream": "^2", + "amphp/cache": "^2", + "amphp/parser": "^1", + "amphp/process": "^2", + "daverandom/libdns": "^2.0.2", + "ext-filter": "*", + "ext-json": "*", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "5.20" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "Amp\\Dns\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Chris Wright", + "email": "addr@daverandom.com" + }, + { + "name": "Daniel Lowrey", + "email": "rdlowrey@php.net" + }, + { + "name": "Bob Weinand", + "email": "bobwei9@hotmail.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + } + ], + "description": "Async DNS resolution for Amp.", + "homepage": "https://github.com/amphp/dns", + "keywords": [ + "amp", + "amphp", + "async", + "client", + "dns", + "resolve" + ], + "support": { + "issues": "https://github.com/amphp/dns/issues", + "source": "https://github.com/amphp/dns/tree/v2.4.0" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-01-19T15:43:40+00:00" + }, + { + "name": "amphp/parser", + "version": "v1.1.1", + "source": { + "type": "git", + "url": "https://github.com/amphp/parser.git", + "reference": "3cf1f8b32a0171d4b1bed93d25617637a77cded7" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/parser/zipball/3cf1f8b32a0171d4b1bed93d25617637a77cded7", + "reference": "3cf1f8b32a0171d4b1bed93d25617637a77cded7", + "shasum": "" + }, + "require": { + "php": ">=7.4" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.4" + }, + "type": "library", + "autoload": { + "psr-4": { + "Amp\\Parser\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "A generator parser to make streaming parsers simple.", + "homepage": "https://github.com/amphp/parser", + "keywords": [ + "async", + "non-blocking", + "parser", + "stream" + ], + "support": { + "issues": "https://github.com/amphp/parser/issues", + "source": "https://github.com/amphp/parser/tree/v1.1.1" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-03-21T19:16:53+00:00" + }, + { + "name": "amphp/pipeline", + "version": "v1.2.3", + "source": { + "type": "git", + "url": "https://github.com/amphp/pipeline.git", + "reference": "7b52598c2e9105ebcddf247fc523161581930367" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/pipeline/zipball/7b52598c2e9105ebcddf247fc523161581930367", + "reference": "7b52598c2e9105ebcddf247fc523161581930367", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "php": ">=8.1", + "revolt/event-loop": "^1" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.18" + }, + "type": "library", + "autoload": { + "psr-4": { + "Amp\\Pipeline\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Asynchronous iterators and operators.", + "homepage": "https://amphp.org/pipeline", + "keywords": [ + "amp", + "amphp", + "async", + "io", + "iterator", + "non-blocking" + ], + "support": { + "issues": "https://github.com/amphp/pipeline/issues", + "source": "https://github.com/amphp/pipeline/tree/v1.2.3" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-03-16T16:33:53+00:00" + }, + { + "name": "amphp/process", + "version": "v2.0.3", + "source": { + "type": "git", + "url": "https://github.com/amphp/process.git", + "reference": "52e08c09dec7511d5fbc1fb00d3e4e79fc77d58d" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/process/zipball/52e08c09dec7511d5fbc1fb00d3e4e79fc77d58d", + "reference": "52e08c09dec7511d5fbc1fb00d3e4e79fc77d58d", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/byte-stream": "^2", + "amphp/sync": "^2", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.4" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "Amp\\Process\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Bob Weinand", + "email": "bobwei9@hotmail.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "A fiber-aware process manager based on Amp and Revolt.", + "homepage": "https://amphp.org/process", + "support": { + "issues": "https://github.com/amphp/process/issues", + "source": "https://github.com/amphp/process/tree/v2.0.3" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-04-19T03:13:44+00:00" + }, + { + "name": "amphp/redis", + "version": "v2.0.3", + "source": { + "type": "git", + "url": "https://github.com/amphp/redis.git", + "reference": "1572c2fec2849d272570919e998f9a3c1a5b1703" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/redis/zipball/1572c2fec2849d272570919e998f9a3c1a5b1703", + "reference": "1572c2fec2849d272570919e998f9a3c1a5b1703", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/byte-stream": "^2", + "amphp/cache": "^2", + "amphp/parser": "^1", + "amphp/pipeline": "^1", + "amphp/serialization": "^1", + "amphp/socket": "^2", + "amphp/sync": "^2", + "league/uri": "^7", + "php": ">=8.1", + "psr/log": "^1|^2|^3", + "revolt/event-loop": "^1" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "amphp/process": "^2", + "phpunit/phpunit": "^9", + "psalm/phar": "5.22" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php", + "src/Internal/functions.php" + ], + "psr-4": { + "Amp\\Redis\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + } + ], + "description": "Efficient asynchronous communication with Redis servers, enabling scalable and responsive data storage and retrieval.", + "homepage": "https://amphp.org/redis", + "keywords": [ + "amp", + "amphp", + "async", + "client", + "redis", + "revolt" + ], + "support": { + "issues": "https://github.com/amphp/redis/issues", + "source": "https://github.com/amphp/redis/tree/v2.0.3" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-01-15T04:14:11+00:00" + }, + { + "name": "amphp/serialization", + "version": "v1.0.0", + "source": { + "type": "git", + "url": "https://github.com/amphp/serialization.git", + "reference": "693e77b2fb0b266c3c7d622317f881de44ae94a1" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/serialization/zipball/693e77b2fb0b266c3c7d622317f881de44ae94a1", + "reference": "693e77b2fb0b266c3c7d622317f881de44ae94a1", + "shasum": "" + }, + "require": { + "php": ">=7.1" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "dev-master", + "phpunit/phpunit": "^9 || ^8 || ^7" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "Amp\\Serialization\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Serialization tools for IPC and data storage in PHP.", + "homepage": "https://github.com/amphp/serialization", + "keywords": [ + "async", + "asynchronous", + "serialization", + "serialize" + ], + "support": { + "issues": "https://github.com/amphp/serialization/issues", + "source": "https://github.com/amphp/serialization/tree/master" + }, + "time": "2020-03-25T21:39:07+00:00" + }, + { + "name": "amphp/socket", + "version": "v2.3.1", + "source": { + "type": "git", + "url": "https://github.com/amphp/socket.git", + "reference": "58e0422221825b79681b72c50c47a930be7bf1e1" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/socket/zipball/58e0422221825b79681b72c50c47a930be7bf1e1", + "reference": "58e0422221825b79681b72c50c47a930be7bf1e1", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/byte-stream": "^2", + "amphp/dns": "^2", + "ext-openssl": "*", + "kelunik/certificate": "^1.1", + "league/uri": "^6.5 | ^7", + "league/uri-interfaces": "^2.3 | ^7", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "amphp/process": "^2", + "phpunit/phpunit": "^9", + "psalm/phar": "5.20" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php", + "src/Internal/functions.php", + "src/SocketAddress/functions.php" + ], + "psr-4": { + "Amp\\Socket\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Daniel Lowrey", + "email": "rdlowrey@gmail.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Non-blocking socket connection / server implementations based on Amp and Revolt.", + "homepage": "https://github.com/amphp/socket", + "keywords": [ + "amp", + "async", + "encryption", + "non-blocking", + "sockets", + "tcp", + "tls" + ], + "support": { + "issues": "https://github.com/amphp/socket/issues", + "source": "https://github.com/amphp/socket/tree/v2.3.1" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-04-21T14:33:03+00:00" + }, + { + "name": "amphp/sync", + "version": "v2.3.0", + "source": { + "type": "git", + "url": "https://github.com/amphp/sync.git", + "reference": "217097b785130d77cfcc58ff583cf26cd1770bf1" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/sync/zipball/217097b785130d77cfcc58ff583cf26cd1770bf1", + "reference": "217097b785130d77cfcc58ff583cf26cd1770bf1", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/pipeline": "^1", + "amphp/serialization": "^1", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "5.23" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "Amp\\Sync\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Stephen Coakley", + "email": "me@stephencoakley.com" + } + ], + "description": "Non-blocking synchronization primitives for PHP based on Amp and Revolt.", + "homepage": "https://github.com/amphp/sync", + "keywords": [ + "async", + "asynchronous", + "mutex", + "semaphore", + "synchronization" + ], + "support": { + "issues": "https://github.com/amphp/sync/issues", + "source": "https://github.com/amphp/sync/tree/v2.3.0" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-08-03T19:31:26+00:00" + }, + { + "name": "daverandom/libdns", + "version": "v2.1.0", + "source": { + "type": "git", + "url": "https://github.com/DaveRandom/LibDNS.git", + "reference": "b84c94e8fe6b7ee4aecfe121bfe3b6177d303c8a" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/DaveRandom/LibDNS/zipball/b84c94e8fe6b7ee4aecfe121bfe3b6177d303c8a", + "reference": "b84c94e8fe6b7ee4aecfe121bfe3b6177d303c8a", + "shasum": "" + }, + "require": { + "ext-ctype": "*", + "php": ">=7.1" + }, + "suggest": { + "ext-intl": "Required for IDN support" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "LibDNS\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "DNS protocol implementation written in pure PHP", + "keywords": [ + "dns" + ], + "support": { + "issues": "https://github.com/DaveRandom/LibDNS/issues", + "source": "https://github.com/DaveRandom/LibDNS/tree/v2.1.0" + }, + "time": "2024-04-12T12:12:48+00:00" + }, + { + "name": "kelunik/certificate", + "version": "v1.1.3", + "source": { + "type": "git", + "url": "https://github.com/kelunik/certificate.git", + "reference": "7e00d498c264d5eb4f78c69f41c8bd6719c0199e" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/kelunik/certificate/zipball/7e00d498c264d5eb4f78c69f41c8bd6719c0199e", + "reference": "7e00d498c264d5eb4f78c69f41c8bd6719c0199e", + "shasum": "" + }, + "require": { + "ext-openssl": "*", + "php": ">=7.0" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "phpunit/phpunit": "^6 | 7 | ^8 | ^9" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.x-dev" + } + }, + "autoload": { + "psr-4": { + "Kelunik\\Certificate\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Access certificate details and transform between different formats.", + "keywords": [ + "DER", + "certificate", + "certificates", + "openssl", + "pem", + "x509" + ], + "support": { + "issues": "https://github.com/kelunik/certificate/issues", + "source": "https://github.com/kelunik/certificate/tree/v1.1.3" + }, + "time": "2023-02-03T21:26:53+00:00" + }, + { + "name": "league/uri", + "version": "7.8.0", + "source": { + "type": "git", + "url": "https://github.com/thephpleague/uri.git", + "reference": "4436c6ec8d458e4244448b069cc572d088230b76" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/thephpleague/uri/zipball/4436c6ec8d458e4244448b069cc572d088230b76", + "reference": "4436c6ec8d458e4244448b069cc572d088230b76", + "shasum": "" + }, + "require": { + "league/uri-interfaces": "^7.8", + "php": "^8.1", + "psr/http-factory": "^1" + }, + "conflict": { + "league/uri-schemes": "^1.0" + }, + "suggest": { + "ext-bcmath": "to improve IPV4 host parsing", + "ext-dom": "to convert the URI into an HTML anchor tag", + "ext-fileinfo": "to create Data URI from file contennts", + "ext-gmp": "to improve IPV4 host parsing", + "ext-intl": "to handle IDN host with the best performance", + "ext-uri": "to use the PHP native URI class", + "jeremykendall/php-domain-parser": "to further parse the URI host and resolve its Public Suffix and Top Level Domain", + "league/uri-components": "to provide additional tools to manipulate URI objects components", + "league/uri-polyfill": "to backport the PHP URI extension for older versions of PHP", + "php-64bit": "to improve IPV4 host parsing", + "rowbot/url": "to handle URLs using the WHATWG URL Living Standard specification", + "symfony/polyfill-intl-idn": "to handle IDN host via the Symfony polyfill if ext-intl is not present" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "7.x-dev" + } + }, + "autoload": { + "psr-4": { + "League\\Uri\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Ignace Nyamagana Butera", + "email": "nyamsprod@gmail.com", + "homepage": "https://nyamsprod.com" + } + ], + "description": "URI manipulation library", + "homepage": "https://uri.thephpleague.com", + "keywords": [ + "URN", + "data-uri", + "file-uri", + "ftp", + "hostname", + "http", + "https", + "middleware", + "parse_str", + "parse_url", + "psr-7", + "query-string", + "querystring", + "rfc2141", + "rfc3986", + "rfc3987", + "rfc6570", + "rfc8141", + "uri", + "uri-template", + "url", + "ws" + ], + "support": { + "docs": "https://uri.thephpleague.com", + "forum": "https://thephpleague.slack.com", + "issues": "https://github.com/thephpleague/uri-src/issues", + "source": "https://github.com/thephpleague/uri/tree/7.8.0" + }, + "funding": [ + { + "url": "https://github.com/sponsors/nyamsprod", + "type": "github" + } + ], + "time": "2026-01-14T17:24:56+00:00" + }, + { + "name": "league/uri-interfaces", + "version": "7.8.0", + "source": { + "type": "git", + "url": "https://github.com/thephpleague/uri-interfaces.git", + "reference": "c5c5cd056110fc8afaba29fa6b72a43ced42acd4" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/thephpleague/uri-interfaces/zipball/c5c5cd056110fc8afaba29fa6b72a43ced42acd4", + "reference": "c5c5cd056110fc8afaba29fa6b72a43ced42acd4", + "shasum": "" + }, + "require": { + "ext-filter": "*", + "php": "^8.1", + "psr/http-message": "^1.1 || ^2.0" + }, + "suggest": { + "ext-bcmath": "to improve IPV4 host parsing", + "ext-gmp": "to improve IPV4 host parsing", + "ext-intl": "to handle IDN host with the best performance", + "php-64bit": "to improve IPV4 host parsing", + "rowbot/url": "to handle URLs using the WHATWG URL Living Standard specification", + "symfony/polyfill-intl-idn": "to handle IDN host via the Symfony polyfill if ext-intl is not present" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "7.x-dev" + } + }, + "autoload": { + "psr-4": { + "League\\Uri\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Ignace Nyamagana Butera", + "email": "nyamsprod@gmail.com", + "homepage": "https://nyamsprod.com" + } + ], + "description": "Common tools for parsing and resolving RFC3987/RFC3986 URI", + "homepage": "https://uri.thephpleague.com", + "keywords": [ + "data-uri", + "file-uri", + "ftp", + "hostname", + "http", + "https", + "parse_str", + "parse_url", + "psr-7", + "query-string", + "querystring", + "rfc3986", + "rfc3987", + "rfc6570", + "uri", + "url", + "ws" + ], + "support": { + "docs": "https://uri.thephpleague.com", + "forum": "https://thephpleague.slack.com", + "issues": "https://github.com/thephpleague/uri-src/issues", + "source": "https://github.com/thephpleague/uri-interfaces/tree/7.8.0" + }, + "funding": [ + { + "url": "https://github.com/sponsors/nyamsprod", + "type": "github" + } + ], + "time": "2026-01-15T06:54:53+00:00" + }, { "name": "myclabs/deep-copy", "version": "1.13.4", @@ -682,6 +1771,299 @@ ], "time": "2025-12-15T06:05:34+00:00" }, + { + "name": "predis/predis", + "version": "v3.3.0", + "source": { + "type": "git", + "url": "https://github.com/predis/predis.git", + "reference": "153097374b39a2f737fe700ebcd725642526cdec" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/predis/predis/zipball/153097374b39a2f737fe700ebcd725642526cdec", + "reference": "153097374b39a2f737fe700ebcd725642526cdec", + "shasum": "" + }, + "require": { + "php": "^7.2 || ^8.0", + "psr/http-message": "^1.0|^2.0" + }, + "require-dev": { + "friendsofphp/php-cs-fixer": "^3.3", + "phpstan/phpstan": "^1.9", + "phpunit/phpcov": "^6.0 || ^8.0", + "phpunit/phpunit": "^8.0 || ~9.4.4" + }, + "suggest": { + "ext-relay": "Faster connection with in-memory caching (>=0.6.2)" + }, + "type": "library", + "autoload": { + "psr-4": { + "Predis\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Till Krüss", + "homepage": "https://till.im", + "role": "Maintainer" + } + ], + "description": "A flexible and feature-complete Redis/Valkey client for PHP.", + "homepage": "http://github.com/predis/predis", + "keywords": [ + "nosql", + "predis", + "redis" + ], + "support": { + "issues": "https://github.com/predis/predis/issues", + "source": "https://github.com/predis/predis/tree/v3.3.0" + }, + "funding": [ + { + "url": "https://github.com/sponsors/tillkruss", + "type": "github" + } + ], + "time": "2025-11-24T17:48:50+00:00" + }, + { + "name": "psr/http-factory", + "version": "1.1.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-factory.git", + "reference": "2b4765fddfe3b508ac62f829e852b1501d3f6e8a" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-factory/zipball/2b4765fddfe3b508ac62f829e852b1501d3f6e8a", + "reference": "2b4765fddfe3b508ac62f829e852b1501d3f6e8a", + "shasum": "" + }, + "require": { + "php": ">=7.1", + "psr/http-message": "^1.0 || ^2.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "https://www.php-fig.org/" + } + ], + "description": "PSR-17: Common interfaces for PSR-7 HTTP message factories", + "keywords": [ + "factory", + "http", + "message", + "psr", + "psr-17", + "psr-7", + "request", + "response" + ], + "support": { + "source": "https://github.com/php-fig/http-factory" + }, + "time": "2024-04-15T12:06:14+00:00" + }, + { + "name": "psr/http-message", + "version": "2.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-message.git", + "reference": "402d35bcb92c70c026d1a6a9883f06b2ead23d71" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-message/zipball/402d35bcb92c70c026d1a6a9883f06b2ead23d71", + "reference": "402d35bcb92c70c026d1a6a9883f06b2ead23d71", + "shasum": "" + }, + "require": { + "php": "^7.2 || ^8.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "https://www.php-fig.org/" + } + ], + "description": "Common interface for HTTP messages", + "homepage": "https://github.com/php-fig/http-message", + "keywords": [ + "http", + "http-message", + "psr", + "psr-7", + "request", + "response" + ], + "support": { + "source": "https://github.com/php-fig/http-message/tree/2.0" + }, + "time": "2023-04-04T09:54:51+00:00" + }, + { + "name": "psr/log", + "version": "3.0.2", + "source": { + "type": "git", + "url": "https://github.com/php-fig/log.git", + "reference": "f16e1d5863e37f8d8c2a01719f5b34baa2b714d3" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/log/zipball/f16e1d5863e37f8d8c2a01719f5b34baa2b714d3", + "reference": "f16e1d5863e37f8d8c2a01719f5b34baa2b714d3", + "shasum": "" + }, + "require": { + "php": ">=8.0.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Log\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "https://www.php-fig.org/" + } + ], + "description": "Common interface for logging libraries", + "homepage": "https://github.com/php-fig/log", + "keywords": [ + "log", + "psr", + "psr-3" + ], + "support": { + "source": "https://github.com/php-fig/log/tree/3.0.2" + }, + "time": "2024-09-11T13:17:53+00:00" + }, + { + "name": "revolt/event-loop", + "version": "v1.0.8", + "source": { + "type": "git", + "url": "https://github.com/revoltphp/event-loop.git", + "reference": "b6fc06dce8e9b523c9946138fa5e62181934f91c" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/revoltphp/event-loop/zipball/b6fc06dce8e9b523c9946138fa5e62181934f91c", + "reference": "b6fc06dce8e9b523c9946138fa5e62181934f91c", + "shasum": "" + }, + "require": { + "php": ">=8.1" + }, + "require-dev": { + "ext-json": "*", + "jetbrains/phpstorm-stubs": "^2019.3", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.15" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-main": "1.x-dev" + } + }, + "autoload": { + "psr-4": { + "Revolt\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Cees-Jan Kiewiet", + "email": "ceesjank@gmail.com" + }, + { + "name": "Christian Lück", + "email": "christian@clue.engineering" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Rock-solid event loop for concurrent PHP applications.", + "keywords": [ + "async", + "asynchronous", + "concurrency", + "event", + "event-loop", + "non-blocking", + "scheduler" + ], + "support": { + "issues": "https://github.com/revoltphp/event-loop/issues", + "source": "https://github.com/revoltphp/event-loop/tree/v1.0.8" + }, + "time": "2025-08-27T21:33:23+00:00" + }, { "name": "sebastian/cli-parser", "version": "4.2.0", @@ -1688,8 +3070,10 @@ "prefer-stable": false, "prefer-lowest": false, "platform": { - "php": ">=8.2" + "php": ">=8.3" + }, + "platform-dev": { + "ext-redis": "*" }, - "platform-dev": {}, "plugin-api-version": "2.9.0" } diff --git a/docker-compose.yml b/docker-compose.yml index 0e912c8..ebce565 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,6 @@ services: REDIS_HOST: redis redis: - image: redis:7-alpine + image: ${REDIS_IMAGE:-redis:7-alpine} ports: - - "6379:6379" \ No newline at end of file + - "6379:6379" diff --git a/src/FiberTaskExecutor.php b/src/FiberTaskExecutor.php new file mode 100644 index 0000000..1912787 --- /dev/null +++ b/src/FiberTaskExecutor.php @@ -0,0 +1,66 @@ +timeProvider->getCurrentTimeNanoseconds(); + + $fiber = new Fiber($task); + $fiber->start(); + + while (!$fiber->isTerminated()) { + if ($ttlNanoseconds < ($this->timeProvider->getCurrentTimeNanoseconds() - $start)) { + throw new RuntimeException('Unable to get result under TTL'); + } + + if (Fiber::getCurrent() !== null) { + Fiber::suspend(); + } elseif ($minSuspensionDelayNs > 0) { + /** @var positive-int $delay */ + $delay = $minSuspensionDelayNs / 1000; + usleep($delay); + } + + if (!$fiber->isSuspended()) { + throw new RuntimeException('Fiber error, fiber is not suspended nor terminated'); + } + + $fiber->resume(); + } + + if ($ttlNanoseconds < ($this->timeProvider->getCurrentTimeNanoseconds() - $start)) { + throw new RuntimeException('Unable to get result under TTL'); + } + + /** @var T */ + return $fiber->getReturn(); + } +} diff --git a/src/Locksmith.php b/src/Locksmith.php index a018fb4..a01144d 100644 --- a/src/Locksmith.php +++ b/src/Locksmith.php @@ -5,52 +5,82 @@ namespace MiMatus\Locksmith; use Closure; -use Fiber; use MiMatus\Locksmith\Semaphore\TimeProvider; use Random\Engine; use Random\Engine\Xoshiro256StarStar; +use Revolt\EventLoop; use RuntimeException; use Throwable; readonly class Locksmith { + private TaskExecutorInterface $taskExecutor; + public function __construct( - private Semaphore $semaphore, + private SemaphoreInterface $semaphore, private TimeProvider $timeProvider = new TimeProvider(), private Engine $randomEngine = new Xoshiro256StarStar(), - ) {} + ?TaskExecutorInterface $taskExecutor = null, + ) { + $this->taskExecutor = + $taskExecutor + ?? ( + class_exists(EventLoop::class) + ? new RevoltTaskExecutor($timeProvider) + : new FiberTaskExecutor($timeProvider) + ); + } /** * @template T * @param non-negative-int $maxLockWaitNs * @param non-negative-int $minSuspensionDelayNs + * @param non-negative-int $lockTTLNs * @throws Throwable * @return Closure(Closure(): void): T */ - public function locked(Resource $resource, int $maxLockWaitNs, int $minSuspensionDelayNs): Closure - { - return function (Closure $callback) use ($resource, $maxLockWaitNs, $minSuspensionDelayNs): mixed { + public function locked( + ResourceInterface $resource, + int $lockTTLNs, + int $maxLockWaitNs, + int $minSuspensionDelayNs, + ): Closure { + return function (Closure $callback) use ($resource, $lockTTLNs, $maxLockWaitNs, $minSuspensionDelayNs): mixed { $token = bin2hex($this->randomEngine->generate()); - $this->getResultUnderTTL( - new Fiber(function () use ($token, $resource): void { - $this->semaphore->lock($resource, $token, Fiber::suspend(...)); - }), + $startTimeNs = $this->timeProvider->getCurrentTimeNanoseconds(); + + $suspender = function () use ($startTimeNs, $lockTTLNs, $minSuspensionDelayNs) { + $remainingLockTTLNs = (int) ( + $lockTTLNs + - ($this->timeProvider->getCurrentTimeNanoseconds() - $startTimeNs) + ); + if ($remainingLockTTLNs <= 0) { + throw new RuntimeException('Unable to get result under TTL'); + } + /** @var non-negative-int $remainingLockTTLNs */ + + $this->taskExecutor->getResultUnderTTL(static fn() => null, $remainingLockTTLNs, $minSuspensionDelayNs); + }; + $this->taskExecutor->getResultUnderTTL( + function () use ($token, $resource, $lockTTLNs, $suspender): void { + $this->semaphore->lock($resource, $token, $lockTTLNs, $suspender); + }, $maxLockWaitNs, $minSuspensionDelayNs, ); try { /** @var T */ - return $this->getResultUnderTTL( - new Fiber(function () use ($callback, $resource): mixed { + return $this->taskExecutor->getResultUnderTTL( + function () use ($callback, $resource, $suspender): mixed { if (!$this->semaphore->isLocked($resource)) { throw new RuntimeException('Lock has been lost during process'); } - return $callback(Fiber::suspend(...)); - }), - $resource->ttlNanoseconds, + return $callback($suspender); + }, + $lockTTLNs, $minSuspensionDelayNs, ); } finally { @@ -58,41 +88,4 @@ public function locked(Resource $resource, int $maxLockWaitNs, int $minSuspensio } }; } - - /** - * @template T - * @throws Throwable - * @param Fiber $fiber - * @param non-negative-int $ttlNanoseconds - * @return T - */ - private function getResultUnderTTL(Fiber $fiber, int $ttlNanoseconds, int $minSuspensionDelayNs): mixed - { - $start = $this->timeProvider->getCurrentTimeNanoseconds(); - - $fiber->start(); - - while (!$fiber->isTerminated()) { - if ($ttlNanoseconds < ($this->timeProvider->getCurrentTimeNanoseconds() - $start)) { - throw new RuntimeException('Unable to get result under TTL'); - } - - if (Fiber::getCurrent() !== null) { - Fiber::suspend(); - } elseif ($minSuspensionDelayNs > 0) { - /** @var positive-int $delay */ - $delay = $minSuspensionDelayNs / 1000; - usleep($delay); - } - - if (!$fiber->isSuspended()) { - throw new RuntimeException('Fiber error, fiber is not suspended nor terminated'); - } - - $fiber->resume(); - } - - /** @var T */ - return $fiber->getReturn(); - } } diff --git a/src/Resource.php b/src/Resource.php index a8e8e52..45592d8 100644 --- a/src/Resource.php +++ b/src/Resource.php @@ -4,14 +4,22 @@ namespace MiMatus\Locksmith; -readonly class Resource +readonly class Resource implements ResourceInterface { - /** - * @param non-negative-int $ttlNanoseconds - */ public function __construct( - public int $ttlNanoseconds, public string $namespace = '', public int $version = 1, ) {} + + #[\Override] + public function getNamespace(): string + { + return $this->namespace; + } + + #[\Override] + public function getVersion(): int + { + return $this->version; + } } diff --git a/src/ResourceInterface.php b/src/ResourceInterface.php new file mode 100644 index 0000000..c3e82bf --- /dev/null +++ b/src/ResourceInterface.php @@ -0,0 +1,12 @@ +timeProvider->getCurrentTimeNanoseconds(); + + $deferId = EventLoop::delay($minSuspensionDelayNs / 1_000_000, function () use ( + $task, + $suspension, + $startTime, + $ttlNanoseconds, + ): void { + try { + $result = $task(); + } catch (Throwable $e) { + $suspension->throw($e); + return; + } + + // Check if TTL has been exceeded before resuming the fiber - there might have been a blocking operation in the task that caused us to exceed the TTL + if (($this->timeProvider->getCurrentTimeNanoseconds() - $startTime) >= $ttlNanoseconds) { + $suspension->throw(new RuntimeException('Unable to get result under TTL')); + return; + } + $suspension->resume($result); + }); + + EventLoop::delay($ttlNanoseconds / 1_000_000, static function () use ($deferId, $suspension) { + EventLoop::cancel($deferId); + + $suspension->throw(new RuntimeException('Unable to get result under TTL')); + }); + + /** @var T */ + return $suspension->suspend(); + } +} diff --git a/src/Semaphore.php b/src/Semaphore.php deleted file mode 100644 index 60bf5ab..0000000 --- a/src/Semaphore.php +++ /dev/null @@ -1,16 +0,0 @@ -quorum > count($this->semaphores)) { + throw new LogicException('Acquire quorum cannot be greater than number of semaphores'); + } + } + + /** + * @param Closure(): void $suspension + * @throws GroupedException + * @throws RuntimeException + */ + #[\Override] + public function lock( + ResourceInterface $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ): void { + $successfulLocks = 0; + $startTime = $this->timeProvider->getCurrentTimeNanoseconds(); + $exceptions = []; + $semaphores = clone $this->semaphores; + + do { + $semaphore = $semaphores->getRandom(); + $lockTTLNs -= $this->timeProvider->getCurrentTimeNanoseconds() - $startTime; + if ($lockTTLNs <= 0 || $semaphore === null) { + break; + } + + try { + $semaphore->lock($resource, $token, (int) $lockTTLNs, $suspension); + $successfulLocks++; + $semaphores = $semaphores->without($semaphore); + } catch (Throwable $e) { + $exceptions[] = $e; + } + + if ($successfulLocks >= $this->quorum) { + return; + } + + $suspension(); + } while ($lockTTLNs > 0); + + // Rollback successful locks + try { + $this->unlock($resource, $token); + } catch (Throwable $e) { + $exceptions[] = $e; + } + + throw new GroupedException('Failed to acquire lock quorum', $exceptions); + } + + /** + * @throws GroupedException + */ + #[\Override] + public function unlock(ResourceInterface $resource, #[\SensitiveParameter] string $token): void + { + $successfulUnlocks = 0; + $exceptions = []; + + foreach ($this->semaphores as $semaphore) { + /** @var SemaphoreInterface $semaphore */ + try { + $semaphore->unlock($resource, $token); + $successfulUnlocks++; + } catch (Throwable $e) { + $exceptions[] = $e; + } + } + + if ($successfulUnlocks < $this->quorum) { + throw new GroupedException('Failed to release lock quorum', $exceptions); + } + } + + /** + * @throws GroupedException + */ + #[\Override] + public function isLocked(ResourceInterface $resource): bool + { + $lockedCount = 0; + $exceptions = []; + + foreach ($this->semaphores as $semaphore) { + /** @var SemaphoreInterface $semaphore */ + try { + if ($semaphore->isLocked($resource)) { + $lockedCount++; + } + } catch (Throwable $e) { + $exceptions[] = $e; + } + + if ($lockedCount >= $this->quorum) { + return true; + } + } + + if (count($exceptions) >= $this->quorum) { + throw new GroupedException('Failed to determine lock status quorum', $exceptions); + } + + return false; + } +} diff --git a/src/Semaphore/GroupedException.php b/src/Semaphore/GroupedException.php new file mode 100644 index 0000000..48ec332 --- /dev/null +++ b/src/Semaphore/GroupedException.php @@ -0,0 +1,21 @@ + $exceptions + */ + public function __construct( + string $message, + public array $exceptions, + ) { + parent::__construct($message); + } +} diff --git a/src/Semaphore/InMemorySemaphore.php b/src/Semaphore/InMemorySemaphore.php index a67f69a..d99e6dd 100644 --- a/src/Semaphore/InMemorySemaphore.php +++ b/src/Semaphore/InMemorySemaphore.php @@ -6,11 +6,11 @@ use Closure; use MiMatus\Locksmith\Lock; -use MiMatus\Locksmith\Resource; -use MiMatus\Locksmith\Semaphore; +use MiMatus\Locksmith\ResourceInterface; +use MiMatus\Locksmith\SemaphoreInterface; use RuntimeException; -class InMemorySemaphore implements Semaphore +class InMemorySemaphore implements SemaphoreInterface { /** * @var array}> @@ -30,18 +30,22 @@ public function __construct( * @throws RuntimeException */ #[\Override] - public function lock(Resource $resource, #[\SensitiveParameter] string $token, Closure $suspension): void - { + public function lock( + ResourceInterface $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ): void { if ( - isset($this->locks[$resource->namespace]['version']) - && $this->locks[$resource->namespace]['version'] > $resource->version + isset($this->locks[$resource->getNamespace()]['version']) + && $this->locks[$resource->getNamespace()]['version'] > $resource->getVersion() ) { throw new RuntimeException('Lock version mismatch'); } do { $currentTime = $this->timeProvider->getCurrentTimeNanoseconds(); - $storedResource = $this->locks[$resource->namespace] ?? null; + $storedResource = $this->locks[$resource->getNamespace()] ?? null; if ($storedResource === null) { break; } @@ -55,31 +59,30 @@ public function lock(Resource $resource, #[\SensitiveParameter] string $token, C } } - $higherVersion = $resource->version > $storedResource['version']; + $higherVersion = $resource->getVersion() > $storedResource['version']; $maxLocksReached = $activeLocks >= $this->maxConcurrentLocks; - if ($maxLocksReached || $higherVersion) { - $suspension(); - } else { + if (!$maxLocksReached && !$higherVersion) { break; } + $suspension(); } while (true); - $resourceExpiration = $currentTime + $resource->ttlNanoseconds; - $this->locks[$resource->namespace]['expirations'][$token] = $resourceExpiration; - $this->locks[$resource->namespace]['version'] = $resource->version; + $resourceExpiration = $currentTime + $lockTTLNs; + $this->locks[$resource->getNamespace()]['expirations'][$token] = $resourceExpiration; + $this->locks[$resource->getNamespace()]['version'] = $resource->getVersion(); } #[\Override] - public function unlock(Resource $resource, #[\SensitiveParameter] string $token): void + public function unlock(ResourceInterface $resource, #[\SensitiveParameter] string $token): void { - if (!isset($this->locks[$resource->namespace])) { + if (!isset($this->locks[$resource->getNamespace()])) { return; } - unset($this->locks[$resource->namespace]['expirations'][$token]); - if ($this->locks[$resource->namespace]['expirations'] === []) { - unset($this->locks[$resource->namespace]); + unset($this->locks[$resource->getNamespace()]['expirations'][$token]); + if ($this->locks[$resource->getNamespace()]['expirations'] === []) { + unset($this->locks[$resource->getNamespace()]); } } @@ -87,13 +90,13 @@ public function unlock(Resource $resource, #[\SensitiveParameter] string $token) * @throws RuntimeException */ #[\Override] - public function isLocked(Resource $resource): bool + public function isLocked(ResourceInterface $resource): bool { - if (!isset($this->locks[$resource->namespace])) { + if (!isset($this->locks[$resource->getNamespace()])) { return false; } - foreach ($this->locks[$resource->namespace]['expirations'] as $token => $expiration) { + foreach ($this->locks[$resource->getNamespace()]['expirations'] as $token => $expiration) { if ($expiration > $this->timeProvider->getCurrentTimeNanoseconds()) { return true; } diff --git a/src/Semaphore/Redis/AmPhpRedisClient.php b/src/Semaphore/Redis/AmPhpRedisClient.php new file mode 100644 index 0000000..d7ebef8 --- /dev/null +++ b/src/Semaphore/Redis/AmPhpRedisClient.php @@ -0,0 +1,43 @@ + $keys + * @param list $args + * @throws RuntimeException + */ + #[\Override] + public function eval(string $script, array $keys = [], array $args = []): mixed + { + try { + return $this->redis->eval($script, $keys, $args); + } catch (\RedisException $e) { + throw new RuntimeException('Redis eval failed: ' . $e->getMessage(), 0, $e); + } + } + + /** + * @throws RuntimeException + */ + #[\Override] + public function exists(string $key): bool + { + try { + return $this->redis->has($key); + } catch (\RedisException $e) { + throw new RuntimeException('Redis exists check failed: ' . $e->getMessage(), 0, $e); + } + } +} diff --git a/src/Semaphore/Redis/PhpRedisClient.php b/src/Semaphore/Redis/PhpRedisClient.php new file mode 100644 index 0000000..e010cc4 --- /dev/null +++ b/src/Semaphore/Redis/PhpRedisClient.php @@ -0,0 +1,53 @@ + $keys + * @param list $args + * @throws RuntimeException + */ + #[\Override] + public function eval(string $script, array $keys = [], array $args = []): mixed + { + try { + /** @var mixed */ + $result = $this->redis->eval($script, [...$keys, ...$args], count($keys)); // @mago-ignore analysis:invalid-method-access RedisCluster + } catch (\RedisException $e) { + throw new RuntimeException('Redis eval failed: ' . $e->getMessage(), 0, $e); + } + + if ($result === false) { + /** @var string */ + $errorMessage = $this->redis->getLastError() ?? 'Unknown error'; // @mago-ignore analysis:invalid-method-access RedisCluster + throw new RuntimeException('Redis eval failed: ' . $errorMessage); + } + return $result; + } + + /** + * @throws RuntimeException + */ + #[\Override] + public function exists(string $key): bool + { + try { + return (bool) $this->redis->exists($key); // @mago-ignore analysis:invalid-method-access RedisCluster + } catch (\RedisException $e) { + throw new RuntimeException('Redis exists check failed: ' . $e->getMessage(), 0, $e); + } + } +} diff --git a/src/Semaphore/Redis/PredisRedisClient.php b/src/Semaphore/Redis/PredisRedisClient.php new file mode 100644 index 0000000..4530aca --- /dev/null +++ b/src/Semaphore/Redis/PredisRedisClient.php @@ -0,0 +1,52 @@ + $keys + * @param list $args + * @throws RuntimeException + */ + #[\Override] + public function eval(string $script, array $keys = [], array $args = []): mixed + { + try { + /** @var mixed */ + $response = $this->redis->eval($script, count($keys), ...[...$keys, ...$args]); + } catch (PredisException $e) { + throw new RuntimeException('Redis eval failed: ' . $e->getMessage(), 0, $e); + } + + if ($response instanceof ErrorResponseInterface && !$this->redis->getOptions()->exceptions) { + throw new RuntimeException($response->getMessage()); + } + + return $response; + } + + /** + * @throws RuntimeException + */ + #[\Override] + public function exists(string $key): bool + { + try { + return $this->redis->exists($key) > 0; + } catch (PredisException $e) { + throw new RuntimeException('Redis exists check failed: ' . $e->getMessage(), 0, $e); + } + } +} diff --git a/src/Semaphore/Redis/RedisClientInterface.php b/src/Semaphore/Redis/RedisClientInterface.php new file mode 100644 index 0000000..145c915 --- /dev/null +++ b/src/Semaphore/Redis/RedisClientInterface.php @@ -0,0 +1,22 @@ + $keys + * @param list $args + * @throws RuntimeException + */ + public function eval(string $script, array $keys = [], array $args = []): mixed; + + /** + * @throws RuntimeException + */ + public function exists(string $key): bool; +} diff --git a/src/Semaphore/RedisSemaphore.php b/src/Semaphore/Redis/RedisSemaphore.php similarity index 73% rename from src/Semaphore/RedisSemaphore.php rename to src/Semaphore/Redis/RedisSemaphore.php index ccd2588..1c2c015 100644 --- a/src/Semaphore/RedisSemaphore.php +++ b/src/Semaphore/Redis/RedisSemaphore.php @@ -2,23 +2,26 @@ declare(strict_types=1); -namespace MiMatus\Locksmith\Semaphore; +namespace MiMatus\Locksmith\Semaphore\Redis; use Closure; -use MiMatus\Locksmith\Resource; -use MiMatus\Locksmith\Semaphore; +use MiMatus\Locksmith\ResourceInterface; +use MiMatus\Locksmith\Semaphore\Redis\RedisClientInterface; +use MiMatus\Locksmith\SemaphoreInterface; use RedisException; use RuntimeException; +use Throwable; -readonly class RedisSemaphore implements Semaphore +readonly class RedisSemaphore implements SemaphoreInterface { + private const string RedisKeyPrefix = 'locksmith:semaphore:'; + /** * @param positive-int $maxConcurrentLocks */ public function __construct( - public \Redis $redisClient, + public RedisClientInterface $redisClient, private int $maxConcurrentLocks = 1, - private string $keyPrefix = '', ) {} /** @@ -26,8 +29,12 @@ public function __construct( * @throws RuntimeException */ #[\Override] - public function lock(Resource $resource, #[\SensitiveParameter] string $token, Closure $suspension): void - { + public function lock( + ResourceInterface $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ): void { $luaScript = <<ttlNanoseconds / 1_000_000); + $milisecondsTTL = (int) ($lockTTLNs / 1_000_000); do { try { /** @var bool|int */ - $result = $this->redisClient->eval( + $this->redisClient->eval( $luaScript, [ - $this->keyPrefix . $resource->namespace, + self::RedisKeyPrefix . $resource->getNamespace(), + ], + [ $token, - $milisecondsTTL, - $resource->version, - $this->maxConcurrentLocks, + (string) $milisecondsTTL, + (string) $resource->getVersion(), + (string) $this->maxConcurrentLocks, ], - 1, ); - if ($result !== false) { - break; - } - - $errorMessage = $this->redisClient->getLastError() ?? 'Unknown error'; + break; + } catch (Throwable $e) { + $errorMessage = $e->getMessage(); if (str_contains($errorMessage, 'MiMatus_VERSION_MISMATCH')) { throw new RuntimeException('Lock version mismatch'); } @@ -118,8 +124,6 @@ public function lock(Resource $resource, #[\SensitiveParameter] string $token, C continue; } - throw new RuntimeException(message: 'Redis Error: ' . $errorMessage); - } catch (RedisException $e) { throw new RuntimeException(message: 'Redis Error: ' . $e->getMessage(), previous: $e); } } while (true); @@ -129,7 +133,7 @@ public function lock(Resource $resource, #[\SensitiveParameter] string $token, C * @throws RuntimeException */ #[\Override] - public function unlock(Resource $resource, #[\SensitiveParameter] string $token): void + public function unlock(ResourceInterface $resource, #[\SensitiveParameter] string $token): void { $luaScript = <<redisClient->eval($luaScript, [$this->keyPrefix . $resource->namespace, $token], 1); - if ($result === false) { - throw new RuntimeException('Redis Error: ' . ($this->redisClient->getLastError() ?? 'Unknown error')); - } + $this->redisClient->eval($luaScript, [self::RedisKeyPrefix . $resource->getNamespace()], [$token]); } catch (RedisException $e) { throw new RuntimeException(message: 'Redis Error: ' . $e->getMessage(), previous: $e); } @@ -169,17 +169,12 @@ public function unlock(Resource $resource, #[\SensitiveParameter] string $token) * @throws RuntimeException */ #[\Override] - public function isLocked(Resource $resource): bool + public function isLocked(ResourceInterface $resource): bool { try { - /** @var bool|int */ - $exists = $this->redisClient->exists($resource->namespace); + return $this->redisClient->exists(self::RedisKeyPrefix . $resource->getNamespace()); } catch (RedisException $e) { throw new RuntimeException(message: 'Redis Error: ' . $e->getMessage(), previous: $e); } - if ($exists === false) { - throw new RuntimeException('Redis Error: ' . ($this->redisClient->getLastError() ?? 'Unknown error')); - } - return $exists === 1; } } diff --git a/src/Semaphore/SemaphoreCollection.php b/src/Semaphore/SemaphoreCollection.php new file mode 100644 index 0000000..2abe28d --- /dev/null +++ b/src/Semaphore/SemaphoreCollection.php @@ -0,0 +1,63 @@ + + */ +class SemaphoreCollection implements SemaphoreCollectionInterface +{ + /** + * @param list $semaphores + */ + public function __construct( + private array $semaphores, + private Randomizer $randomizer = new Randomizer(new Xoshiro256StarStar()), + ) {} + + #[\Override] + public function count(): int + { + return count($this->semaphores); + } + + #[\Override] + public function without(SemaphoreInterface $semaphore): static + { + $newSemaphores = array_filter($this->semaphores, static fn(SemaphoreInterface $s) => $s !== $semaphore); + + return new self(array_values($newSemaphores), $this->randomizer); + } + + /** + * @return ?T + */ + #[\Override] + public function getRandom(): ?SemaphoreInterface + { + if ($this->semaphores === []) { + return null; + } + + /** @var int */ + $key = $this->randomizer->pickArrayKeys($this->semaphores, 1)[0]; + return $this->semaphores[$key]; + } + + /** + * @return \Traversable + */ + #[\Override] + public function getIterator(): \Traversable + { + return new ArrayIterator($this->semaphores); + } +} diff --git a/src/Semaphore/SemaphoreCollectionInterface.php b/src/Semaphore/SemaphoreCollectionInterface.php new file mode 100644 index 0000000..afcb529 --- /dev/null +++ b/src/Semaphore/SemaphoreCollectionInterface.php @@ -0,0 +1,29 @@ + + */ + #[\Override] + public function getIterator(): Traversable; +} diff --git a/src/SemaphoreInterface.php b/src/SemaphoreInterface.php new file mode 100644 index 0000000..3b948d2 --- /dev/null +++ b/src/SemaphoreInterface.php @@ -0,0 +1,21 @@ +redis = createRedisClient('tcp://redis:6379'); + } + + protected function tearDown(): void + { + parent::tearDown(); + $this->redis->flushAll(); + } + + protected function advanceTime(int $nanoseconds): void + { + usleep((int) $nanoseconds / 1000); + parent::advanceTime($nanoseconds); + } + + protected function createSemaphore(TimeProvider $timeProvider, $maxConcurrentLocks = 1): SemaphoreInterface + { + $rediClient = new AmPhpRedisClient($this->redis); + return new RedisSemaphore(redisClient: $rediClient, maxConcurrentLocks: $maxConcurrentLocks); + } + + public function testLockingOccupiedKey(): void + { + $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); + + $this->redis->set('test-lock-key', 'occupied'); + + $resource = new Resource(namespace: 'test-lock-key'); + $semaphore->lock( + resource: $resource, + token: 'test-lock-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 5_000_000_000, + suspension: static function (): void { + self::fail('Lock should not have been acquired, suspension should not be called'); + }, + ); + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $this->redis->delete('test-lock-key'); // Deleting same key does not affect lock + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $semaphore->unlock(resource: $resource, token: 'test-lock-token'); // @mago-ignore lint:no-literal-password + + self::assertFalse($semaphore->isLocked(resource: $resource), 'Resource should be unlocked'); + } +} diff --git a/tests/Integration/AmpPhp/LocksmithTest.php b/tests/Integration/AmpPhp/LocksmithTest.php new file mode 100644 index 0000000..92e731f --- /dev/null +++ b/tests/Integration/AmpPhp/LocksmithTest.php @@ -0,0 +1,78 @@ +semaphore = new RedisSemaphore( + redisClient: new \MiMatus\Locksmith\Semaphore\Redis\AmPhpRedisClient(redis: createRedisClient( + 'tcp://redis:6379', + )), + ); + + $this->timeProvider = new TimeProvider(); + } + + public function testConcurrentTask(): void + { + $sharedCounter = 0; + + $task = static function (Closure $suspension) use (&$sharedCounter) { + $suspension(); // Suspend! This allows the other fiber to run. + $sharedCounter += 1; + }; + + $task2 = static function (Closure $suspension) use (&$sharedCounter) { + self::assertSame(0, $sharedCounter, 'Task 2 should not see the incremented counter value'); + $suspension(); // Suspend! This allows the other fiber to run. + $sharedCounter += 1; + }; + + $locksmith = new Locksmith(semaphore: $this->semaphore, timeProvider: $this->timeProvider); + + $locked = $locksmith->locked( + resource: new Resource(namespace: 'test-lock key'), + lockTTLNs: 5_000_000_000, + maxLockWaitNs: 1_000_000_000, + minSuspensionDelayNs: 10_000, + ); + $locked2 = $locksmith->locked( + resource: new Resource(namespace: 'test-lock key2'), + lockTTLNs: 5_000_000_000, + maxLockWaitNs: 1_000_000_000, + minSuspensionDelayNs: 10_000, + ); + + $future1 = async(static fn() => $locked($task)); + $future2 = async(static fn() => $locked2($task2)); + + await([$future2, $future1]); + + self::assertSame(2, $sharedCounter); + } +} diff --git a/tests/Integration/PhpRedis/PhpRedisSemaphoreTest.php b/tests/Integration/PhpRedis/PhpRedisSemaphoreTest.php new file mode 100644 index 0000000..a15c749 --- /dev/null +++ b/tests/Integration/PhpRedis/PhpRedisSemaphoreTest.php @@ -0,0 +1,77 @@ +redis = new Redis(); + $this->redis->connect('redis'); + } + + protected function tearDown(): void + { + parent::tearDown(); + $this->redis->flushAll(); + $this->redis->close(); + } + + protected function advanceTime(int $nanoseconds): void + { + usleep((int) $nanoseconds / 1000); + parent::advanceTime($nanoseconds); + } + + protected function createSemaphore(TimeProvider $timeProvider, $maxConcurrentLocks = 1): SemaphoreInterface + { + $rediClient = new PhpRedisClient($this->redis); + return new RedisSemaphore(redisClient: $rediClient, maxConcurrentLocks: $maxConcurrentLocks); + } + + public function testLockingOccupiedKey(): void + { + $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); + + $this->redis->set('test-lock-key', 'occupied'); + + $resource = new Resource(namespace: 'test-lock-key'); + $semaphore->lock( + resource: $resource, + token: 'test-lock-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 5_000_000_000, + suspension: static function (): void { + self::fail('Lock should not have been acquired, suspension should not be called'); + }, + ); + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $this->redis->del('test-lock-key'); // Deleting same key does not affect lock + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $semaphore->unlock(resource: $resource, token: 'test-lock-token'); // @mago-ignore lint:no-literal-password + + self::assertFalse($semaphore->isLocked(resource: $resource), 'Resource should be unlocked'); + } +} diff --git a/tests/Integration/Predis/PredisSemaphoreTest.php b/tests/Integration/Predis/PredisSemaphoreTest.php new file mode 100644 index 0000000..8c73ab0 --- /dev/null +++ b/tests/Integration/Predis/PredisSemaphoreTest.php @@ -0,0 +1,76 @@ +redis = new Client('tcp://redis:6379'); + } + + protected function tearDown(): void + { + parent::tearDown(); + $this->redis->flushAll(); + $this->redis->disconnect(); + } + + protected function advanceTime(int $nanoseconds): void + { + usleep((int) $nanoseconds / 1000); + parent::advanceTime($nanoseconds); + } + + protected function createSemaphore(TimeProvider $timeProvider, $maxConcurrentLocks = 1): SemaphoreInterface + { + $rediClient = new PredisRedisClient($this->redis); + return new RedisSemaphore(redisClient: $rediClient, maxConcurrentLocks: $maxConcurrentLocks); + } + + public function testLockingOccupiedKey(): void + { + $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); + + $this->redis->set('test-lock-key', 'occupied'); + + $resource = new Resource(namespace: 'test-lock-key'); + $semaphore->lock( + resource: $resource, + token: 'test-lock-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 5_000_000_000, + suspension: static function (): void { + self::fail('Lock should not have been acquired, suspension should not be called'); + }, + ); + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $this->redis->del('test-lock-key'); // Deleting same key does not affect lock + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $semaphore->unlock(resource: $resource, token: 'test-lock-token'); // @mago-ignore lint:no-literal-password + + self::assertFalse($semaphore->isLocked(resource: $resource), 'Resource should be unlocked'); + } +} diff --git a/tests/Integration/Revolt/LocksmithTest.php b/tests/Integration/Revolt/LocksmithTest.php new file mode 100644 index 0000000..79c28f5 --- /dev/null +++ b/tests/Integration/Revolt/LocksmithTest.php @@ -0,0 +1,18 @@ +semaphore = $this->createMock(SemaphoreInterface::class); + $this->timeProvider = $this->createMock(TimeProvider::class); + $this->randomEngine = $this->createMock(Engine::class); + $this->taskExecutor = $this->createTaskExecutor($this->timeProvider); + } + + abstract protected function createTaskExecutor(TimeProvider $timeProvider): TaskExecutorInterface; + + public function testUnableToAcquireLockTimeout(): void + { + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->atLeastOnce()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + $currentTime += 500_000_001; + $suspension(); + }); + + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: new Resource(namespace: 'test-resource', version: 1), + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $this->expectExceptionObject(new RuntimeException('Unable to get result under TTL')); + $locked(static function (): void { + self::fail('Lock should not be acquired'); + }); + } + + public function testUnableToAcquireLock(): void + { + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->atLeastOnce()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + throw new RuntimeException('error'); + }); + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: new Resource(namespace: 'test-resource', version: 1), + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $this->expectExceptionObject(new RuntimeException('error')); + $locked(static function (): void { + self::fail('Lock should not be acquired'); + }); + } + + public function testLostDuringExecution(): void + { + $resource = new Resource(namespace: 'test-resource', version: 1); + + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->once()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + // Lock acquired immediately + }); + + $this->semaphore + ->expects($this->once()) + ->method('isLocked') + ->with(self::equalTo($resource)) + ->willReturn(false); + + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: $resource, + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $this->expectExceptionObject(new RuntimeException('Lock has been lost during process')); + $locked(static function (Closure $suspension): void { + $suspension(); // Simulate some processing and force lock check + }); + } + + public function testUnableToUnlock(): void + { + $resource = new Resource(namespace: 'test-resource', version: 1); + + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->once()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + // Lock acquired immediately + }); + + $this->semaphore + ->expects($this->once()) + ->method('unlock') + ->willReturnCallback(static function (Resource $resource, #[\SensitiveParameter] string $token) use ( + &$currentTime, + ): void { + throw new RuntimeException('error during unlock'); + }); + + $this->semaphore + ->expects($this->once()) + ->method('isLocked') + ->with(self::equalTo($resource)) + ->willReturn(false); + + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: $resource, + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $this->expectExceptionObject(new RuntimeException('error during unlock')); + $locked(static function (Closure $suspension): void { + // Simulate some processing + }); + } + + public function testLocked(): void + { + $resource = new Resource(namespace: 'test-resource', version: 1); + + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->once()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + // Lock acquired immediately + }); + + $this->semaphore + ->expects($this->once()) + ->method('unlock') + ->willReturnCallback(static function (Resource $resource, #[\SensitiveParameter] string $token) use ( + &$currentTime, + ): void { + // Unlock successful + }); + + $this->semaphore + ->expects($this->once()) + ->method('isLocked') + ->with(self::equalTo($resource)) + ->willReturn(true); + + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: $resource, + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $called = false; + $locked(static function (Closure $suspension) use (&$called): void { + // Simulate some processing + $called = true; + }); + + self::assertTrue($called, 'Locked callback executed'); + } +} diff --git a/tests/Unit/SemaphoreTestCase.php b/tests/SemaphoreTestCase.php similarity index 50% rename from tests/Unit/SemaphoreTestCase.php rename to tests/SemaphoreTestCase.php index cf87d1a..af94e3e 100644 --- a/tests/Unit/SemaphoreTestCase.php +++ b/tests/SemaphoreTestCase.php @@ -2,14 +2,12 @@ declare(strict_types=1); -namespace MiMatus\Locksmith\Tests\Unit; +namespace MiMatus\Locksmith\Tests; use Exception; -use Fiber; use MiMatus\Locksmith\Resource; -use MiMatus\Locksmith\Semaphore; -use MiMatus\Locksmith\Semaphore\RedisSemaphore; use MiMatus\Locksmith\Semaphore\TimeProvider; +use MiMatus\Locksmith\SemaphoreInterface; use Override; use PHPUnit\Framework\MockObject\Stub; use PHPUnit\Framework\TestCase; @@ -18,9 +16,9 @@ abstract class SemaphoreTestCase extends TestCase { - private TimeProvider&Stub $timeProvider; + protected TimeProvider&Stub $timeProvider; - private int $currentTime = 0; + protected int $currentTime = 0; /** * @throws Exception @@ -41,7 +39,10 @@ protected function advanceTime(int $nanoseconds): void $this->currentTime += $nanoseconds; } - abstract protected function createSemaphore(TimeProvider $timeProvider, int $maxConcurrentLocks = 1): Semaphore; + abstract protected function createSemaphore( + TimeProvider $timeProvider, + int $maxConcurrentLocks = 1, + ): SemaphoreInterface; /** * @throws Throwable @@ -50,11 +51,16 @@ public function testBasicLock(): void { $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); - $resource = new Resource(ttlNanoseconds: 5_000_000_000, namespace: 'test-lock-key'); + $resource = new Resource(namespace: 'test-lock-key'); - $semaphore->lock($resource, 'test-lock-token', static function (): void { - self::fail('Suspension should not be called when lock is available'); - }); + $semaphore->lock( + resource: $resource, + token: 'test-lock-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 5_000_000_000, + suspension: static function (): void { + self::fail('Suspension should not be called when lock is available'); + }, + ); self::assertTrue($semaphore->isLocked($resource)); @@ -70,18 +76,18 @@ public function testLockingWithLowerVersion(): void { $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); - $resource1 = new Resource(ttlNanoseconds: 5_000_000_000, namespace: 'test-lock-key', version: 1); + $resource1 = new Resource(namespace: 'test-lock-key', version: 1); - $resource2 = new Resource(ttlNanoseconds: 5_000_000_000, namespace: 'test-lock-key', version: 0); + $resource2 = new Resource(namespace: 'test-lock-key', version: 0); - $semaphore->lock($resource1, 'test-lock-token-1', static function (): void { + $semaphore->lock($resource1, 'test-lock-token-1', 5_000_000_000, static function (): void { self::fail('Suspension should not be called when lock is available'); }); self::assertTrue($semaphore->isLocked($resource1)); $this->expectExceptionObject(new RuntimeException('Lock version mismatch')); - $semaphore->lock($resource2, 'test-lock-token-2', static function () { + $semaphore->lock($resource2, 'test-lock-token-2', 5_000_000_000, static function () { self::fail('Suspension should not be called'); }); } @@ -93,28 +99,26 @@ public function testLockingAlreadyLockedKey(): void { $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); - $resource = new Resource( - ttlNanoseconds: 5_000_000_000, // 5s - namespace: 'test-lock-key', - ); + $resource = new Resource(namespace: 'test-lock-key'); - $semaphore->lock($resource, 'test-lock-token', static function (): void { + $semaphore->lock($resource, 'test-lock-token', 5_000_000_000, static function (): void { self::fail('Suspension should not be called when lock is available'); }); - $called = false; - $fiber = new Fiber(static function () use ($semaphore, $resource, &$called): void { - $semaphore->lock($resource, 'test-lock-token-2', static function () use (&$called): void { - $called = true; - Fiber::suspend(); - }); - }); - - $fiber->start(); - - self::assertTrue($called); - self::assertTrue($fiber->isSuspended()); - self::assertFalse($fiber->isTerminated()); + $isSuspended = static function () use ($resource, $semaphore): bool { + try { + $semaphore->lock($resource, 'test-lock-token-2', 5_000_000_000, static function () use ( + &$called, + ): void { + throw new RuntimeException('SUSPENSION_CALLED'); + }); + } catch (Throwable $e) { + return $e->getMessage() === 'SUSPENSION_CALLED' ? true : throw $e; + } + return false; + }; + + self::assertTrue($isSuspended()); self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); @@ -122,7 +126,7 @@ public function testLockingAlreadyLockedKey(): void self::assertFalse($semaphore->isLocked($resource), 'Lock should be released after unlock'); - $fiber->resume(); + self::assertFalse($isSuspended()); self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); $semaphore->unlock($resource, 'test-lock-token-2'); @@ -136,35 +140,28 @@ public function testLockingHigherVersion(): void { $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); - $resource1 = new Resource( - ttlNanoseconds: 5_000_000_000, // 5s - namespace: 'test-lock-key', - version: 0, - ); + $resource1 = new Resource(namespace: 'test-lock-key', version: 0); - $resource2 = new Resource( - ttlNanoseconds: 5_000_000_000, // 5s - namespace: 'test-lock-key', - version: 1, - ); + $resource2 = new Resource(namespace: 'test-lock-key', version: 1); - $semaphore->lock($resource1, 'test-lock-token', static function (): void { + $semaphore->lock($resource1, 'test-lock-token', 5_000_000_000, static function (): void { self::fail('Suspension should not be called when lock is available'); }); - $called = false; - $fiber = new Fiber(static function () use ($semaphore, $resource2, &$called): void { - $semaphore->lock($resource2, 'test-lock-token-2', static function () use (&$called): void { - $called = true; - Fiber::suspend(); - }); - }); - - $fiber->start(); - - self::assertTrue($called); - self::assertTrue($fiber->isSuspended()); - self::assertFalse($fiber->isTerminated()); + $isSuspended = static function () use ($resource2, $semaphore): bool { + try { + $semaphore->lock($resource2, 'test-lock-token-2', 5_000_000_000, static function () use ( + &$called, + ): void { + throw new RuntimeException('SUSPENSION_CALLED'); + }); + } catch (Throwable $e) { + return $e->getMessage() === 'SUSPENSION_CALLED' ? true : throw $e; + } + return false; + }; + + self::assertTrue($isSuspended()); self::assertTrue($semaphore->isLocked($resource1), 'Lock should be held'); @@ -172,7 +169,7 @@ public function testLockingHigherVersion(): void self::assertFalse($semaphore->isLocked($resource1), 'Lock should be released after unlock'); - $fiber->resume(); + self::assertFalse($isSuspended()); self::assertTrue($semaphore->isLocked($resource2), 'Lock should be held'); $semaphore->unlock($resource2, 'test-lock-token-2'); @@ -187,32 +184,30 @@ public function testSemaphoreWithCapacity(): void { $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider, maxConcurrentLocks: 2); - $resource = new Resource( - ttlNanoseconds: 5_000_000_000, // 5s - namespace: 'test-lock-key', - ); + $resource = new Resource(namespace: 'test-lock-key'); - $semaphore->lock($resource, 'test-lock-token-1', static function (): void { + $semaphore->lock($resource, 'test-lock-token-1', 5_000_000_000, static function (): void { self::fail('Suspension should not be called when lock is available'); }); - $semaphore->lock($resource, 'test-lock-token-2', static function (): void { + $semaphore->lock($resource, 'test-lock-token-2', 5_000_000_000, static function (): void { self::fail('Suspension should not be called when lock is available'); }); - $called = false; - $fiber = new Fiber(static function () use ($semaphore, $resource, &$called): void { - $semaphore->lock($resource, 'test-lock-token-3', static function () use (&$called): void { - $called = true; - Fiber::suspend(); - }); - }); - - $fiber->start(); - - self::assertTrue($called); - self::assertTrue($fiber->isSuspended()); - self::assertFalse($fiber->isTerminated()); + $isSuspended = static function () use ($resource, $semaphore): bool { + try { + $semaphore->lock($resource, 'test-lock-token-3', 5_000_000_000, static function () use ( + &$called, + ): void { + throw new RuntimeException('SUSPENSION_CALLED'); + }); + } catch (Throwable $e) { + return $e->getMessage() === 'SUSPENSION_CALLED' ? true : throw $e; + } + return false; + }; + + self::assertTrue($isSuspended()); self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); @@ -228,50 +223,34 @@ public function testSemaphoreWithCapacity(): void */ public function testLockExpiration(): void { - /** - * @var RedisSemaphore - */ $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider, maxConcurrentLocks: 2); - $resource1 = new Resource( - ttlNanoseconds: 500_000_000, // 0.5 second - namespace: 'test-lock-key', - ); - - $resource2 = new Resource( - ttlNanoseconds: 2_000_000_000, // 2 seconds - namespace: 'test-lock-key', - ); - - $resource3 = new Resource( - ttlNanoseconds: 2_000_000_000, // 2 seconds - namespace: 'test-lock-key', - ); + $resource = new Resource(namespace: 'test-lock-key'); - $semaphore->lock($resource1, '1', static function (): void { + $semaphore->lock($resource, '1', 500_000_000, static function (): void { self::fail('Suspension should not be called when lock is available'); }); - $semaphore->lock($resource2, '2', static function (): void { + $semaphore->lock($resource, '2', 2_000_000_000, static function (): void { self::fail('Suspension should not be called when lock is available'); }); - self::assertTrue($semaphore->isLocked($resource1), 'Lock should be held'); + self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); // Move time forward to just after key1 expiration $this->advanceTime(1_000_000_000); - self::assertTrue($semaphore->isLocked($resource1), 'Lock should be held'); + self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); - $semaphore->lock($resource3, '3', static function (): void { + $semaphore->lock($resource, '3', 2_000_000_000, static function (): void { self::fail('Suspension should not be called when lock is available - token 1 expired'); }); - self::assertTrue($semaphore->isLocked($resource1), 'Lock should be held'); + self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); $this->advanceTime(3_000_000_000); - self::assertFalse($semaphore->isLocked($resource1), 'Lock should be released after all expirations'); + self::assertFalse($semaphore->isLocked($resource), 'Lock should be released after all expirations'); } /** @@ -281,22 +260,19 @@ public function testUnlockingWithInvalidToken(): void { $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); - $resource1 = new Resource( - ttlNanoseconds: 500_000_000, // 0.5 second - namespace: 'test-lock-key', - ); + $resource = new Resource(namespace: 'test-lock-key'); - $semaphore->lock($resource1, '1', static function (): void { + $semaphore->lock($resource, '1', 500_000_000, static function (): void { self::fail('Suspension should not be called when lock is available'); }); - self::assertTrue($semaphore->isLocked($resource1), 'Lock should be held'); + self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); - $semaphore->unlock($resource1, 'invalid-token'); + $semaphore->unlock($resource, 'invalid-token'); - self::assertTrue($semaphore->isLocked($resource1), 'Lock should be held'); + self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); - $semaphore->unlock($resource1, '1'); + $semaphore->unlock($resource, '1'); - self::assertFalse($semaphore->isLocked($resource1), 'Lock should be released after unlock'); + self::assertFalse($semaphore->isLocked($resource), 'Lock should be released after unlock'); } } diff --git a/tests/Unit/DistributedSemaphoreTest.php b/tests/Unit/DistributedSemaphoreTest.php new file mode 100644 index 0000000..09816a0 --- /dev/null +++ b/tests/Unit/DistributedSemaphoreTest.php @@ -0,0 +1,323 @@ +method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + self::assertEquals(1_000_000_000 - ($lockAttempt * 500_000_001), $lockTTLNs); + + $currentTime += 500_000_001; + $lockAttempt++; + throw new \RuntimeException('Lock failed on semaphore 1'); + }); + + $semaphore2 + ->method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + self::assertEquals(1_000_000_000 - ($lockAttempt * 500_000_001), $lockTTLNs); + + $currentTime += 500_000_001; + $lockAttempt++; + throw new \RuntimeException('Lock failed on semaphore 2'); + }); + + $semaphore1->expects(self::atLeastOnce())->method('unlock'); + $semaphore2->expects(self::atLeastOnce())->method('unlock'); + + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime): int { + return $currentTime; + }); + + $this->expectExceptionObject(new GroupedException('Failed to acquire lock quorum', [])); + + $distributedSemaphore->lock( + resource: new Resource(namespace: 'test-resource'), + token: 'test-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 1_000_000_000, // 1 second + suspension: static function (): void {}, + ); + } + + public function testAcquiredLockWithoutErrors(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore1 = self::createStub(SemaphoreInterface::class); + $semaphore2 = self::createStub(SemaphoreInterface::class); + $timeProvider = self::createStub(TimeProvider::class); + $currentTime = 0; + + $distributedSemaphore = new DistributedSemaphore( + semaphores: new SemaphoreCollection([$semaphore1, $semaphore2]), + quorum: 1, + timeProvider: $timeProvider, + ); + + $lockAttempt = 0; + $lockedSemaphore1 = false; + $lockedSemaphore2 = false; + $semaphore1 + ->method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt, &$lockedSemaphore1): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + self::assertEquals(1_000_000_000 - ($lockAttempt * 500_000_001), $lockTTLNs); + self::assertFalse($lockedSemaphore1, 'Semaphore 1 should not have been locked yet'); + + $currentTime += 500_000_001; + $lockAttempt++; + $lockedSemaphore1 = true; + }); + + $semaphore2 + ->method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt, &$lockedSemaphore2): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + self::assertEquals(1_000_000_000 - ($lockAttempt * 500_000_001), $lockTTLNs); + self::assertFalse($lockedSemaphore2, 'Semaphore 2 should not have been locked yet'); + + $currentTime += 500_000_001; + $lockAttempt++; + $lockedSemaphore2 = true; + }); + + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime): int { + return $currentTime; + }); + + $distributedSemaphore->lock( + resource: new Resource(namespace: 'test-resource'), + token: 'test-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 1_000_000_000, // 1 second + suspension: static function (): void {}, + ); + + self::assertTrue($lockedSemaphore1 || $lockedSemaphore2, 'At least one semaphore should have been locked'); + } + + public function testAcquiredLockWithErrors(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore = self::createStub(SemaphoreInterface::class); + $timeProvider = self::createStub(TimeProvider::class); + $currentTime = 0; + $lockAttempt = 0; + $locksAquired = 0; + + $semaphore + ->method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt, &$locksAquired): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + + $currentTime += 10_000; + + if (($lockAttempt % 3) === 0) { + $lockAttempt++; + $locksAquired++; + } else { + $lockAttempt++; + throw new \RuntimeException('Lock failed on semaphore'); + } + }); + + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime): int { + return $currentTime; + }); + + $distributedSemaphore = new DistributedSemaphore( + semaphores: new SemaphoreCollection([ + clone $semaphore, + clone $semaphore, + clone $semaphore, + clone $semaphore, + clone $semaphore, + ]), + quorum: 3, + timeProvider: $timeProvider, + ); + + $distributedSemaphore->lock( + resource: new Resource(namespace: 'test-resource'), + token: 'test-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 1_000_000_000, // 1 second + suspension: static function (): void {}, + ); + + self::assertEquals(3, $locksAquired, 'Exactly three semaphores should have been locked'); + } + + public function testUnlockWithErrors(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore = self::createStub(SemaphoreInterface::class); + + $distributedSemaphore = new DistributedSemaphore(semaphores: new SemaphoreCollection([ + $semaphore, + $semaphore, + $semaphore, + $semaphore, + $semaphore, + ]), quorum: 3); + + $unlockAttempt = 0; + $unlocksPerformed = 0; + $semaphore + ->method('unlock') + ->willReturnCallback(static function (ResourceInterface $r, #[\SensitiveParameter] string $token) use ( + $resource, + &$unlockAttempt, + &$unlocksPerformed, + ): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + + if (($unlockAttempt % 3) === 0) { + $unlockAttempt++; + throw new \RuntimeException('Unlock failed on semaphore'); + } else { + $unlockAttempt++; + $unlocksPerformed++; + } + }); + + $distributedSemaphore->unlock(resource: new Resource(namespace: 'test-resource'), token: 'test-token'); // @mago-ignore lint:no-literal-password + + self::assertEquals(3, $unlocksPerformed, 'Exactly three semaphores should have been unlocked'); + } + + public function testUnableToUnlock(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore = self::createStub(SemaphoreInterface::class); + + $distributedSemaphore = new DistributedSemaphore(semaphores: new SemaphoreCollection([ + $semaphore, + $semaphore, + $semaphore, + $semaphore, + $semaphore, + ]), quorum: 3); + + $unlockAttempt = 0; + $unlocksPerformed = 0; + $semaphore + ->method('unlock') + ->willReturnCallback(static function (ResourceInterface $r, #[\SensitiveParameter] string $token) use ( + $resource, + &$unlockAttempt, + &$unlocksPerformed, + ): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + + if (($unlockAttempt % 3) === 0) { + $unlocksPerformed++; + $unlockAttempt++; + } else { + $unlockAttempt++; + throw new \RuntimeException('Unlock failed on semaphore'); + } + }); + + $this->expectExceptionObject(new GroupedException('Failed to release lock quorum', [])); + + $distributedSemaphore->unlock(resource: new Resource(namespace: 'test-resource'), token: 'test-token'); // @mago-ignore lint:no-literal-password + } + + public function testIsLockedOnQuorum(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore = self::createStub(SemaphoreInterface::class); + + $distributedSemaphore = new DistributedSemaphore(semaphores: new SemaphoreCollection([ + $semaphore, + $semaphore, + $semaphore, + $semaphore, + $semaphore, + ]), quorum: 3); + + $isLockedAttempt = 0; + $semaphore + ->method('isLocked') + ->willReturnCallback(static function (ResourceInterface $r) use ($resource, &$isLockedAttempt): bool { + self::assertEquals($resource, $r); + + if (($isLockedAttempt % 3) === 0) { + $isLockedAttempt++; + throw new \RuntimeException('Unlock failed on semaphore'); + } else { + $isLockedAttempt++; + return true; + } + }); + + self::assertTrue($distributedSemaphore->isLocked(resource: new Resource(namespace: 'test-resource'))); + } +} diff --git a/tests/Unit/FiberTaskExecutorTest.php b/tests/Unit/FiberTaskExecutorTest.php new file mode 100644 index 0000000..0b6b2d2 --- /dev/null +++ b/tests/Unit/FiberTaskExecutorTest.php @@ -0,0 +1,119 @@ +getResultUnderTTL( + static function () use ($timeProvider) { + return 'result'; + }, + ttlNanoseconds: 1_000_000_000, // Set TTL to 1 second + minSuspensionDelayNs: 100_000_000, // Set minimum suspension delay to 0.1 seconds + ); + + $this->assertSame('result', $result); + } + + public function testResultRetrievalTookTooLongBlocking(): void + { + $currentTime = 0; + $timeProvider = self::createStub(TimeProvider::class); + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $taskExecutor = new FiberTaskExecutor($timeProvider); + + $this->expectExceptionObject(new RuntimeException('Unable to get result under TTL')); + + $taskExecutor->getResultUnderTTL( + static function () use ($timeProvider, &$currentTime) { + $currentTime += 1_500_000_000; // Simulate a long-running task (1.5 seconds) + return 'result'; + }, + ttlNanoseconds: 1_000_000_000, // Set TTL to 1 second + minSuspensionDelayNs: 100_000_000, // Set minimum suspension delay to 0.1 seconds + ); + } + + public function testResultRetrievalNonBlocking(): void + { + $currentTime = 0; + $timeProvider = self::createStub(TimeProvider::class); + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $taskExecutor = new FiberTaskExecutor($timeProvider); + + $fiber = new Fiber(static function () use ($taskExecutor, $timeProvider, &$currentTime) { + return $taskExecutor->getResultUnderTTL( + static function () use ($timeProvider, &$currentTime) { + $currentTime += 100_000_000; // Simulate a short task (0.1 seconds) + Fiber::suspend(); // Suspend to trigger the TTL check in the executor + $currentTime += 100_000_000; // Simulate a short task (0.1 seconds) + return 'result'; + }, + ttlNanoseconds: 1_000_000_000, // Set TTL to 1 second + minSuspensionDelayNs: 100_000_000, // Set minimum suspension delay to 0.1 seconds + ); + }); + + $fiber->start(); + $this->assertSame(100_000_000, $currentTime); + + $fiber->resume(); + + $this->assertSame(200_000_000, $currentTime); + $this->assertTrue($fiber->isTerminated()); + $this->assertSame('result', $fiber->getReturn()); + } + + public function testResultRetrievalTookTooLongNonBlocking(): void + { + $currentTime = 0; + $timeProvider = self::createStub(TimeProvider::class); + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $taskExecutor = new FiberTaskExecutor($timeProvider); + + $fiber = new Fiber(static function () use ($taskExecutor, $timeProvider, &$currentTime) { + return $taskExecutor->getResultUnderTTL( + static function () use ($timeProvider, &$currentTime) { + $currentTime += 1_500_000_000; // Simulate a long-running task (1.5 seconds) + Fiber::suspend(); // Suspend to trigger the TTL check in the executor + return 'result'; + }, + ttlNanoseconds: 1_000_000_000, // Set TTL to 1 second + minSuspensionDelayNs: 100_000_000, // Set minimum suspension delay to 0.1 seconds + ); + }); + + $this->expectExceptionObject(new RuntimeException('Unable to get result under TTL')); + + $fiber->start(); + $this->assertSame(1_500_000_000, $currentTime); + } +} diff --git a/tests/Unit/InMemorySemaphoreTest.php b/tests/Unit/InMemorySemaphoreTest.php index d822864..6d9dd92 100644 --- a/tests/Unit/InMemorySemaphoreTest.php +++ b/tests/Unit/InMemorySemaphoreTest.php @@ -4,13 +4,14 @@ namespace MiMatus\Locksmith\Tests\Unit; -use MiMatus\Locksmith\Semaphore; use MiMatus\Locksmith\Semaphore\InMemorySemaphore; use MiMatus\Locksmith\Semaphore\TimeProvider; +use MiMatus\Locksmith\SemaphoreInterface; +use MiMatus\Locksmith\Tests\SemaphoreTestCase; class InMemorySemaphoreTest extends SemaphoreTestCase { - protected function createSemaphore(TimeProvider $timeProvider, int $maxConcurrentLocks = 1): Semaphore + protected function createSemaphore(TimeProvider $timeProvider, int $maxConcurrentLocks = 1): SemaphoreInterface { return new InMemorySemaphore(timeProvider: $timeProvider, maxConcurrentLocks: $maxConcurrentLocks); } diff --git a/tests/Unit/LocksmithTest.php b/tests/Unit/LocksmithTest.php index 89d4f73..b271eec 100644 --- a/tests/Unit/LocksmithTest.php +++ b/tests/Unit/LocksmithTest.php @@ -4,285 +4,15 @@ namespace MiMatus\Locksmith\Tests\Unit; -use Closure; -use Exception; -use MiMatus\Locksmith\Locksmith; -use MiMatus\Locksmith\Resource; -use MiMatus\Locksmith\Semaphore; +use MiMatus\Locksmith\FiberTaskExecutor; use MiMatus\Locksmith\Semaphore\TimeProvider; -use Override; -use PHPUnit\Framework\MockObject\MockObject; -use PHPUnit\Framework\TestCase; -use Random\Engine; -use RuntimeException; +use MiMatus\Locksmith\TaskExecutorInterface; +use MiMatus\Locksmith\Tests\LocksmithTestCase; -class LocksmithTest extends TestCase +class LocksmithTest extends LocksmithTestCase { - private Semaphore&MockObject $semaphore; - private TimeProvider&MockObject $timeProvider; - private Engine&MockObject $randomEngine; - - /** - * @throws Exception - */ - #[Override] - protected function setUp(): void - { - $this->semaphore = $this->createMock(Semaphore::class); - $this->timeProvider = $this->createMock(TimeProvider::class); - $this->randomEngine = $this->createMock(Engine::class); - } - - public function testUnableToAcquireLockTimeout(): void - { - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->atLeastOnce()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - Closure $suspension, - ) use (&$currentTime): void { - $suspension(); - $currentTime += 500_000_001; - $suspension(); - }); - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked( - resource: new Resource(namespace: 'test-resource', version: 1, ttlNanoseconds: 1_000_000_000), - maxLockWaitNs: 500_000_000, - minSuspensionDelayNs: 10_000, - ); - - $this->expectExceptionObject(new RuntimeException('Unable to get result under TTL')); - $locked(static function (): void { - self::fail('Lock should not be acquired'); - }); - } - - public function testUnableToAcquireLock(): void - { - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->atLeastOnce()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - Closure $suspension, - ) use (&$currentTime): void { - throw new RuntimeException('error'); - }); - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked( - resource: new Resource(namespace: 'test-resource', version: 1, ttlNanoseconds: 1_000_000_000), - maxLockWaitNs: 500_000_000, - minSuspensionDelayNs: 10_000, - ); - - $this->expectExceptionObject(new RuntimeException('error')); - $locked(static function (): void { - self::fail('Lock should not be acquired'); - }); - } - - public function testLostDuringExecution(): void - { - $resource = new Resource(namespace: 'test-resource', version: 1, ttlNanoseconds: 1_000_000_000); - - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->once()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - Closure $suspension, - ) use (&$currentTime): void { - // Lock acquired immediately - }); - - $this->semaphore - ->expects($this->once()) - ->method('isLocked') - ->with(self::equalTo($resource)) - ->willReturn(false); - - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked(resource: $resource, maxLockWaitNs: 500_000_000, minSuspensionDelayNs: 10_000); - - $this->expectExceptionObject(new RuntimeException('Lock has been lost during process')); - $locked(static function (Closure $suspension): void { - $suspension(); // Simulate some processing and force lock check - }); - } - - public function testUnableToUnlock(): void - { - $resource = new Resource(namespace: 'test-resource', version: 1, ttlNanoseconds: 1_000_000_000); - - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->once()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - Closure $suspension, - ) use (&$currentTime): void { - // Lock acquired immediately - }); - - $this->semaphore - ->expects($this->once()) - ->method('unlock') - ->willReturnCallback(static function (Resource $resource, #[\SensitiveParameter] string $token) use ( - &$currentTime, - ): void { - throw new RuntimeException('error during unlock'); - }); - - $this->semaphore - ->expects($this->once()) - ->method('isLocked') - ->with(self::equalTo($resource)) - ->willReturn(false); - - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked(resource: $resource, maxLockWaitNs: 500_000_000, minSuspensionDelayNs: 10_000); - - $this->expectExceptionObject(new RuntimeException('error during unlock')); - $locked(static function (Closure $suspension): void { - // Simulate some processing - }); - } - - public function testLocked(): void + protected function createTaskExecutor(TimeProvider $timeProvider): TaskExecutorInterface { - $resource = new Resource(namespace: 'test-resource', version: 1, ttlNanoseconds: 1_000_000_000); - - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->once()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - Closure $suspension, - ) use (&$currentTime): void { - // Lock acquired immediately - }); - - $this->semaphore - ->expects($this->once()) - ->method('unlock') - ->willReturnCallback(static function (Resource $resource, #[\SensitiveParameter] string $token) use ( - &$currentTime, - ): void { - // Unlock successful - }); - - $this->semaphore - ->expects($this->once()) - ->method('isLocked') - ->with(self::equalTo($resource)) - ->willReturn(true); - - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked(resource: $resource, maxLockWaitNs: 500_000_000, minSuspensionDelayNs: 10_000); - - $called = false; - $locked(static function (Closure $suspension) use (&$called): void { - // Simulate some processing - $called = true; - }); - - self::assertTrue($called, 'Locked callback executed'); + return new FiberTaskExecutor($timeProvider); } } diff --git a/tests/Unit/RedisSemaphoreTest.php b/tests/Unit/RedisSemaphoreTest.php deleted file mode 100644 index d5d13fb..0000000 --- a/tests/Unit/RedisSemaphoreTest.php +++ /dev/null @@ -1,46 +0,0 @@ -redis = new Redis(); - $this->redis->connect('redis'); - } - - protected function tearDown(): void - { - parent::tearDown(); - $this->redis->flushAll(); - $this->redis->close(); - } - - protected function advanceTime(int $nanoseconds): void - { - usleep((int) $nanoseconds / 1000); - parent::advanceTime($nanoseconds); - } - - protected function createSemaphore(TimeProvider $timeProvider, $maxConcurrentLocks = 1): Semaphore - { - return new RedisSemaphore(redisClient: $this->redis, maxConcurrentLocks: $maxConcurrentLocks); - } -} diff --git a/tests/Unit/SemaphoreCollectionTest.php b/tests/Unit/SemaphoreCollectionTest.php new file mode 100644 index 0000000..d75e725 --- /dev/null +++ b/tests/Unit/SemaphoreCollectionTest.php @@ -0,0 +1,70 @@ +createStub(SemaphoreInterface::class); + $semaphore2 = $this->createStub(SemaphoreInterface::class); + $semaphore3 = $this->createStub(SemaphoreInterface::class); + + $collection = new SemaphoreCollection([ + $semaphore1, + $semaphore2, + $semaphore3, + ]); + + $semaphores = []; + foreach ($collection as $semaphore) { + $semaphores[] = $semaphore; + } + + self::assertSame([$semaphore1, $semaphore2, $semaphore3], $semaphores); + } + + public function testCount(): void + { + $semaphore1 = $this->createStub(SemaphoreInterface::class); + $semaphore2 = $this->createStub(SemaphoreInterface::class); + + $collection = new SemaphoreCollection([ + $semaphore1, + $semaphore2, + ]); + + self::assertSame(2, $collection->count()); + } + + public function testWithout(): void + { + $semaphore1 = $this->createStub(SemaphoreInterface::class); + $semaphore2 = $this->createStub(SemaphoreInterface::class); + $semaphore3 = $this->createStub(SemaphoreInterface::class); + + $collection = new SemaphoreCollection([ + $semaphore1, + $semaphore2, + $semaphore3, + ]); + + $newCollection = $collection->without($semaphore2); + + self::assertSame(3, $collection->count()); + self::assertSame(2, $newCollection->count()); + + $semaphores = []; + foreach ($newCollection as $semaphore) { + $semaphores[] = $semaphore; + } + + self::assertSame([$semaphore1, $semaphore3], $semaphores); + } +}