From e77a1e4375cd894dfe88c25392852e36f13a71fb Mon Sep 17 00:00:00 2001 From: Katie Volz Date: Wed, 1 Dec 2021 01:54:12 -0500 Subject: [PATCH 1/4] AmPHP Beanstalk v1 --- composer.json | 24 +- docs/index.md | 2 +- docs/jobs.md | 20 +- docs/misc.md | 2 +- docs/tubes.md | 24 +- examples/consumer.php | 14 +- examples/producer.php | 14 +- examples/stats.php | 19 +- src/BeanstalkClient.php | 453 ++++++++----------- src/Connection.php | 96 ++-- src/Parser.php | 8 +- src/Stats/Job.php | 13 +- src/Stats/System.php | 13 +- src/Stats/Tube.php | 13 +- test/BeanstalkClientConnectionClosedTest.php | 27 +- test/IntegrationTest.php | 145 +++--- test/ParserTest.php | 35 +- 17 files changed, 405 insertions(+), 517 deletions(-) diff --git a/composer.json b/composer.json index 3f062c9..ef808a5 100644 --- a/composer.json +++ b/composer.json @@ -13,20 +13,28 @@ "Amp\\Beanstalk\\": "src" } }, - "require": { - "amphp/amp": "^2", - "amphp/socket": "^1.0", + "autoload-dev": { + "psr-4": { + "Amp\\Beanstalk\\Test\\": "test" + } + }, + "minimum-stability": "dev", + "prefer-stable": true, + "require": { + "php": "^8.1", + "amphp/amp": "^3", + "amphp/socket": "^2", "amphp/uri": "^0.1", - "symfony/yaml": "^3.3|^4|^5" + "symfony/yaml": "^3.3|^4|^5|^6" }, "require-dev": { - "amphp/phpunit-util": "^1", - "phpunit/phpunit": "^6", - "friendsofphp/php-cs-fixer": "^2.3" + "amphp/phpunit-util": "^2", + "phpunit/phpunit": "^9", + "friendsofphp/php-cs-fixer": "^3" }, "config": { "platform": { - "php": "7.1.0" + "php": "8.1.0" } } } diff --git a/docs/index.md b/docs/index.md index 139e40f..7f10b3d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -20,7 +20,7 @@ $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); // can connect to the server with an additional tube query parameter. // $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300?tube=foobar"); -$systemStats = yield $beanstalk->getSystemStats(); +$systemStats = $beanstalk->getSystemStats(); $readyJobs = $systemStats->currentJobsReady; ``` diff --git a/docs/jobs.md b/docs/jobs.md index 0b23d4f..9a39407 100644 --- a/docs/jobs.md +++ b/docs/jobs.md @@ -20,7 +20,7 @@ $payload = json_encode([ "path" => "/path/to/image.png" ]); -$jobId = yield $beanstalk->put($payload); +$jobId = $beanstalk->put($payload); ``` ## Pulling Jobs off a Queue @@ -30,16 +30,16 @@ $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); $beanstalk->watch('foobar'); -while([$jobId, $jobData] = yield $beanstalk->reserve()) { +while([$jobId, $jobData] = $beanstalk->reserve()) { // Work the job using $jobData // Once you're finished, delete the job - yield $beanstalk->delete($jobId); - + $beanstalk->delete($jobId); + // If there was an error, you can bury the job for inspection later - yield $beanstalk->bury($jobId); - + $beanstalk->bury($jobId); + // Of you can release the job, to be picked up by a new worker - yield $beanstalk->release($jobId); + $beanstalk->release($jobId); } ``` @@ -50,10 +50,10 @@ $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); $beanstalk->watch('foobar'); -while([$jobId, $jobData] = yield $beanstalk->reserve()) { +while([$jobId, $jobData] = $beanstalk->reserve()) { // Work the job // If you still need time to work the job, you can utilize the touch command - yield $beantstalk->touch($jobId); + $beantstalk->touch($jobId); } ``` @@ -62,6 +62,6 @@ while([$jobId, $jobData] = yield $beanstalk->reserve()) { ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$jobStats = yield $beanstalk->getJobStats($jobId = 42); +$jobStats = $beanstalk->getJobStats($jobId = 42); $jobStats->state; // ready ``` diff --git a/docs/misc.md b/docs/misc.md index 18cc189..c3569c7 100644 --- a/docs/misc.md +++ b/docs/misc.md @@ -13,7 +13,7 @@ To see what stats are available for the system, checkout the [System](classes/sy ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$stats = yield $beanstalk->getSystemStats(); +$stats = $beanstalk->getSystemStats(); ``` ## Close the Connection diff --git a/docs/tubes.md b/docs/tubes.md index 588a257..9d331aa 100644 --- a/docs/tubes.md +++ b/docs/tubes.md @@ -14,7 +14,7 @@ By default Beanstalk will use the default tube for reserving and storing new job $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); // This will store the job on the "default" tube. -$jobId = yield $beanstalk->put($payload = json_encode([ +$jobId = $beanstalk->put($payload = json_encode([ "job" => bin2hex(random_bytes(16)), "type" => "compress-image" "path" => "/path/to/image.png" @@ -23,7 +23,7 @@ $jobId = yield $beanstalk->put($payload = json_encode([ $beanstalk->use('foobar'); // This will store the job on the "foobar" tube. -$jobId = yield $beanstalk->put($payload = json_encode([ +$jobId = $beanstalk->put($payload = json_encode([ "job" => bin2hex(random_bytes(16)), "type" => "compress-image" "path" => "/path/to/image.png" @@ -37,7 +37,7 @@ If you need to pause a tube, preventing any new jobs from being reserved, you ca ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -yield $beanstalk->pause($tube = 'foobar'); +$beanstalk->pause($tube = 'foobar'); ``` ## Watching and Ignoring Tubes @@ -47,9 +47,9 @@ By default when you reserve a job you'll either pull from the `default` tube, or ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -yield $beanstalk->watch($tube = 'foobar'); -yield $beanstalk->watch($tube = 'barbaz'); -yield $beanstalk->ignore($tube = 'default'); +$beanstalk->watch($tube = 'foobar'); +$beanstalk->watch($tube = 'barbaz'); +$beanstalk->ignore($tube = 'default'); // Watchlist will contain "foobar" and "barbaz" ``` @@ -60,9 +60,9 @@ To find out which tubes your connection is currently watching. ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -yield $beanstalk->watch($tube = 'foobar'); -yield $beanstalk->watch($tube = 'barbaz'); -yield $beanstalk->ignore($tube = 'default'); +$beanstalk->watch($tube = 'foobar'); +$beanstalk->watch($tube = 'barbaz'); +$beanstalk->ignore($tube = 'default'); $watchlist = $beanstalk->listWatchedTubes(); ``` @@ -74,7 +74,7 @@ If you need to see a list of all the tubes that exist on the server. ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$tubes = yield $beanstalk->listTubes(); +$tubes = $beanstalk->listTubes(); ``` ## Get the Tube Being Used @@ -84,7 +84,7 @@ To determine which tube your client is currently using. ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$tube = yield $beanstalk->getUsedTube(); +$tube = $beanstalk->getUsedTube(); ``` ## Get Tube Stats @@ -94,5 +94,5 @@ To see what stats are available for a tube, checkout the [Tube](classes/tube) cl ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$stats = yield $beanstalk->getTubeStats($tube = 'default'); +$stats = $beanstalk->getTubeStats($tube = 'default'); ``` diff --git a/examples/consumer.php b/examples/consumer.php index 2c189dc..0b5f851 100644 --- a/examples/consumer.php +++ b/examples/consumer.php @@ -3,16 +3,16 @@ require __DIR__ . '/../vendor/autoload.php'; use Amp\Beanstalk\BeanstalkClient; -use Amp\Loop; +$beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); +try { + $beanstalk->watch('foobar'); -Loop::run(function () { - $beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); - yield $beanstalk->watch('foobar'); - - while (list($jobId, $payload) = yield $beanstalk->reserve()) { + while (list($jobId, $payload) = $beanstalk->reserve()) { echo "Job id: $jobId\n"; echo "Payload: $payload\n"; $beanstalk->delete($jobId); } -}); +} finally { + $beanstalk->quit(); +} diff --git a/examples/producer.php b/examples/producer.php index 9f1104b..e9d9900 100644 --- a/examples/producer.php +++ b/examples/producer.php @@ -3,11 +3,9 @@ require __DIR__ . '/../vendor/autoload.php'; use Amp\Beanstalk\BeanstalkClient; -use Amp\Loop; - -Loop::run(function () { - $beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); - yield $beanstalk->use('foobar'); +$beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); +try { + $beanstalk->use('foobar'); $payload = json_encode([ "job" => bin2hex(random_bytes(16)), @@ -15,9 +13,9 @@ "path" => "/path/to/image.png" ]); - $jobId = yield $beanstalk->put($payload); + $jobId = $beanstalk->put($payload); echo "Inserted job id: $jobId\n"; - +} finally { $beanstalk->quit(); -}); +} diff --git a/examples/stats.php b/examples/stats.php index e6d46d6..42f37b4 100644 --- a/examples/stats.php +++ b/examples/stats.php @@ -3,18 +3,15 @@ require __DIR__ . '/../vendor/autoload.php'; use Amp\Beanstalk\BeanstalkClient; -use Amp\Beanstalk\Stats\System; -use Amp\Loop; -Loop::run(function () { - $beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); - - /** - * @var System $systemStats - */ - $systemStats = yield $beanstalk->getSystemStats(); +$beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); +try { + $systemStats = $beanstalk->getSystemStats(); echo "Active connections: {$systemStats->currentConnections}\n"; echo "Jobs ready: {$systemStats->currentJobsReady}\n"; - +} catch (\Throwable $t) { + echo $t::class . PHP_EOL; +} finally { + echo "Quit\n"; $beanstalk->quit(); -}); +} diff --git a/src/BeanstalkClient.php b/src/BeanstalkClient.php index e786173..353ddff 100644 --- a/src/BeanstalkClient.php +++ b/src/BeanstalkClient.php @@ -5,22 +5,18 @@ use Amp\Beanstalk\Stats\Job; use Amp\Beanstalk\Stats\System; use Amp\Beanstalk\Stats\Tube; -use function Amp\call; use Amp\Deferred; -use Amp\Promise; use Amp\Uri\Uri; use Symfony\Component\Yaml\Yaml; use Throwable; class BeanstalkClient { /** @var Deferred[] */ - private $deferreds; + private array $deferreds; - /** @var Connection */ - private $connection; + private Connection $connection; - /** @var string */ - private $tube; + private ?string $tube; public function __construct(string $uri) { $this->applyUri($uri); @@ -29,13 +25,12 @@ public function __construct(string $uri) { $this->connection = new Connection($uri); $this->connection->addEventHandler("response", function ($response) { - /** @var Deferred $deferred */ $deferred = array_shift($this->deferreds); if ($response instanceof Throwable) { - $deferred->fail($response); + $deferred->error($response); } else { - $deferred->resolve($response); + $deferred->complete($response); } }); @@ -61,394 +56,302 @@ private function applyUri(string $uri) { $this->tube = (new Uri($uri))->getQueryParameter("tube"); } - private function send(string $message, callable $transform = null): Promise { - return call(function () use ($message, $transform) { - $this->deferreds[] = $deferred = new Deferred; - $promise = $deferred->promise(); - - yield $this->connection->send($message); - $response = yield $promise; + private function send(string $message): array { + $this->deferreds[] = $deferred = new Deferred; - return $transform ? $transform($response) : $response; - }); + $this->connection->send($message); + return $deferred->getFuture()->await(); } - public function use(string $tube) { - return $this->send("use " . $tube . "\r\n", function () use ($tube) { - $this->tube = $tube; - return null; - }); + public function use(string $tube): void { + $this->send("use " . $tube . "\r\n"); + $this->tube = $tube; } - public function pause(string $tube, int $delay): Promise { + public function pause(string $tube, int $delay): void { $payload = "pause-tube $tube $delay\r\n"; + $response = $this->send($payload); + $type = $response[0]; + switch ($type) { + case "PAUSED": + return; - return $this->send($payload, function (array $response) use ($tube) { - list($type) = $response; - - switch ($type) { - case "PAUSED": - return null; + case "NOT_FOUND": + throw new NotFoundException("Tube with name $tube is not found"); - case "NOT_FOUND": - throw new NotFoundException("Tube with name $tube is not found"); - - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + default: + throw new BeanstalkException("Unknown response: " . $type); + } } - public function put(string $payload, int $timeout = 60, int $delay = 0, $priority = 0): Promise { + public function put(string $payload, int $timeout = 60, int $delay = 0, $priority = 0): int { $payload = "put $priority $delay $timeout " . strlen($payload) . "\r\n$payload\r\n"; - return $this->send($payload, function (array $response): int { - list($type) = $response; - - switch ($type) { - case "INSERTED": - case "BURIED": - return (int) $response[1]; + $response = $this->send($payload); + $type = $response[0]; - case "EXPECTED_CRLF": - throw new ExpectedCrlfException; - - case "JOB_TOO_BIG": - throw new JobTooBigException; - - case "DRAINING": - throw new DrainingException; - - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + return match ($type) { + "INSERTED", "BURIED" => (int) $response[1], + "EXPECTED_CRLF" => throw new ExpectedCrlfException, + "JOB_TOO_BIG" => throw new JobTooBigException, + "DRAINING" => throw new DrainingException, + default => throw new BeanstalkException("Unknown response: " . $type), + }; } - public function reserve(int $timeout = null): Promise { + public function reserve(int $timeout = null): array { $payload = $timeout === null ? "reserve\r\n" : "reserve-with-timeout $timeout\r\n"; - return $this->send($payload, function (array $response): array { - list($type) = $response; + $response = $this->send($payload); + $type = $response[0]; - switch ($type) { - case "DEADLINE_SOON": - throw new DeadlineSoonException; - - case "TIMED_OUT": - throw new TimedOutException; - - case "RESERVED": - return [$response[1], $response[2]]; - - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + return match ($type) { + "DEADLINE_SOON" => throw new DeadlineSoonException(), + "TIMED_OUT" => throw new TimedOutException(), + "RESERVED" => [$response[1], $response[2]], + default => throw new BeanstalkException("Unknown response: " . $type), + }; } - public function delete(int $id): Promise { + public function delete(int $id): bool { $payload = "delete $id\r\n"; - return $this->send($payload, function (array $response): int { - list($type) = $response; - - switch ($type) { - case "DELETED": - return true; - - case "NOT_FOUND": - return false; + $response = $this->send($payload); + list($type) = $response; - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + return match ($type) { + "DELETED" => true, + "NOT_FOUND" => false, + default => throw new BeanstalkException("Unknown response: " . $type), + }; } - public function release(int $id, int $delay = 0, int $priority = 0): Promise { + public function release(int $id, int $delay = 0, int $priority = 0): string { $payload = "release $id $priority $delay\r\n"; + $response = $this->send($payload); + list($type) = $response; - return $this->send($payload, function (array $response): string { - list($type) = $response; - - switch ($type) { - case "BURIED": - case "RELEASED": - case "NOT_FOUND": - return $type; - - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + return match ($type) { + "BURIED", "RELEASED", "NOT_FOUND" => $type, + default => throw new BeanstalkException("Unknown response: " . $type), + }; } - public function bury(int $id, int $priority = 0): Promise { + public function bury(int $id, int $priority = 0): bool { $payload = "bury $id $priority\r\n"; - return $this->send($payload, function (array $response): int { - list($type) = $response; - - switch ($type) { - case "BURIED": - return true; + $response = $this->send($payload); + list($type) = $response; - case "NOT_FOUND": - return false; - - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + return match ($type) { + "BURIED" => true, + "NOT_FOUND" => false, + default => throw new BeanstalkException("Unknown response: " . $type), + }; } - public function kickJob(int $id): Promise { + public function kickJob(int $id): bool { $payload = "kick-job $id\r\n"; - return $this->send($payload, function (array $response): bool { - list($type) = $response; - - switch ($type) { - case "KICKED": - return true; - - case "NOT_FOUND": - return false; + $response = $this->send($payload); + list($type) = $response; - default: - throw new BeanstalkException("Unknown response: $type"); - } - }); + return match ($type) { + "KICKED" => true, + "NOT_FOUND" => false, + default => throw new BeanstalkException("Unknown response: $type"), + }; } - public function kick(int $count): Promise { + public function kick(int $count): int { $payload = "kick $count\r\n"; - return $this->send($payload, function (array $response): int { - list($type) = $response; + $response = $this->send($payload); + list($type) = $response; - switch ($type) { - case "KICKED": - return (int) $response[1]; - - default: - throw new BeanstalkException("Unknown response: $type"); - } - }); + return match ($type) { + "KICKED" => (int)$response[1], + default => throw new BeanstalkException("Unknown response: $type"), + }; } - public function touch(int $id): Promise { + public function touch(int $id): bool { $payload = "touch $id\r\n"; - return $this->send($payload, function (array $response): int { - list($type) = $response; - - switch ($type) { - case "TOUCHED": - return true; - - case "NOT_FOUND": - return false; + $response = $this->send($payload); + list($type) = $response; - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + return match ($type) { + "TOUCHED" => true, + "NOT_FOUND" => false, + default => throw new BeanstalkException("Unknown response: " . $type), + }; } - public function watch(string $tube): Promise { + public function watch(string $tube): int { $payload = "watch $tube\r\n"; - return $this->send($payload, function (array $response): int { - if ($response[0] !== "WATCHING") { - throw new BeanstalkException("Unknown response: " . $response[0]); - } + $response = $this->send($payload); + if ($response[0] !== "WATCHING") { + throw new BeanstalkException("Unknown response: " . $response[0]); + } - return (int) $response[1]; - }); + return (int) $response[1]; } - public function ignore(string $tube): Promise { + public function ignore(string $tube): int { $payload = "ignore $tube\r\n"; - return $this->send($payload, function (array $response): int { - list($type) = $response; + $response = $this->send($payload); + list($type) = $response; - switch ($type) { - case "WATCHING": - return (int) $response[1]; + switch ($type) { + case "WATCHING": + return (int) $response[1]; - case "NOT_IGNORED": - throw new NotIgnoredException; + case "NOT_IGNORED": + throw new NotIgnoredException; - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + default: + throw new BeanstalkException("Unknown response: " . $type); + } } - public function quit() { + public function quit(): void { $this->send("quit\r\n"); } - public function getJobStats(int $id): Promise { + public function getJobStats(int $id): Job { $payload = "stats-job $id\r\n"; + $response = $this->send($payload); - return $this->send($payload, function (array $response) use ($id): Job { - list($type) = $response; - - switch ($type) { - case "OK": - return new Job(Yaml::parse($response[1])); - - case "NOT_FOUND": - throw new NotFoundException("Job with $id is not found"); + list($type) = $response; - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + return match ($type) { + "OK" => new Job(Yaml::parse($response[1])), + "NOT_FOUND" => throw new NotFoundException("Job with $id is not found"), + default => throw new BeanstalkException("Unknown response: " . $type), + }; } - public function getTubeStats(string $tube): Promise { + public function getTubeStats(string $tube): Tube { $payload = "stats-tube $tube\r\n"; + $response = $this->send($payload); - return $this->send($payload, function (array $response) use ($tube): Tube { - list($type) = $response; + list($type) = $response; - switch ($type) { - case "OK": - return new Tube(Yaml::parse($response[1])); - - case "NOT_FOUND": - throw new NotFoundException("Tube $tube is not found"); - - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + return match ($type) { + "OK" => new Tube(Yaml::parse($response[1])), + "NOT_FOUND" => throw new NotFoundException("Tube $tube is not found"), + default => throw new BeanstalkException("Unknown response: " . $type), + }; } - public function getSystemStats(): Promise { + public function getSystemStats(): System { $payload = "stats\r\n"; - return $this->send($payload, function (array $response): System { - if ($response[0] !== "OK") { - throw new BeanstalkException("Unknown response: " . $response[0]); - } + $response = $this->send($payload); + if ($response[0] !== "OK") { + throw new BeanstalkException("Unknown response: " . $response[0]); + } - return new System(Yaml::parse($response[1])); - }); + return new System(Yaml::parse($response[1])); } - public function listTubes(): Promise { + public function listTubes(): array { $payload = "list-tubes\r\n"; - return $this->send($payload, function (array $response): array { - list($type) = $response; + $response = $this->send($payload); + list($type) = $response; - switch ($type) { - case "OK": - return Yaml::parse($response[1]); + switch ($type) { + case "OK": + return Yaml::parse($response[1]); - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + default: + throw new BeanstalkException("Unknown response: " . $type); + } } - public function listWatchedTubes(): Promise { + public function listWatchedTubes(): array { $payload = "list-tubes-watched\r\n"; - return $this->send($payload, function (array $response): array { - list($type) = $response; + $response = $this->send($payload); + list($type) = $response; - switch ($type) { - case "OK": - return Yaml::parse($response[1]); + switch ($type) { + case "OK": + return Yaml::parse($response[1]); - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + default: + throw new BeanstalkException("Unknown response: " . $type); + } } - public function getUsedTube(): Promise { + public function getUsedTube(): string { $payload = "list-tube-used\r\n"; - return $this->send($payload, function (array $response): string { - list($type) = $response; + $response = $this->send($payload); + list($type) = $response; - switch ($type) { - case "USING": - return $response[1]; + switch ($type) { + case "USING": + return $response[1]; - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + default: + throw new BeanstalkException("Unknown response: " . $type); + } } public function peek(int $id): Promise { $payload = "peek $id\r\n"; - return $this->send($payload, function (array $response) use ($id): string { - list($type) = $response; + $response = $this->send($payload); + list($type) = $response; - switch ($type) { - case "FOUND": - return $response[2]; + switch ($type) { + case "FOUND": + return $response[2]; - case "NOT_FOUND": - throw new NotFoundException("Job with $id is not found"); + case "NOT_FOUND": + throw new NotFoundException("Job with $id is not found"); - default: - throw new BeanstalkException("Unknown response: " . $type); - } - }); + default: + throw new BeanstalkException("Unknown response: " . $type); + } } - public function peekReady(): Promise { + public function peekReady(): string { return $this->peekInState('ready'); } - public function peekDelayed(): Promise { + public function peekDelayed(): string { return $this->peekInState('delayed'); } - public function peekBuried(): Promise { + public function peekBuried(): string { return $this->peekInState('buried'); } - private function peekInState(string $state): Promise { + private function peekInState(string $state): string { $payload = "peek-$state\r\n"; - return $this->send( - $payload, - function (array $response) use ($state): string { - list($type) = $response; - - switch ($type) { - case "FOUND": - return $response[2]; + $response = $this->send($payload); + list($type) = $response; - case "NOT_FOUND": - throw new NotFoundException("No Job in $state state"); - - default: - throw new BeanstalkException("Unknown response: " . $type); - } - } - ); + return match ($type) { + "FOUND" => $response[2], + "NOT_FOUND" => throw new NotFoundException("No Job in $state state"), + default => throw new BeanstalkException("Unknown response: " . $type), + }; } - private function failAllDeferreds(Throwable $error) { + private function failAllDeferreds(Throwable $error): void { // Fail any outstanding promises while ($this->deferreds) { /** @var Deferred $deferred */ $deferred = array_shift($this->deferreds); - $deferred->fail($error); + $deferred->error($error); } } } diff --git a/src/Connection.php b/src/Connection.php index ee4655c..fa18042 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -2,33 +2,26 @@ namespace Amp\Beanstalk; -use function Amp\asyncCall; -use function Amp\call; use Amp\Deferred; +use Revolt\EventLoop; use function Amp\Socket\connect; use Amp\Socket\ConnectContext; use Amp\Socket\Socket; -use Amp\Success; use Amp\Uri\Uri; class Connection { - /** @var Deferred */ - private $connectPromisor; + private Parser $parser; - /** @var Parser */ - private $parser; + private int $timeout = 5000; - /** @var int */ - private $timeout = 5000; - - /** @var Socket */ - private $socket; + private ?Socket $socket = null; /** @var string */ - private $uri; + private string $uri; /** @var callable[][] */ - private $handlers; + private array $handlers; + private ?Deferred $connectPromisor = null; public function __construct(string $uri) { $this->applyUri($uri); @@ -69,63 +62,42 @@ public function addEventHandler($events, callable $callback) { } } - public function send(string $payload) { - return call(function () use ($payload) { - yield $this->connect(); - yield $this->socket->write($payload); - }); + public function send(string $payload): void { + $this->connect(); + $this->socket->write($payload); } - private function connect() { - // If we're in the process of connecting already return that same promise - if ($this->connectPromisor) { - return $this->connectPromisor->promise(); + private function connect(): void + { + if($this->connectPromisor) { + $this->connectPromisor->getFuture()->await(); + return; } - - // If a read watcher exists we know we're already connected - if ($this->socket) { - return new Success; + $this->connectPromisor = new Deferred(); + try { + $this->socket = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout)); + } catch(\Throwable $error) { + $this->connectPromisor->error(new ConnectException( + "Connection attempt failed", + $code = 0, + $error + )); + $this->connectPromisor->getFuture()->await(); } - $this->connectPromisor = new Deferred; - $socketPromise = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout)); + foreach ($this->handlers["connect"] as $handler) { + $pipelinedCommand = $handler(); - $socketPromise->onResolve(function ($error, $socket) { - $connectPromisor = $this->connectPromisor; - $this->connectPromisor = null; - - if ($error) { - $connectPromisor->fail(new ConnectException( - "Connection attempt failed", - $code = 0, - $error - )); - - return; + if (!empty($pipelinedCommand)) { + $this->socket->write($pipelinedCommand); } - - $this->socket = $socket; - - foreach ($this->handlers["connect"] as $handler) { - $pipelinedCommand = $handler(); - - if (!empty($pipelinedCommand)) { - $this->socket->write($pipelinedCommand); - } + } + EventLoop::queue(function () { + while (null !== $chunk = yield $this->socket->read()) { + $this->parser->send($chunk); } - - asyncCall(function () { - while (null !== $chunk = yield $this->socket->read()) { - $this->parser->send($chunk); - } - - $this->close(); - }); - - $connectPromisor->resolve(); + $this->close(); }); - - return $this->connectPromisor->promise(); } private function onError(\Throwable $exception) { diff --git a/src/Parser.php b/src/Parser.php index 2a3b467..885f633 100644 --- a/src/Parser.php +++ b/src/Parser.php @@ -2,6 +2,8 @@ namespace Amp\Beanstalk; +use Closure; + class Parser { const CRLF = "\r\n"; @@ -10,11 +12,11 @@ class Parser { const ERROR_BAD_FORMAT = "BAD_FORMAT"; const ERROR_UNKNOWN_COMMAND = "UNKNOWN_COMMAND"; - private $responseCallback; - private $buffer = ""; + private Closure $responseCallback; + private string $buffer = ""; public function __construct(callable $responseCallback) { - $this->responseCallback = $responseCallback; + $this->responseCallback = $responseCallback(...); } public function send(string $bytes) { diff --git a/src/Stats/Job.php b/src/Stats/Job.php index 541ac7c..71d7c9f 100644 --- a/src/Stats/Job.php +++ b/src/Stats/Job.php @@ -2,10 +2,17 @@ namespace Amp\Beanstalk\Stats; -use Amp\Struct; - class Job { - use Struct; + + public function __get(string $property): never + { + throw new \Error("Property $property does not exist"); + } + + public function __set(string $property, mixed $value): never + { + throw new \Error("Property $property does not exist"); + } const STATE_READY = "ready"; const STATE_DELAYED = "delayed"; diff --git a/src/Stats/System.php b/src/Stats/System.php index 49067b2..d513be7 100644 --- a/src/Stats/System.php +++ b/src/Stats/System.php @@ -2,10 +2,17 @@ namespace Amp\Beanstalk\Stats; -use Amp\Struct; - class System { - use Struct; + + public function __get(string $property): never + { + throw new \Error("Property $property does not exist"); + } + + public function __set(string $property, mixed $value): never + { + throw new \Error("Property $property does not exist"); + } public function __construct(array $struct) { $this->currentJobsUrgent = (int) $struct["current-jobs-urgent"]; diff --git a/src/Stats/Tube.php b/src/Stats/Tube.php index f24b000..363849e 100644 --- a/src/Stats/Tube.php +++ b/src/Stats/Tube.php @@ -2,10 +2,17 @@ namespace Amp\Beanstalk\Stats; -use Amp\Struct; - class Tube { - use Struct; + + public function __get(string $property): never + { + throw new \Error("Property $property does not exist"); + } + + public function __set(string $property, mixed $value): never + { + throw new \Error("Property $property does not exist"); + } public function __construct(array $struct) { $this->name = $struct["name"]; diff --git a/test/BeanstalkClientConnectionClosedTest.php b/test/BeanstalkClientConnectionClosedTest.php index 069468d..96d2a93 100644 --- a/test/BeanstalkClientConnectionClosedTest.php +++ b/test/BeanstalkClientConnectionClosedTest.php @@ -4,12 +4,11 @@ use Amp\Beanstalk\BeanstalkClient; use Amp\Beanstalk\ConnectionClosedException; -use function Amp\call; -use Amp\Delayed; +use Amp\Deferred; use Amp\PHPUnit\AsyncTestCase; -use function Amp\Promise\all; use Amp\Socket\Server; use Amp\Socket\SocketException; +use Revolt\EventLoop; class BeanstalkClientConnectionClosedTest extends AsyncTestCase { /** @var Server */ @@ -18,12 +17,12 @@ class BeanstalkClientConnectionClosedTest extends AsyncTestCase { /** * @throws SocketException */ - public function setUp() { + public function setUp(): void { parent::setUp(); $this->server = Server::listen("tcp://127.0.0.1:0"); } - public function tearDown() { + public function tearDown(): void { parent::tearDown(); $this->server->close(); } @@ -34,20 +33,20 @@ public function tearDown() { * @param $reserveTimeout int|null Seconds * @param $connectionCloseTimeout int Milliseconds * @param $testFailTimeout int Milliseconds - * @return \Generator */ - public function testReserve($reserveTimeout, $connectionCloseTimeout, $testFailTimeout) { + public function testReserve(?int $reserveTimeout, int $connectionCloseTimeout, int $testFailTimeout): void { $beanstalk = new BeanstalkClient("tcp://". $this->server->getAddress()); - $connectionClosePromise = call(function ($connectionCloseTimeout) { - yield new Delayed($connectionCloseTimeout); + $suspension = EventLoop::createSuspension(); + EventLoop::delay($connectionCloseTimeout / 1000, function() use ($suspension,$connectionCloseTimeout) { $this->server->close(); - }, $connectionCloseTimeout); + $suspension->resume(); + }); $this->setTimeout($testFailTimeout); $this->expectException(ConnectionClosedException::class); - yield all([ - $beanstalk->reserve($reserveTimeout), - $connectionClosePromise - ]); + EventLoop::defer(function() use($beanstalk, $reserveTimeout){ + $beanstalk->reserve($reserveTimeout); + }); + $suspension->suspend(); } public function dataProviderReserve(): array { diff --git a/test/IntegrationTest.php b/test/IntegrationTest.php index 3d63f0b..674ca66 100644 --- a/test/IntegrationTest.php +++ b/test/IntegrationTest.php @@ -5,126 +5,105 @@ use Amp\Beanstalk\BeanstalkClient; use Amp\Beanstalk\Stats\Job; use Amp\Beanstalk\Stats\System; -use function Amp\call; -use function Amp\Promise\wait; use PHPUnit\Framework\TestCase; class IntegrationTest extends TestCase { - /** @var BeanstalkClient */ - private $beanstalk; + private BeanstalkClient $beanstalk; - public function setUp() { + public function setUp(): void { if (!\getenv("AMP_TEST_BEANSTALK_INTEGRATION") && !\getenv("TRAVIS")) { $this->markTestSkipped("You need to set AMP_TEST_BEANSTALK_INTEGRATION=1 in order to run the integration tests."); } $this->beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); - - wait(call(function () { - /** @var System $stats */ - $stats = yield $this->beanstalk->getSystemStats(); - for ($jobId = 1; $jobId <= $stats->totalJobs; $jobId++) { - yield $this->beanstalk->delete($jobId); - } - })); + $stats = $this->beanstalk->getSystemStats(); + for ($jobId = 1; $jobId <= $stats->totalJobs; $jobId++) { + $this->beanstalk->delete($jobId); + } } public function testPut() { - wait(call(function () { - /** @var System $statsBefore */ - $statsBefore = yield $this->beanstalk->getSystemStats(); + $statsBefore = $this->beanstalk->getSystemStats(); - $jobId = yield $this->beanstalk->put("hi"); - $this->assertInternalType("int", $jobId); + $jobId = $this->beanstalk->put("hi"); + $this->assertIsInt($jobId); - /** @var Job $jobStats */ - $jobStats = yield $this->beanstalk->getJobStats($jobId); + $jobStats = $this->beanstalk->getJobStats($jobId); - $this->assertSame($jobId, $jobStats->id); - $this->assertSame(0, $jobStats->priority); - $this->assertSame(0, $jobStats->delay); + $this->assertSame($jobId, $jobStats->id); + $this->assertSame(0, $jobStats->priority); + $this->assertSame(0, $jobStats->delay); - /** @var System $statsAfter */ - $statsAfter = yield $this->beanstalk->getSystemStats(); + $statsAfter = $this->beanstalk->getSystemStats(); - $this->assertSame($statsBefore->cmdPut + 1, $statsAfter->cmdPut); - })); + $this->assertSame($statsBefore->cmdPut + 1, $statsAfter->cmdPut); } public function testPeek() { - wait(call(function () { - $jobId = yield $this->beanstalk->put('I am ready'); - $this->assertInternalType("int", $jobId); + $jobId = $this->beanstalk->put('I am ready'); + $this->assertIsInt($jobId); - $peekedJob = yield $this->beanstalk->peek($jobId); - $this->assertEquals('I am ready', $peekedJob); + $peekedJob = $this->beanstalk->peek($jobId); + $this->assertEquals('I am ready', $peekedJob); - $peekedJob = yield $this->beanstalk->peekReady(); - $this->assertEquals('I am ready', $peekedJob); + $peekedJob = $this->beanstalk->peekReady(); + $this->assertEquals('I am ready', $peekedJob); - list($jobId) = yield $this->beanstalk->reserve(); - $buried = yield $this->beanstalk->bury($jobId); - $this->assertEquals(1, $buried); - $peekedJob = yield $this->beanstalk->peekBuried(); - $this->assertEquals('I am ready', $peekedJob); + list($jobId) = $this->beanstalk->reserve(); + $buried = $this->beanstalk->bury($jobId); + $this->assertEquals(1, $buried); + $peekedJob = $this->beanstalk->peekBuried(); + $this->assertEquals('I am ready', $peekedJob); - $jobId = yield $this->beanstalk->put('I am delayed', 60, 60); - $peekedJob = yield $this->beanstalk->peekDelayed(); - $this->assertEquals('I am delayed', $peekedJob); - })); + $jobId = $this->beanstalk->put('I am delayed', 60, 60); + $peekedJob = $this->beanstalk->peekDelayed(); + $this->assertEquals('I am delayed', $peekedJob); } public function testKickJob() { - wait(call(function () { - $jobId = yield $this->beanstalk->put("hi"); - $this->assertInternalType("int", $jobId); + $jobId = $this->beanstalk->put("hi"); + $this->assertIsInt($jobId); - $kicked = yield $this->beanstalk->kickJob($jobId); - $this->assertFalse($kicked); + $kicked = $this->beanstalk->kickJob($jobId); + $this->assertFalse($kicked); - list($jobId, ) = yield $this->beanstalk->reserve(); - $buried = yield $this->beanstalk->bury($jobId); - $this->assertEquals(1, $buried); - /** @var Job $jobStats */ - $jobStats = yield $this->beanstalk->getJobStats($jobId); - $this->assertEquals('buried', $jobStats->state); + list($jobId, ) = $this->beanstalk->reserve(); + $buried = $this->beanstalk->bury($jobId); + $this->assertEquals(1, $buried); + $jobStats = $this->beanstalk->getJobStats($jobId); + $this->assertEquals('buried', $jobStats->state); - $kicked = yield $this->beanstalk->kickJob($jobId); - $this->assertTrue($kicked); - })); + $kicked = $this->beanstalk->kickJob($jobId); + $this->assertTrue($kicked); } public function testKick() { - wait(call(function () { - for ($i = 0; $i < 10; $i++) { - yield $this->beanstalk->put("Job $i"); - } - for ($i = 0; $i < 8; $i++) { - list($jobId, ) = yield $this->beanstalk->reserve(); - $buried = yield $this->beanstalk->bury($jobId); - $this->assertEquals(1, $buried); - } - - $kicked = yield $this->beanstalk->kick(4); - $this->assertEquals(4, $kicked); - - $kicked = yield $this->beanstalk->kick(10); - $this->assertEquals(4, $kicked); - - $kicked = yield $this->beanstalk->kick(1); - $this->assertEquals(0, $kicked); - })); + for ($i = 0; $i < 10; $i++) { + $this->beanstalk->put("Job $i"); + } + for ($i = 0; $i < 8; $i++) { + list($jobId, ) = $this->beanstalk->reserve(); + $buried = $this->beanstalk->bury($jobId); + $this->assertEquals(1, $buried); + } + + $kicked = $this->beanstalk->kick(4); + $this->assertEquals(4, $kicked); + + $kicked = $this->beanstalk->kick(10); + $this->assertEquals(4, $kicked); + + $kicked = $this->beanstalk->kick(1); + $this->assertEquals(0, $kicked); } public function testReservedJobShouldHaveTheSamePayloadAsThePutPayload() { - wait(call(function () { - $jobId = yield $this->beanstalk->put(str_repeat('*', 65535)); + $jobId = $this->beanstalk->put(str_repeat('*', 65535)); - yield $this->beanstalk->watch('default'); - list($reservedJobId, $reservedJobPayload) = yield $this->beanstalk->reserve(); + $this->beanstalk->watch('default'); + list($reservedJobId, $reservedJobPayload) = $this->beanstalk->reserve(); - $this->assertSame($jobId, $reservedJobId); - $this->assertSame(65535, strlen($reservedJobPayload)); - })); + $this->assertSame($jobId, $reservedJobId); + $this->assertSame(65535, strlen($reservedJobPayload)); } } diff --git a/test/ParserTest.php b/test/ParserTest.php index ebf5d24..c977517 100644 --- a/test/ParserTest.php +++ b/test/ParserTest.php @@ -2,33 +2,41 @@ namespace Amp\Beanstalk\Test; +use Amp\Beanstalk\BadFormatException; +use Amp\Beanstalk\InternalErrorException; +use Amp\Beanstalk\OutOfMemoryException; use Amp\Beanstalk\Parser; +use Amp\Beanstalk\UnknownCommandException; +use Exception; use PHPUnit\Framework\TestCase; class ParserTest extends TestCase { - protected $parserToTest; + protected Parser $parserToTest; - protected $parsedElements; + /** + * @var null|list|Exception + */ + protected null|array|Exception $parsedElements = null; - public function setUp() { + public function setUp(): void { $this->parserToTest = new Parser(function ($result) { $this->parsedElements = $result; }); } - public function testParsesPartialResponseCorrectly() { + public function testParsesPartialResponseCorrectly(): void { $this->parserToTest->send("OK 5\r\nhello\r"); $this->assertNull($this->parsedElements); $this->parserToTest->send("\n"); $this->assertSame(["OK", "hello"], $this->parsedElements); } - public function testParsesFound() { + public function testParsesFound(): void { $this->parserToTest->send("FOUND 5 5\r\nhello\r\n"); $this->assertSame(["FOUND", 5, 'hello'], $this->parsedElements); } - public function testParsesReserved() { + public function testParsesReserved(): void { $this->parserToTest->send("RESERVED 2 30\r\n"); $this->assertNull($this->parsedElements); $this->parserToTest->reset(); @@ -36,7 +44,7 @@ public function testParsesReserved() { $this->assertSame(["RESERVED", 5, 'hello'], $this->parsedElements); } - public function testResetBuffer() { + public function testResetBuffer(): void { $this->parserToTest->send("OK 7\r\nmorn"); $this->assertNull($this->parsedElements); $this->parserToTest->send("ing\r\n"); @@ -54,28 +62,29 @@ public function testResetBuffer() { /** * @dataProvider dataProviderTestExceptions */ - public function testParserExceptions($buffer, $exceptionExpected) { + public function testParserExceptions(string $buffer, string $exceptionExpected): void { $this->parserToTest->send($buffer); $this->assertInstanceOf($exceptionExpected, $this->parsedElements); } - public function dataProviderTestExceptions() { + public function dataProviderTestExceptions(): array + { return [ [ "OUT_OF_MEMORY\r\nhello\r", - \Amp\Beanstalk\OutOfMemoryException::class + OutOfMemoryException::class ], [ "INTERNAL_ERROR\r\nhello\r", - \Amp\Beanstalk\InternalErrorException::class + InternalErrorException::class ], [ "BAD_FORMAT\r\nhello\r", - \Amp\Beanstalk\BadFormatException::class + BadFormatException::class ], [ "UNKNOWN_COMMAND\r\nhello\r", - \Amp\Beanstalk\UnknownCommandException::class + UnknownCommandException::class ] ]; } From 127ff8a8c6646695dac66229b078eeaea2e6fd17 Mon Sep 17 00:00:00 2001 From: Katie Volz Date: Wed, 15 Dec 2021 15:06:08 -0500 Subject: [PATCH 2/4] Working version for v1 --- .gitignore | 3 +- .php_cs.dist => .php-cs-fixer.dist.php | 6 +- composer.json | 2 +- src/BeanstalkClient.php | 56 ++----- src/Connection.php | 153 ++++++++++--------- src/Stats/Job.php | 7 +- src/Stats/System.php | 7 +- src/Stats/Tube.php | 7 +- test/BeanstalkClientConnectionClosedTest.php | 9 +- test/IntegrationTest.php | 2 - test/ParserTest.php | 3 +- 11 files changed, 112 insertions(+), 143 deletions(-) rename .php_cs.dist => .php-cs-fixer.dist.php (90%) diff --git a/.gitignore b/.gitignore index 5293e46..535e2a4 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ /composer.lock /vendor/ /protocol.txt -/.php_cs.cache +/.php-cs-fixer.cache +/.phpunit.result.cache diff --git a/.php_cs.dist b/.php-cs-fixer.dist.php similarity index 90% rename from .php_cs.dist rename to .php-cs-fixer.dist.php index 106b620..9758223 100644 --- a/.php_cs.dist +++ b/.php-cs-fixer.dist.php @@ -1,6 +1,6 @@ setRiskyAllowed(true) ->setRules([ "@PSR1" => true, @@ -13,7 +13,7 @@ "cast_spaces" => true, "combine_consecutive_unsets" => true, "function_to_constant" => true, - "no_multiline_whitespace_before_semicolons" => true, + "multiline_whitespace_before_semicolons" => true, "no_unused_imports" => true, "no_useless_else" => true, "no_useless_return" => true, @@ -27,7 +27,7 @@ "php_unit_fqcn_annotation" => true, "phpdoc_summary" => true, "phpdoc_types" => true, - "psr4" => true, + "psr_autoloading" => true, "return_type_declaration" => ["space_before" => "none"], "short_scalar_cast" => true, "single_blank_line_before_namespace" => true, diff --git a/composer.json b/composer.json index ef808a5..e0c84bc 100644 --- a/composer.json +++ b/composer.json @@ -28,7 +28,7 @@ "symfony/yaml": "^3.3|^4|^5|^6" }, "require-dev": { - "amphp/phpunit-util": "^2", + "amphp/phpunit-util": "^3", "phpunit/phpunit": "^9", "friendsofphp/php-cs-fixer": "^3" }, diff --git a/src/BeanstalkClient.php b/src/BeanstalkClient.php index 353ddff..b8a8a4b 100644 --- a/src/BeanstalkClient.php +++ b/src/BeanstalkClient.php @@ -5,15 +5,10 @@ use Amp\Beanstalk\Stats\Job; use Amp\Beanstalk\Stats\System; use Amp\Beanstalk\Stats\Tube; -use Amp\Deferred; use Amp\Uri\Uri; use Symfony\Component\Yaml\Yaml; -use Throwable; class BeanstalkClient { - /** @var Deferred[] */ - private array $deferreds; - private Connection $connection; private ?string $tube; @@ -21,46 +16,20 @@ class BeanstalkClient { public function __construct(string $uri) { $this->applyUri($uri); - $this->deferreds = []; - $this->connection = new Connection($uri); - $this->connection->addEventHandler("response", function ($response) { - $deferred = array_shift($this->deferreds); - - if ($response instanceof Throwable) { - $deferred->error($response); - } else { - $deferred->complete($response); - } - }); - - $this->connection->addEventHandler("error", function (Throwable $error = null) { - if ($error) { - $this->failAllDeferreds($error); - } - }); - $this->connection->addEventHandler("close", function () { - $this->failAllDeferreds(new ConnectionClosedException("Connection closed")); - }); if ($this->tube) { - $this->connection->addEventHandler("connect", function () { - array_unshift($this->deferreds, new Deferred); - - return "use $this->tube\r\n"; - }); + $this->send("use $this->tube\r\n"); } } - private function applyUri(string $uri) { + private function applyUri(string $uri): void { $this->tube = (new Uri($uri))->getQueryParameter("tube"); } private function send(string $message): array { - $this->deferreds[] = $deferred = new Deferred; - $this->connection->send($message); - return $deferred->getFuture()->await(); + return $this->connection->awaitResponse(); } public function use(string $tube): void { @@ -170,7 +139,7 @@ public function kick(int $count): int { list($type) = $response; return match ($type) { - "KICKED" => (int)$response[1], + "KICKED" => (int) $response[1], default => throw new BeanstalkException("Unknown response: $type"), }; } @@ -218,7 +187,11 @@ public function ignore(string $tube): int { } public function quit(): void { - $this->send("quit\r\n"); + try { + $this->send("quit\r\n"); + } catch (ConnectionClosedException) { + // Okay + } } public function getJobStats(int $id): Job { @@ -303,7 +276,7 @@ public function getUsedTube(): string { } } - public function peek(int $id): Promise { + public function peek(int $id): string { $payload = "peek $id\r\n"; $response = $this->send($payload); @@ -345,13 +318,4 @@ private function peekInState(string $state): string { default => throw new BeanstalkException("Unknown response: " . $type), }; } - - private function failAllDeferreds(Throwable $error): void { - // Fail any outstanding promises - while ($this->deferreds) { - /** @var Deferred $deferred */ - $deferred = array_shift($this->deferreds); - $deferred->error($error); - } - } } diff --git a/src/Connection.php b/src/Connection.php index fa18042..32739d4 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -2,8 +2,9 @@ namespace Amp\Beanstalk; -use Amp\Deferred; -use Revolt\EventLoop; +use function Amp\async; +use Amp\DeferredFuture; +use Amp\Future; use function Amp\Socket\connect; use Amp\Socket\ConnectContext; use Amp\Socket\Socket; @@ -19,27 +20,46 @@ class Connection { /** @var string */ private string $uri; - /** @var callable[][] */ - private array $handlers; - private ?Deferred $connectPromisor = null; +// /** +// * @var DeferredFuture[] +// * On connect, all handlers are triggered and removed +// */ +// private array $connectHandlers = []; + + /** + * @var DeferredFuture[] + * On response, the top handler is triggered and removed + */ + private array $responseHandlers = []; + +// /** +// * @var DeferredFuture[] +// * On error, all handlers are triggered and removed +// */ +// private array $errorHandlers = []; + +// /** +// * @var DeferredFuture[] +// * On close, all handlers are triggered and removed +// */ +// private array $closeHandlers = []; + + private ?Future $connectFuture = null; public function __construct(string $uri) { $this->applyUri($uri); - $this->handlers = [ - "connect" => [], - "response" => [], - "error" => [], - "close" => [], - ]; $this->parser = new Parser(function ($response) { - foreach ($this->handlers["response"] as $handler) { - $handler($response); - } - - if ($response instanceof BadFormatException) { - $this->onError($response); - } +// var_dump("Parser response"); + $handler = array_shift($this->responseHandlers); +// var_dump("complete " . spl_object_id($handler)); + $handler->complete($response); +// if ($response instanceof BadFormatException) { +// foreach($this->errorHandlers as $errorHandler) { +// $errorHandler->complete($response); +// } +// $this->errorHandlers = []; +// } }); } @@ -50,75 +70,70 @@ private function applyUri($uri) { $this->uri = $uri->getScheme() . "://" . $uri->getHost() . ":" . $uri->getPort(); } - public function addEventHandler($events, callable $callback) { - $events = (array) $events; - - foreach ($events as $event) { - if (!isset($this->handlers[$event])) { - throw new \Error("Unknown event: " . $event); - } - - $this->handlers[$event][] = $callback; - } + public function awaitResponse(): mixed { + $df = ($this->responseHandlers[] = new DeferredFuture()); + $val = $df->getFuture()->await(); + return $val; } +// public function awaitError(): mixed +// { +// return ($this->errorHandlers[] = new DeferredFuture())->getFuture()->await(); +// } + public function send(string $payload): void { +// var_dump(__LINE__); $this->connect(); +// var_dump(__LINE__); $this->socket->write($payload); +// var_dump(__LINE__); } - private function connect(): void - { - if($this->connectPromisor) { - $this->connectPromisor->getFuture()->await(); - return; - } - $this->connectPromisor = new Deferred(); - try { - $this->socket = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout)); - } catch(\Throwable $error) { - $this->connectPromisor->error(new ConnectException( - "Connection attempt failed", - $code = 0, - $error - )); - $this->connectPromisor->getFuture()->await(); - } - - foreach ($this->handlers["connect"] as $handler) { - $pipelinedCommand = $handler(); - - if (!empty($pipelinedCommand)) { - $this->socket->write($pipelinedCommand); - } - } - EventLoop::queue(function () { - while (null !== $chunk = yield $this->socket->read()) { - $this->parser->send($chunk); + private function connect(): void { + $this->connectFuture ??= async(function () { +// var_dump("do connect"); + try { + $this->socket = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout)); +// var_dump("connect done"); + } catch (\Throwable $error) { +// var_dump("connect fail"); + throw new ConnectException( + "Connection attempt failed", + $code = 0, + $error + ); } - $this->close(); - }); - } - private function onError(\Throwable $exception) { - foreach ($this->handlers["error"] as $handler) { - $handler($exception); - } - - $this->close(); +// foreach ($this->handlers["connect"] as $handler) { +// $pipelinedCommand = $handler(); +// +// if (!empty($pipelinedCommand)) { +// $this->socket->write($pipelinedCommand); +// } +// } + async(function () { + while (null !== $chunk = $this->socket->read()) { + $this->parser->send($chunk); + } + $this->close(); + }); + }); +// var_dump("connect()"); + $this->connectFuture->await(); +// var_dump("connect() awaited"); } public function close() { + // Fail all response handlers + while ($responseHandler = array_shift($this->responseHandlers)) { + $responseHandler->error(new ConnectionClosedException()); + } $this->parser->reset(); if ($this->socket) { $this->socket->close(); $this->socket = null; } - - foreach ($this->handlers["close"] as $handler) { - $handler(); - } } public function __destruct() { diff --git a/src/Stats/Job.php b/src/Stats/Job.php index 71d7c9f..2b0fd84 100644 --- a/src/Stats/Job.php +++ b/src/Stats/Job.php @@ -3,14 +3,11 @@ namespace Amp\Beanstalk\Stats; class Job { - - public function __get(string $property): never - { + public function __get(string $property): never { throw new \Error("Property $property does not exist"); } - public function __set(string $property, mixed $value): never - { + public function __set(string $property, mixed $value): never { throw new \Error("Property $property does not exist"); } diff --git a/src/Stats/System.php b/src/Stats/System.php index d513be7..0eb0dfe 100644 --- a/src/Stats/System.php +++ b/src/Stats/System.php @@ -3,14 +3,11 @@ namespace Amp\Beanstalk\Stats; class System { - - public function __get(string $property): never - { + public function __get(string $property): never { throw new \Error("Property $property does not exist"); } - public function __set(string $property, mixed $value): never - { + public function __set(string $property, mixed $value): never { throw new \Error("Property $property does not exist"); } diff --git a/src/Stats/Tube.php b/src/Stats/Tube.php index 363849e..0fe2996 100644 --- a/src/Stats/Tube.php +++ b/src/Stats/Tube.php @@ -3,14 +3,11 @@ namespace Amp\Beanstalk\Stats; class Tube { - - public function __get(string $property): never - { + public function __get(string $property): never { throw new \Error("Property $property does not exist"); } - public function __set(string $property, mixed $value): never - { + public function __set(string $property, mixed $value): never { throw new \Error("Property $property does not exist"); } diff --git a/test/BeanstalkClientConnectionClosedTest.php b/test/BeanstalkClientConnectionClosedTest.php index 96d2a93..d004944 100644 --- a/test/BeanstalkClientConnectionClosedTest.php +++ b/test/BeanstalkClientConnectionClosedTest.php @@ -4,8 +4,8 @@ use Amp\Beanstalk\BeanstalkClient; use Amp\Beanstalk\ConnectionClosedException; -use Amp\Deferred; use Amp\PHPUnit\AsyncTestCase; +use Amp\PHPUnit\UnhandledException; use Amp\Socket\Server; use Amp\Socket\SocketException; use Revolt\EventLoop; @@ -37,13 +37,14 @@ public function tearDown(): void { public function testReserve(?int $reserveTimeout, int $connectionCloseTimeout, int $testFailTimeout): void { $beanstalk = new BeanstalkClient("tcp://". $this->server->getAddress()); $suspension = EventLoop::createSuspension(); - EventLoop::delay($connectionCloseTimeout / 1000, function() use ($suspension,$connectionCloseTimeout) { + EventLoop::delay($connectionCloseTimeout / 1000, function () use ($suspension, $connectionCloseTimeout) { $this->server->close(); $suspension->resume(); }); $this->setTimeout($testFailTimeout); - $this->expectException(ConnectionClosedException::class); - EventLoop::defer(function() use($beanstalk, $reserveTimeout){ + $this->expectException(UnhandledException::class); + $this->expectExceptionMessage(ConnectionClosedException::class); + EventLoop::defer(function () use ($beanstalk, $reserveTimeout) { $beanstalk->reserve($reserveTimeout); }); $suspension->suspend(); diff --git a/test/IntegrationTest.php b/test/IntegrationTest.php index 674ca66..67e7e21 100644 --- a/test/IntegrationTest.php +++ b/test/IntegrationTest.php @@ -3,8 +3,6 @@ namespace Amp\Beanstalk\Test; use Amp\Beanstalk\BeanstalkClient; -use Amp\Beanstalk\Stats\Job; -use Amp\Beanstalk\Stats\System; use PHPUnit\Framework\TestCase; class IntegrationTest extends TestCase { diff --git a/test/ParserTest.php b/test/ParserTest.php index c977517..95064a5 100644 --- a/test/ParserTest.php +++ b/test/ParserTest.php @@ -67,8 +67,7 @@ public function testParserExceptions(string $buffer, string $exceptionExpected): $this->assertInstanceOf($exceptionExpected, $this->parsedElements); } - public function dataProviderTestExceptions(): array - { + public function dataProviderTestExceptions(): array { return [ [ "OUT_OF_MEMORY\r\nhello\r", From 1aad78f11bc81eadedbe0d889fc8ff75101f1675 Mon Sep 17 00:00:00 2001 From: Katie Volz Date: Wed, 15 Dec 2021 15:15:33 -0500 Subject: [PATCH 3/4] Cleanup var_dump()s and allow PHP 8.0 compatibility --- composer.json | 1 - src/Connection.php | 58 +----------- src/Parser.php | 2 +- src/Stats/Job.php | 32 +++---- src/Stats/System.php | 96 ++++++++++---------- src/Stats/Tube.php | 32 +++---- test/BeanstalkClientConnectionClosedTest.php | 3 +- 7 files changed, 86 insertions(+), 138 deletions(-) diff --git a/composer.json b/composer.json index e0c84bc..e3e17d0 100644 --- a/composer.json +++ b/composer.json @@ -21,7 +21,6 @@ "minimum-stability": "dev", "prefer-stable": true, "require": { - "php": "^8.1", "amphp/amp": "^3", "amphp/socket": "^2", "amphp/uri": "^0.1", diff --git a/src/Connection.php b/src/Connection.php index 32739d4..4dce606 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -17,49 +17,22 @@ class Connection { private ?Socket $socket = null; - /** @var string */ private string $uri; -// /** -// * @var DeferredFuture[] -// * On connect, all handlers are triggered and removed -// */ -// private array $connectHandlers = []; - /** * @var DeferredFuture[] * On response, the top handler is triggered and removed */ private array $responseHandlers = []; -// /** -// * @var DeferredFuture[] -// * On error, all handlers are triggered and removed -// */ -// private array $errorHandlers = []; - -// /** -// * @var DeferredFuture[] -// * On close, all handlers are triggered and removed -// */ -// private array $closeHandlers = []; - private ?Future $connectFuture = null; public function __construct(string $uri) { $this->applyUri($uri); $this->parser = new Parser(function ($response) { -// var_dump("Parser response"); $handler = array_shift($this->responseHandlers); -// var_dump("complete " . spl_object_id($handler)); $handler->complete($response); -// if ($response instanceof BadFormatException) { -// foreach($this->errorHandlers as $errorHandler) { -// $errorHandler->complete($response); -// } -// $this->errorHandlers = []; -// } }); } @@ -71,46 +44,25 @@ private function applyUri($uri) { } public function awaitResponse(): mixed { - $df = ($this->responseHandlers[] = new DeferredFuture()); - $val = $df->getFuture()->await(); - return $val; + return ($this->responseHandlers[] = new DeferredFuture())->getFuture()->await(); } -// public function awaitError(): mixed -// { -// return ($this->errorHandlers[] = new DeferredFuture())->getFuture()->await(); -// } - public function send(string $payload): void { -// var_dump(__LINE__); $this->connect(); -// var_dump(__LINE__); $this->socket->write($payload); -// var_dump(__LINE__); } private function connect(): void { $this->connectFuture ??= async(function () { -// var_dump("do connect"); try { $this->socket = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout)); -// var_dump("connect done"); } catch (\Throwable $error) { -// var_dump("connect fail"); throw new ConnectException( - "Connection attempt failed", - $code = 0, - $error + message: "Connection attempt failed", + code: 0, + previous: $error ); } - -// foreach ($this->handlers["connect"] as $handler) { -// $pipelinedCommand = $handler(); -// -// if (!empty($pipelinedCommand)) { -// $this->socket->write($pipelinedCommand); -// } -// } async(function () { while (null !== $chunk = $this->socket->read()) { $this->parser->send($chunk); @@ -118,9 +70,7 @@ private function connect(): void { $this->close(); }); }); -// var_dump("connect()"); $this->connectFuture->await(); -// var_dump("connect() awaited"); } public function close() { diff --git a/src/Parser.php b/src/Parser.php index 885f633..9e5c17d 100644 --- a/src/Parser.php +++ b/src/Parser.php @@ -16,7 +16,7 @@ class Parser { private string $buffer = ""; public function __construct(callable $responseCallback) { - $this->responseCallback = $responseCallback(...); + $this->responseCallback = fn(...$args) => $responseCallback(...$args); } public function send(string $bytes) { diff --git a/src/Stats/Job.php b/src/Stats/Job.php index 2b0fd84..312f13f 100644 --- a/src/Stats/Job.php +++ b/src/Stats/Job.php @@ -3,11 +3,11 @@ namespace Amp\Beanstalk\Stats; class Job { - public function __get(string $property): never { + public function __get(string $property): void { throw new \Error("Property $property does not exist"); } - public function __set(string $property, mixed $value): never { + public function __set(string $property, mixed $value): void { throw new \Error("Property $property does not exist"); } @@ -33,18 +33,18 @@ public function __construct(array $struct) { $this->kicks = (int) $struct["kicks"]; } - public $id; - public $tube; - public $state; - public $priority; - public $age; - public $delay; - public $ttr; - public $timeLeft; - public $file; - public $reserves; - public $timeouts; - public $releases; - public $buries; - public $kicks; + public int $id; + public mixed $tube; + public mixed $state; + public int $priority; + public int $age; + public int $delay; + public int $ttr; + public int $timeLeft; + public mixed $file; + public int $reserves; + public int $timeouts; + public int $releases; + public int $buries; + public int $kicks; } diff --git a/src/Stats/System.php b/src/Stats/System.php index 0eb0dfe..e3f2e9a 100644 --- a/src/Stats/System.php +++ b/src/Stats/System.php @@ -3,11 +3,11 @@ namespace Amp\Beanstalk\Stats; class System { - public function __get(string $property): never { + public function __get(string $property) { throw new \Error("Property $property does not exist"); } - public function __set(string $property, mixed $value): never { + public function __set(string $property, mixed $value) { throw new \Error("Property $property does not exist"); } @@ -60,50 +60,50 @@ public function __construct(array $struct) { $this->hostname = $struct["hostname"]; } - public $currentJobsUrgent; - public $currentJobsReady; - public $currentJobsReserved; - public $currentJobsDelayed; - public $currentJobsBuried; - public $cmdPut; - public $cmdPeek; - public $cmdPeekReady; - public $cmdPeekDelayed; - public $cmdPeekBuried; - public $cmdReserve; - public $cmdUse; - public $cmdWatch; - public $cmdIgnore; - public $cmdDelete; - public $cmdRelease; - public $cmdBury; - public $cmdKick; - public $cmdStats; - public $cmdStatsJob; - public $cmdStatsTube; - public $cmdListTubes; - public $cmdListTubeUsed; - public $cmdListTubesWatched; - public $cmdPauseTube; - public $jobTimeouts; - public $totalJobs; - public $maxJobSize; - public $currentTubes; - public $currentConnections; - public $currentProducers; - public $currentWorkers; - public $currentWaiting; - public $totalConnections; - public $pid; - public $version; - public $rusageUtime; - public $rusageStime; - public $uptime; - public $binlogOldestIndex; - public $binlogCurrentIndex; - public $binlogMaxSize; - public $binlogRecordsWritten; - public $binlogRecordsMigrated; - public $id; - public $hostname; + public int $currentJobsUrgent; + public int $currentJobsReady; + public int $currentJobsReserved; + public int $currentJobsDelayed; + public int $currentJobsBuried; + public int $cmdPut; + public int $cmdPeek; + public int $cmdPeekReady; + public int $cmdPeekDelayed; + public int $cmdPeekBuried; + public int $cmdReserve; + public int $cmdUse; + public int $cmdWatch; + public int $cmdIgnore; + public int $cmdDelete; + public int $cmdRelease; + public int $cmdBury; + public int $cmdKick; + public int $cmdStats; + public int $cmdStatsJob; + public int $cmdStatsTube; + public int $cmdListTubes; + public int $cmdListTubeUsed; + public int $cmdListTubesWatched; + public int $cmdPauseTube; + public int $jobTimeouts; + public int $totalJobs; + public int $maxJobSize; + public int $currentTubes; + public int $currentConnections; + public int $currentProducers; + public int $currentWorkers; + public int $currentWaiting; + public int $totalConnections; + public int $pid; + public mixed $version; + public float $rusageUtime; + public float $rusageStime; + public int $uptime; + public int $binlogOldestIndex; + public int $binlogCurrentIndex; + public int $binlogMaxSize; + public int $binlogRecordsWritten; + public int $binlogRecordsMigrated; + public mixed $id; + public mixed $hostname; } diff --git a/src/Stats/Tube.php b/src/Stats/Tube.php index 0fe2996..7751941 100644 --- a/src/Stats/Tube.php +++ b/src/Stats/Tube.php @@ -3,11 +3,11 @@ namespace Amp\Beanstalk\Stats; class Tube { - public function __get(string $property): never { + public function __get(string $property): void { throw new \Error("Property $property does not exist"); } - public function __set(string $property, mixed $value): never { + public function __set(string $property, mixed $value): void { throw new \Error("Property $property does not exist"); } @@ -28,18 +28,18 @@ public function __construct(array $struct) { $this->pauseTimeLeft = (int) $struct["pause-time-left"]; } - public $name; - public $currentJobsUrgent; - public $currentJobsReady; - public $currentJobsReserved; - public $currentJobsDelayed; - public $currentJobsBuried; - public $totalJobs; - public $currentUsing; - public $currentWaiting; - public $currentWatching; - public $pause; - public $cmdDelete; - public $cmdPauseTube; - public $pauseTimeLeft; + public mixed $name; + public int $currentJobsUrgent; + public int $currentJobsReady; + public int $currentJobsReserved; + public int $currentJobsDelayed; + public int $currentJobsBuried; + public int $totalJobs; + public int $currentUsing; + public int $currentWaiting; + public int $currentWatching; + public int $pause; + public int $cmdDelete; + public int $cmdPauseTube; + public int $pauseTimeLeft; } diff --git a/test/BeanstalkClientConnectionClosedTest.php b/test/BeanstalkClientConnectionClosedTest.php index d004944..74fde38 100644 --- a/test/BeanstalkClientConnectionClosedTest.php +++ b/test/BeanstalkClientConnectionClosedTest.php @@ -11,8 +11,7 @@ use Revolt\EventLoop; class BeanstalkClientConnectionClosedTest extends AsyncTestCase { - /** @var Server */ - private $server; + private Server $server; /** * @throws SocketException From 05c55ee50089f6a0137d5f87275e0c16cd36da43 Mon Sep 17 00:00:00 2001 From: Katie Volz Date: Mon, 17 Jan 2022 14:26:11 -0500 Subject: [PATCH 4/4] Fix references to names changed in Amp beta --- test/BeanstalkClientConnectionClosedTest.php | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/BeanstalkClientConnectionClosedTest.php b/test/BeanstalkClientConnectionClosedTest.php index 74fde38..2792ac2 100644 --- a/test/BeanstalkClientConnectionClosedTest.php +++ b/test/BeanstalkClientConnectionClosedTest.php @@ -6,19 +6,20 @@ use Amp\Beanstalk\ConnectionClosedException; use Amp\PHPUnit\AsyncTestCase; use Amp\PHPUnit\UnhandledException; -use Amp\Socket\Server; +use Amp\Socket\ResourceSocketServer; use Amp\Socket\SocketException; use Revolt\EventLoop; +use function Amp\Socket\listen; class BeanstalkClientConnectionClosedTest extends AsyncTestCase { - private Server $server; + private ResourceSocketServer $server; /** * @throws SocketException */ public function setUp(): void { parent::setUp(); - $this->server = Server::listen("tcp://127.0.0.1:0"); + $this->server = listen("tcp://127.0.0.1:0"); } public function tearDown(): void {