diff --git a/.gitignore b/.gitignore index 5293e46..29c90e1 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ /vendor/ /protocol.txt /.php_cs.cache +/.idea/ diff --git a/composer.json b/composer.json index 3f062c9..bab5ae2 100644 --- a/composer.json +++ b/composer.json @@ -14,19 +14,19 @@ } }, "require": { - "amphp/amp": "^2", - "amphp/socket": "^1.0", + "amphp/amp": "^3", + "amphp/socket": "^2", "amphp/uri": "^0.1", "symfony/yaml": "^3.3|^4|^5" }, "require-dev": { - "amphp/phpunit-util": "^1", - "phpunit/phpunit": "^6", + "amphp/phpunit-util": "^v3.0.0", + "phpunit/phpunit": "^9", "friendsofphp/php-cs-fixer": "^2.3" }, "config": { "platform": { - "php": "7.1.0" + "php": "8.1.0" } } } diff --git a/docs/index.md b/docs/index.md index 139e40f..2eb054d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -20,7 +20,7 @@ $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); // can connect to the server with an additional tube query parameter. // $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300?tube=foobar"); -$systemStats = yield $beanstalk->getSystemStats(); +$systemStats = $beanstalk->getSystemStats()->await(); $readyJobs = $systemStats->currentJobsReady; ``` diff --git a/docs/jobs.md b/docs/jobs.md index 0b23d4f..ff73b52 100644 --- a/docs/jobs.md +++ b/docs/jobs.md @@ -12,7 +12,7 @@ permalink: /jobs $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); // This step not required if you included a tube query parameter when creating the client -$beanstalk->use('foobar'); +$beanstalk->use('foobar')->await(); $payload = json_encode([ "job" => bin2hex(random_bytes(16)), @@ -20,7 +20,7 @@ $payload = json_encode([ "path" => "/path/to/image.png" ]); -$jobId = yield $beanstalk->put($payload); +$jobId = $beanstalk->put($payload)->await(); ``` ## Pulling Jobs off a Queue @@ -28,18 +28,18 @@ $jobId = yield $beanstalk->put($payload); ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$beanstalk->watch('foobar'); +$beanstalk->watch('foobar')->await(); -while([$jobId, $jobData] = yield $beanstalk->reserve()) { +while([$jobId, $jobData] = $beanstalk->reserve()->await()) { // Work the job using $jobData // Once you're finished, delete the job - yield $beanstalk->delete($jobId); + $beanstalk->delete($jobId)->await(); // If there was an error, you can bury the job for inspection later - yield $beanstalk->bury($jobId); + $beanstalk->bury($jobId)->await(); // Of you can release the job, to be picked up by a new worker - yield $beanstalk->release($jobId); + $beanstalk->release($jobId)->await(); } ``` @@ -48,12 +48,12 @@ while([$jobId, $jobData] = yield $beanstalk->reserve()) { ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$beanstalk->watch('foobar'); +$beanstalk->watch('foobar')->await(); -while([$jobId, $jobData] = yield $beanstalk->reserve()) { +while([$jobId, $jobData] = $beanstalk->reserve()->await()) { // Work the job // If you still need time to work the job, you can utilize the touch command - yield $beantstalk->touch($jobId); + $beantstalk->touch($jobId)->await(); } ``` @@ -62,6 +62,6 @@ while([$jobId, $jobData] = yield $beanstalk->reserve()) { ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$jobStats = yield $beanstalk->getJobStats($jobId = 42); +$jobStats = $beanstalk->getJobStats($jobId = 42)->await(); $jobStats->state; // ready ``` diff --git a/docs/misc.md b/docs/misc.md index 18cc189..695b7ce 100644 --- a/docs/misc.md +++ b/docs/misc.md @@ -13,7 +13,7 @@ To see what stats are available for the system, checkout the [System](classes/sy ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$stats = yield $beanstalk->getSystemStats(); +$stats = $beanstalk->getSystemStats()->await(); ``` ## Close the Connection diff --git a/docs/tubes.md b/docs/tubes.md index 588a257..79b07b8 100644 --- a/docs/tubes.md +++ b/docs/tubes.md @@ -14,20 +14,20 @@ By default Beanstalk will use the default tube for reserving and storing new job $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); // This will store the job on the "default" tube. -$jobId = yield $beanstalk->put($payload = json_encode([ +$jobId = $beanstalk->put($payload = json_encode([ "job" => bin2hex(random_bytes(16)), "type" => "compress-image" "path" => "/path/to/image.png" -]);); +]))->await(); -$beanstalk->use('foobar'); +$beanstalk->use('foobar')->await(); // This will store the job on the "foobar" tube. -$jobId = yield $beanstalk->put($payload = json_encode([ +$jobId = $beanstalk->put($payload = json_encode([ "job" => bin2hex(random_bytes(16)), "type" => "compress-image" "path" => "/path/to/image.png" -])); +]))->await(); ``` ## Pausing a Tube @@ -37,7 +37,7 @@ If you need to pause a tube, preventing any new jobs from being reserved, you ca ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -yield $beanstalk->pause($tube = 'foobar'); +$beanstalk->pause($tube = 'foobar')->await(); ``` ## Watching and Ignoring Tubes @@ -47,9 +47,9 @@ By default when you reserve a job you'll either pull from the `default` tube, or ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -yield $beanstalk->watch($tube = 'foobar'); -yield $beanstalk->watch($tube = 'barbaz'); -yield $beanstalk->ignore($tube = 'default'); + $beanstalk->watch($tube = 'foobar')->await(); + $beanstalk->watch($tube = 'barbaz')->await(); + $beanstalk->ignore($tube = 'default')->await(); // Watchlist will contain "foobar" and "barbaz" ``` @@ -60,11 +60,11 @@ To find out which tubes your connection is currently watching. ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -yield $beanstalk->watch($tube = 'foobar'); -yield $beanstalk->watch($tube = 'barbaz'); -yield $beanstalk->ignore($tube = 'default'); + $beanstalk->watch($tube = 'foobar')->await(); + $beanstalk->watch($tube = 'barbaz')->await(); + $beanstalk->ignore($tube = 'default')->await(); -$watchlist = $beanstalk->listWatchedTubes(); +$watchlist = $beanstalk->listWatchedTubes()->await(); ``` ## Get a List of All Existing Tubes @@ -74,7 +74,7 @@ If you need to see a list of all the tubes that exist on the server. ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$tubes = yield $beanstalk->listTubes(); +$tubes = $beanstalk->listTubes()->await(); ``` ## Get the Tube Being Used @@ -84,7 +84,7 @@ To determine which tube your client is currently using. ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$tube = yield $beanstalk->getUsedTube(); +$tube = $beanstalk->getUsedTube()->await(); ``` ## Get Tube Stats @@ -94,5 +94,5 @@ To see what stats are available for a tube, checkout the [Tube](classes/tube) cl ```php $beanstalk = new Amp\Beanstalk\BeanstalkClient("tcp://127.0.0.1:11300"); -$stats = yield $beanstalk->getTubeStats($tube = 'default'); +$stats = $beanstalk->getTubeStats($tube = 'default')->await(); ``` diff --git a/examples/consumer.php b/examples/consumer.php index 2c189dc..40f88b2 100644 --- a/examples/consumer.php +++ b/examples/consumer.php @@ -3,16 +3,16 @@ require __DIR__ . '/../vendor/autoload.php'; use Amp\Beanstalk\BeanstalkClient; -use Amp\Loop; +use Revolt\EventLoop; -Loop::run(function () { +EventLoop::queue(function () { $beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); - yield $beanstalk->watch('foobar'); + $beanstalk->watch('foobar')->await(); - while (list($jobId, $payload) = yield $beanstalk->reserve()) { + while (list($jobId, $payload) = $beanstalk->reserve()->await()) { echo "Job id: $jobId\n"; echo "Payload: $payload\n"; - $beanstalk->delete($jobId); + $beanstalk->delete($jobId)->await(); } }); diff --git a/examples/producer.php b/examples/producer.php index 9f1104b..4a6e80f 100644 --- a/examples/producer.php +++ b/examples/producer.php @@ -3,11 +3,10 @@ require __DIR__ . '/../vendor/autoload.php'; use Amp\Beanstalk\BeanstalkClient; -use Amp\Loop; -Loop::run(function () { +Revolt\EventLoop::queue(function () { $beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); - yield $beanstalk->use('foobar'); + $beanstalk->use('foobar')->await(); $payload = json_encode([ "job" => bin2hex(random_bytes(16)), @@ -15,7 +14,7 @@ "path" => "/path/to/image.png" ]); - $jobId = yield $beanstalk->put($payload); + $jobId = $beanstalk->put($payload)->await(); echo "Inserted job id: $jobId\n"; diff --git a/examples/stats.php b/examples/stats.php index e6d46d6..e35d4f9 100644 --- a/examples/stats.php +++ b/examples/stats.php @@ -4,15 +4,15 @@ use Amp\Beanstalk\BeanstalkClient; use Amp\Beanstalk\Stats\System; -use Amp\Loop; -Loop::run(function () { + +Revolt\EventLoop::queue(function () { $beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); /** * @var System $systemStats */ - $systemStats = yield $beanstalk->getSystemStats(); + $systemStats = $beanstalk->getSystemStats()->await(); echo "Active connections: {$systemStats->currentConnections}\n"; echo "Jobs ready: {$systemStats->currentJobsReady}\n"; diff --git a/phpunit.xml.dist b/phpunit.xml.dist index a11edbb..c5efb24 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,14 +1,13 @@ - - - - - ./test - - - - - ./src - - + + + + ./src + + + + + ./test + + diff --git a/src/BeanstalkClient.php b/src/BeanstalkClient.php index e786173..8b9de5a 100644 --- a/src/BeanstalkClient.php +++ b/src/BeanstalkClient.php @@ -5,41 +5,42 @@ use Amp\Beanstalk\Stats\Job; use Amp\Beanstalk\Stats\System; use Amp\Beanstalk\Stats\Tube; -use function Amp\call; -use Amp\Deferred; -use Amp\Promise; +use Amp\DeferredFuture; +use Amp\Future; use Amp\Uri\Uri; use Symfony\Component\Yaml\Yaml; use Throwable; -class BeanstalkClient { - /** @var Deferred[] */ - private $deferreds; +use function Amp\async; + +class BeanstalkClient +{ + /** @var DeferredFuture[] */ + private array $deferreds = []; /** @var Connection */ - private $connection; + private Connection $connection; - /** @var string */ - private $tube; + /** @var string|null */ + private ?string $tube = null; - public function __construct(string $uri) { + public function __construct(string $uri) + { $this->applyUri($uri); - $this->deferreds = []; - $this->connection = new Connection($uri); $this->connection->addEventHandler("response", function ($response) { - /** @var Deferred $deferred */ + /** @var DeferredFuture $deferred */ $deferred = array_shift($this->deferreds); if ($response instanceof Throwable) { - $deferred->fail($response); + $deferred->error($response); } else { - $deferred->resolve($response); + $deferred->complete($response); } }); - $this->connection->addEventHandler("error", function (Throwable $error = null) { + $this->connection->addEventHandler("error", function (?Throwable $error = null) { if ($error) { $this->failAllDeferreds($error); } @@ -50,41 +51,45 @@ public function __construct(string $uri) { if ($this->tube) { $this->connection->addEventHandler("connect", function () { - array_unshift($this->deferreds, new Deferred); + array_unshift($this->deferreds, new DeferredFuture()); return "use $this->tube\r\n"; }); } } - private function applyUri(string $uri) { + private function applyUri(string $uri): void + { $this->tube = (new Uri($uri))->getQueryParameter("tube"); } - private function send(string $message, callable $transform = null): Promise { - return call(function () use ($message, $transform) { - $this->deferreds[] = $deferred = new Deferred; - $promise = $deferred->promise(); + private function send(string $message, ?callable $transform = null): Future + { + return async(function () use ($message, $transform) { + $this->deferreds[] = $deferred = new DeferredFuture(); + $promise = $deferred->getFuture(); - yield $this->connection->send($message); - $response = yield $promise; + $this->connection->send($message)->await(); + $response = $promise->await(); return $transform ? $transform($response) : $response; }); } - public function use(string $tube) { + public function use(string $tube): Future + { return $this->send("use " . $tube . "\r\n", function () use ($tube) { $this->tube = $tube; return null; }); } - public function pause(string $tube, int $delay): Promise { + public function pause(string $tube, int $delay): Future + { $payload = "pause-tube $tube $delay\r\n"; return $this->send($payload, function (array $response) use ($tube) { - list($type) = $response; + [$type] = $response; switch ($type) { case "PAUSED": @@ -99,16 +104,17 @@ public function pause(string $tube, int $delay): Promise { }); } - public function put(string $payload, int $timeout = 60, int $delay = 0, $priority = 0): Promise { + public function put(string $payload, int $timeout = 60, int $delay = 0, $priority = 0): Future + { $payload = "put $priority $delay $timeout " . strlen($payload) . "\r\n$payload\r\n"; return $this->send($payload, function (array $response): int { - list($type) = $response; + [$type] = $response; switch ($type) { case "INSERTED": case "BURIED": - return (int) $response[1]; + return (int)$response[1]; case "EXPECTED_CRLF": throw new ExpectedCrlfException; @@ -125,11 +131,12 @@ public function put(string $payload, int $timeout = 60, int $delay = 0, $priorit }); } - public function reserve(int $timeout = null): Promise { + public function reserve(?int $timeout = null): Future + { $payload = $timeout === null ? "reserve\r\n" : "reserve-with-timeout $timeout\r\n"; return $this->send($payload, function (array $response): array { - list($type) = $response; + [$type] = $response; switch ($type) { case "DEADLINE_SOON": @@ -147,11 +154,12 @@ public function reserve(int $timeout = null): Promise { }); } - public function delete(int $id): Promise { + public function delete(int $id): Future + { $payload = "delete $id\r\n"; - return $this->send($payload, function (array $response): int { - list($type) = $response; + return $this->send($payload, function (array $response): bool { + [$type] = $response; switch ($type) { case "DELETED": @@ -166,11 +174,12 @@ public function delete(int $id): Promise { }); } - public function release(int $id, int $delay = 0, int $priority = 0): Promise { + public function release(int $id, int $delay = 0, int $priority = 0): Future + { $payload = "release $id $priority $delay\r\n"; return $this->send($payload, function (array $response): string { - list($type) = $response; + [$type] = $response; switch ($type) { case "BURIED": @@ -184,11 +193,12 @@ public function release(int $id, int $delay = 0, int $priority = 0): Promise { }); } - public function bury(int $id, int $priority = 0): Promise { + public function bury(int $id, int $priority = 0): Future + { $payload = "bury $id $priority\r\n"; - return $this->send($payload, function (array $response): int { - list($type) = $response; + return $this->send($payload, function (array $response): bool { + [$type] = $response; switch ($type) { case "BURIED": @@ -203,11 +213,12 @@ public function bury(int $id, int $priority = 0): Promise { }); } - public function kickJob(int $id): Promise { + public function kickJob(int $id): Future + { $payload = "kick-job $id\r\n"; return $this->send($payload, function (array $response): bool { - list($type) = $response; + [$type] = $response; switch ($type) { case "KICKED": @@ -222,15 +233,16 @@ public function kickJob(int $id): Promise { }); } - public function kick(int $count): Promise { + public function kick(int $count): Future + { $payload = "kick $count\r\n"; return $this->send($payload, function (array $response): int { - list($type) = $response; + [$type] = $response; switch ($type) { case "KICKED": - return (int) $response[1]; + return (int)$response[1]; default: throw new BeanstalkException("Unknown response: $type"); @@ -238,11 +250,12 @@ public function kick(int $count): Promise { }); } - public function touch(int $id): Promise { + public function touch(int $id): Future + { $payload = "touch $id\r\n"; - return $this->send($payload, function (array $response): int { - list($type) = $response; + return $this->send($payload, function (array $response): bool { + [$type] = $response; switch ($type) { case "TOUCHED": @@ -257,7 +270,8 @@ public function touch(int $id): Promise { }); } - public function watch(string $tube): Promise { + public function watch(string $tube): Future + { $payload = "watch $tube\r\n"; return $this->send($payload, function (array $response): int { @@ -265,19 +279,20 @@ public function watch(string $tube): Promise { throw new BeanstalkException("Unknown response: " . $response[0]); } - return (int) $response[1]; + return (int)$response[1]; }); } - public function ignore(string $tube): Promise { + public function ignore(string $tube): Future + { $payload = "ignore $tube\r\n"; return $this->send($payload, function (array $response): int { - list($type) = $response; + [$type] = $response; switch ($type) { case "WATCHING": - return (int) $response[1]; + return (int)$response[1]; case "NOT_IGNORED": throw new NotIgnoredException; @@ -288,15 +303,17 @@ public function ignore(string $tube): Promise { }); } - public function quit() { + public function quit(): void + { $this->send("quit\r\n"); } - public function getJobStats(int $id): Promise { + public function getJobStats(int $id): Future + { $payload = "stats-job $id\r\n"; return $this->send($payload, function (array $response) use ($id): Job { - list($type) = $response; + [$type] = $response; switch ($type) { case "OK": @@ -311,11 +328,12 @@ public function getJobStats(int $id): Promise { }); } - public function getTubeStats(string $tube): Promise { + public function getTubeStats(string $tube): Future + { $payload = "stats-tube $tube\r\n"; return $this->send($payload, function (array $response) use ($tube): Tube { - list($type) = $response; + [$type] = $response; switch ($type) { case "OK": @@ -330,7 +348,8 @@ public function getTubeStats(string $tube): Promise { }); } - public function getSystemStats(): Promise { + public function getSystemStats(): Future + { $payload = "stats\r\n"; return $this->send($payload, function (array $response): System { @@ -342,11 +361,12 @@ public function getSystemStats(): Promise { }); } - public function listTubes(): Promise { + public function listTubes(): Future + { $payload = "list-tubes\r\n"; return $this->send($payload, function (array $response): array { - list($type) = $response; + [$type] = $response; switch ($type) { case "OK": @@ -358,11 +378,12 @@ public function listTubes(): Promise { }); } - public function listWatchedTubes(): Promise { + public function listWatchedTubes(): Future + { $payload = "list-tubes-watched\r\n"; return $this->send($payload, function (array $response): array { - list($type) = $response; + [$type] = $response; switch ($type) { case "OK": @@ -374,11 +395,12 @@ public function listWatchedTubes(): Promise { }); } - public function getUsedTube(): Promise { + public function getUsedTube(): Future + { $payload = "list-tube-used\r\n"; return $this->send($payload, function (array $response): string { - list($type) = $response; + [$type] = $response; switch ($type) { case "USING": @@ -390,11 +412,12 @@ public function getUsedTube(): Promise { }); } - public function peek(int $id): Promise { + public function peek(int $id): Future + { $payload = "peek $id\r\n"; return $this->send($payload, function (array $response) use ($id): string { - list($type) = $response; + [$type] = $response; switch ($type) { case "FOUND": @@ -409,28 +432,35 @@ public function peek(int $id): Promise { }); } - public function peekReady(): Promise { - return $this->peekInState('ready'); + public function peekReady(bool $peekId = false): Future + { + return $this->peekInState('ready', $peekId); } - public function peekDelayed(): Promise { - return $this->peekInState('delayed'); + public function peekDelayed(bool $peekId = false): Future + { + return $this->peekInState('delayed', $peekId); } - public function peekBuried(): Promise { - return $this->peekInState('buried'); + public function peekBuried(bool $peekId = false): Future + { + return $this->peekInState('buried', $peekId); } - private function peekInState(string $state): Promise { + private function peekInState(string $state, bool $peekId = false): Future + { $payload = "peek-$state\r\n"; return $this->send( $payload, - function (array $response) use ($state): string { - list($type) = $response; + function (array $response) use ($state, $peekId): string { + [$type] = $response; switch ($type) { case "FOUND": + if ($peekId) { + return $response[1]; + } return $response[2]; case "NOT_FOUND": @@ -443,12 +473,12 @@ function (array $response) use ($state): string { ); } - private function failAllDeferreds(Throwable $error) { - // Fail any outstanding promises + private function failAllDeferreds(Throwable $error): void + { while ($this->deferreds) { - /** @var Deferred $deferred */ + /** @var DeferredFuture $deferred */ $deferred = array_shift($this->deferreds); - $deferred->fail($error); + $deferred->error($error); } } } diff --git a/src/Connection.php b/src/Connection.php index ee4655c..361fb1b 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -2,41 +2,44 @@ namespace Amp\Beanstalk; -use function Amp\asyncCall; -use function Amp\call; -use Amp\Deferred; -use function Amp\Socket\connect; +use Amp\DeferredFuture; +use Amp\Future; use Amp\Socket\ConnectContext; +use Amp\Socket\ConnectException; use Amp\Socket\Socket; -use Amp\Success; use Amp\Uri\Uri; +use Revolt\EventLoop; -class Connection { - /** @var Deferred */ - private $connectPromisor; +use function Amp\Socket\socketConnector; + +class Connection +{ + /** @var DeferredFuture|null */ + private ?DeferredFuture $connectPromisor = null; /** @var Parser */ - private $parser; + private Parser $parser; /** @var int */ - private $timeout = 5000; + private int $timeout = 5000; - /** @var Socket */ - private $socket; + /** @var Socket|null */ + private ?Socket $socket = null; /** @var string */ - private $uri; + private string $uri; /** @var callable[][] */ - private $handlers; + private array $handlers; - public function __construct(string $uri) { + public function __construct(string $uri) + { $this->applyUri($uri); $this->handlers = [ - "connect" => [], + "connect" => [], "response" => [], - "error" => [], - "close" => [], + "error" => [], + "close" => [], ]; $this->parser = new Parser(function ($response) { @@ -50,15 +53,17 @@ public function __construct(string $uri) { }); } - private function applyUri($uri) { + private function applyUri(string $uri): void + { $uri = new Uri($uri); - $this->timeout = (int) ($uri->getQueryParameter("timeout") ?? $this->timeout); + $this->timeout = (int)($uri->getQueryParameter("timeout") ?? $this->timeout); $this->uri = $uri->getScheme() . "://" . $uri->getHost() . ":" . $uri->getPort(); } - public function addEventHandler($events, callable $callback) { - $events = (array) $events; + public function addEventHandler($events, callable $callback): void + { + $events = (array)$events; foreach ($events as $event) { if (!isset($this->handlers[$event])) { @@ -69,66 +74,63 @@ public function addEventHandler($events, callable $callback) { } } - public function send(string $payload) { - return call(function () use ($payload) { - yield $this->connect(); - yield $this->socket->write($payload); + public function send(string $payload): Future + { + return \Amp\async(function () use ($payload) { + $this->connect()->await(); + $this->socket->write($payload); }); } - private function connect() { - // If we're in the process of connecting already return that same promise + private function connect(): Future + { if ($this->connectPromisor) { - return $this->connectPromisor->promise(); + return $this->connectPromisor->getFuture(); } - // If a read watcher exists we know we're already connected if ($this->socket) { - return new Success; + return Future::complete(null); } - $this->connectPromisor = new Deferred; - $socketPromise = connect($this->uri, (new ConnectContext)->withConnectTimeout($this->timeout)); - - $socketPromise->onResolve(function ($error, $socket) { - $connectPromisor = $this->connectPromisor; - $this->connectPromisor = null; - - if ($error) { - $connectPromisor->fail(new ConnectException( - "Connection attempt failed", - $code = 0, - $error - )); - - return; - } - - $this->socket = $socket; + $this->connectPromisor = new DeferredFuture(); + $connector = socketConnector(); - foreach ($this->handlers["connect"] as $handler) { - $pipelinedCommand = $handler(); + EventLoop::queue(function () use ($connector) { + try { + $this->socket = $connector->connect($this->uri, (new ConnectContext())->withConnectTimeout($this->timeout)); + foreach ($this->handlers["connect"] as $handler) { + $pipelinedCommand = $handler(); - if (!empty($pipelinedCommand)) { - $this->socket->write($pipelinedCommand); - } - } - - asyncCall(function () { - while (null !== $chunk = yield $this->socket->read()) { - $this->parser->send($chunk); + if (!empty($pipelinedCommand)) { + $this->socket->write($pipelinedCommand); + } } - $this->close(); - }); - - $connectPromisor->resolve(); + EventLoop::queue(function () { + while (null !== $chunk = $this->socket->read()) { + $this->parser->send($chunk); + } + $this->close(); + }); + + $this->connectPromisor->complete(); + } catch (\Throwable $e) { + $this->connectPromisor->error( + new ConnectException( + "Connection attempt failed", + 0, + $e + ) + ); + $this->connectPromisor = null; + } }); - return $this->connectPromisor->promise(); + return $this->connectPromisor->getFuture(); } - private function onError(\Throwable $exception) { + private function onError(\Throwable $exception): void + { foreach ($this->handlers["error"] as $handler) { $handler($exception); } @@ -136,7 +138,8 @@ private function onError(\Throwable $exception) { $this->close(); } - public function close() { + public function close(): void + { $this->parser->reset(); if ($this->socket) { @@ -149,7 +152,8 @@ public function close() { } } - public function __destruct() { + public function __destruct() + { $this->close(); } } diff --git a/src/Stats/Job.php b/src/Stats/Job.php index 541ac7c..ce3afce 100644 --- a/src/Stats/Job.php +++ b/src/Stats/Job.php @@ -2,31 +2,30 @@ namespace Amp\Beanstalk\Stats; -use Amp\Struct; - -class Job { - use Struct; +class Job +{ const STATE_READY = "ready"; const STATE_DELAYED = "delayed"; const STATE_RESERVED = "reserved"; const STATE_BURIED = "buried"; - public function __construct(array $struct) { - $this->id = (int) $struct["id"]; + public function __construct(array $struct) + { + $this->id = (int)$struct["id"]; $this->tube = $struct["tube"]; $this->state = $struct["state"]; - $this->priority = (int) $struct["pri"]; - $this->age = (int) $struct["age"]; - $this->delay = (int) $struct["delay"]; - $this->ttr = (int) $struct["ttr"]; - $this->timeLeft = (int) $struct["time-left"]; + $this->priority = (int)$struct["pri"]; + $this->age = (int)$struct["age"]; + $this->delay = (int)$struct["delay"]; + $this->ttr = (int)$struct["ttr"]; + $this->timeLeft = (int)$struct["time-left"]; $this->file = $struct["file"]; - $this->reserves = (int) $struct["reserves"]; - $this->timeouts = (int) $struct["timeouts"]; - $this->releases = (int) $struct["releases"]; - $this->buries = (int) $struct["buries"]; - $this->kicks = (int) $struct["kicks"]; + $this->reserves = (int)$struct["reserves"]; + $this->timeouts = (int)$struct["timeouts"]; + $this->releases = (int)$struct["releases"]; + $this->buries = (int)$struct["buries"]; + $this->kicks = (int)$struct["kicks"]; } public $id; diff --git a/src/Stats/System.php b/src/Stats/System.php index 49067b2..8ccbb7b 100644 --- a/src/Stats/System.php +++ b/src/Stats/System.php @@ -2,56 +2,55 @@ namespace Amp\Beanstalk\Stats; -use Amp\Struct; +class System +{ -class System { - use Struct; - - public function __construct(array $struct) { - $this->currentJobsUrgent = (int) $struct["current-jobs-urgent"]; - $this->currentJobsReady = (int) $struct["current-jobs-ready"]; - $this->currentJobsReserved = (int) $struct["current-jobs-reserved"]; - $this->currentJobsDelayed = (int) $struct["current-jobs-delayed"]; - $this->currentJobsBuried = (int) $struct["current-jobs-buried"]; - $this->cmdPut = (int) $struct["cmd-put"]; - $this->cmdPeek = (int) $struct["cmd-peek"]; - $this->cmdPeekReady = (int) $struct["cmd-peek-ready"]; - $this->cmdPeekDelayed = (int) $struct["cmd-peek-delayed"]; - $this->cmdPeekBuried = (int) $struct["cmd-peek-buried"]; - $this->cmdReserve = (int) $struct["cmd-reserve"]; - $this->cmdUse = (int) $struct["cmd-use"]; - $this->cmdWatch = (int) $struct["cmd-watch"]; - $this->cmdIgnore = (int) $struct["cmd-ignore"]; - $this->cmdDelete = (int) $struct["cmd-delete"]; - $this->cmdRelease = (int) $struct["cmd-release"]; - $this->cmdBury = (int) $struct["cmd-bury"]; - $this->cmdKick = (int) $struct["cmd-kick"]; - $this->cmdStats = (int) $struct["cmd-stats"]; - $this->cmdStatsJob = (int) $struct["cmd-stats-job"]; - $this->cmdStatsTube = (int) $struct["cmd-stats-tube"]; - $this->cmdListTubes = (int) $struct["cmd-list-tubes"]; - $this->cmdListTubeUsed = (int) $struct["cmd-list-tube-used"]; - $this->cmdListTubesWatched = (int) $struct["cmd-list-tubes-watched"]; - $this->cmdPauseTube = (int) $struct["cmd-pause-tube"]; - $this->jobTimeouts = (int) $struct["job-timeouts"]; - $this->totalJobs = (int) $struct["total-jobs"]; - $this->maxJobSize = (int) $struct["max-job-size"]; - $this->currentTubes = (int) $struct["current-tubes"]; - $this->currentConnections = (int) $struct["current-connections"]; - $this->currentProducers = (int) $struct["current-producers"]; - $this->currentWorkers = (int) $struct["current-workers"]; - $this->currentWaiting = (int) $struct["current-waiting"]; - $this->totalConnections = (int) $struct["total-connections"]; - $this->pid = (int) $struct["pid"]; + public function __construct(array $struct) + { + $this->currentJobsUrgent = (int)$struct["current-jobs-urgent"]; + $this->currentJobsReady = (int)$struct["current-jobs-ready"]; + $this->currentJobsReserved = (int)$struct["current-jobs-reserved"]; + $this->currentJobsDelayed = (int)$struct["current-jobs-delayed"]; + $this->currentJobsBuried = (int)$struct["current-jobs-buried"]; + $this->cmdPut = (int)$struct["cmd-put"]; + $this->cmdPeek = (int)$struct["cmd-peek"]; + $this->cmdPeekReady = (int)$struct["cmd-peek-ready"]; + $this->cmdPeekDelayed = (int)$struct["cmd-peek-delayed"]; + $this->cmdPeekBuried = (int)$struct["cmd-peek-buried"]; + $this->cmdReserve = (int)$struct["cmd-reserve"]; + $this->cmdUse = (int)$struct["cmd-use"]; + $this->cmdWatch = (int)$struct["cmd-watch"]; + $this->cmdIgnore = (int)$struct["cmd-ignore"]; + $this->cmdDelete = (int)$struct["cmd-delete"]; + $this->cmdRelease = (int)$struct["cmd-release"]; + $this->cmdBury = (int)$struct["cmd-bury"]; + $this->cmdKick = (int)$struct["cmd-kick"]; + $this->cmdStats = (int)$struct["cmd-stats"]; + $this->cmdStatsJob = (int)$struct["cmd-stats-job"]; + $this->cmdStatsTube = (int)$struct["cmd-stats-tube"]; + $this->cmdListTubes = (int)$struct["cmd-list-tubes"]; + $this->cmdListTubeUsed = (int)$struct["cmd-list-tube-used"]; + $this->cmdListTubesWatched = (int)$struct["cmd-list-tubes-watched"]; + $this->cmdPauseTube = (int)$struct["cmd-pause-tube"]; + $this->jobTimeouts = (int)$struct["job-timeouts"]; + $this->totalJobs = (int)$struct["total-jobs"]; + $this->maxJobSize = (int)$struct["max-job-size"]; + $this->currentTubes = (int)$struct["current-tubes"]; + $this->currentConnections = (int)$struct["current-connections"]; + $this->currentProducers = (int)$struct["current-producers"]; + $this->currentWorkers = (int)$struct["current-workers"]; + $this->currentWaiting = (int)$struct["current-waiting"]; + $this->totalConnections = (int)$struct["total-connections"]; + $this->pid = (int)$struct["pid"]; $this->version = $struct["version"]; - $this->rusageUtime = (float) $struct["rusage-utime"]; - $this->rusageStime = (float) $struct["rusage-stime"]; - $this->uptime = (int) $struct["uptime"]; - $this->binlogOldestIndex = (int) $struct["binlog-oldest-index"]; - $this->binlogCurrentIndex = (int) $struct["binlog-current-index"]; - $this->binlogMaxSize = (int) $struct["binlog-max-size"]; - $this->binlogRecordsWritten = (int) $struct["binlog-records-written"]; - $this->binlogRecordsMigrated = (int) $struct["binlog-records-migrated"]; + $this->rusageUtime = (float)$struct["rusage-utime"]; + $this->rusageStime = (float)$struct["rusage-stime"]; + $this->uptime = (int)$struct["uptime"]; + $this->binlogOldestIndex = (int)$struct["binlog-oldest-index"]; + $this->binlogCurrentIndex = (int)$struct["binlog-current-index"]; + $this->binlogMaxSize = (int)$struct["binlog-max-size"]; + $this->binlogRecordsWritten = (int)$struct["binlog-records-written"]; + $this->binlogRecordsMigrated = (int)$struct["binlog-records-migrated"]; $this->id = $struct["id"]; $this->hostname = $struct["hostname"]; } diff --git a/src/Stats/Tube.php b/src/Stats/Tube.php index f24b000..7a458e6 100644 --- a/src/Stats/Tube.php +++ b/src/Stats/Tube.php @@ -2,26 +2,25 @@ namespace Amp\Beanstalk\Stats; -use Amp\Struct; +class Tube +{ -class Tube { - use Struct; - - public function __construct(array $struct) { + public function __construct(array $struct) + { $this->name = $struct["name"]; - $this->currentJobsUrgent = (int) $struct["current-jobs-urgent"]; - $this->currentJobsReady = (int) $struct["current-jobs-ready"]; - $this->currentJobsReserved = (int) $struct["current-jobs-reserved"]; - $this->currentJobsDelayed = (int) $struct["current-jobs-delayed"]; - $this->currentJobsBuried = (int) $struct["current-jobs-buried"]; - $this->totalJobs = (int) $struct["total-jobs"]; - $this->currentUsing = (int) $struct["current-using"]; - $this->currentWaiting = (int) $struct["current-waiting"]; - $this->currentWatching = (int) $struct["current-watching"]; - $this->pause = (int) $struct["pause"]; - $this->cmdDelete = (int) $struct["cmd-delete"]; - $this->cmdPauseTube = (int) $struct["cmd-pause-tube"]; - $this->pauseTimeLeft = (int) $struct["pause-time-left"]; + $this->currentJobsUrgent = (int)$struct["current-jobs-urgent"]; + $this->currentJobsReady = (int)$struct["current-jobs-ready"]; + $this->currentJobsReserved = (int)$struct["current-jobs-reserved"]; + $this->currentJobsDelayed = (int)$struct["current-jobs-delayed"]; + $this->currentJobsBuried = (int)$struct["current-jobs-buried"]; + $this->totalJobs = (int)$struct["total-jobs"]; + $this->currentUsing = (int)$struct["current-using"]; + $this->currentWaiting = (int)$struct["current-waiting"]; + $this->currentWatching = (int)$struct["current-watching"]; + $this->pause = (int)$struct["pause"]; + $this->cmdDelete = (int)$struct["cmd-delete"]; + $this->cmdPauseTube = (int)$struct["cmd-pause-tube"]; + $this->pauseTimeLeft = (int)$struct["pause-time-left"]; } public $name; diff --git a/test/BeanstalkClientConnectionClosedTest.php b/test/BeanstalkClientConnectionClosedTest.php index 069468d..b7f659e 100644 --- a/test/BeanstalkClientConnectionClosedTest.php +++ b/test/BeanstalkClientConnectionClosedTest.php @@ -4,26 +4,37 @@ use Amp\Beanstalk\BeanstalkClient; use Amp\Beanstalk\ConnectionClosedException; -use function Amp\call; -use Amp\Delayed; +use Amp\DeferredFuture; +use Amp\Future; +use Amp\Socket\ConnectContext; + + use Amp\PHPUnit\AsyncTestCase; -use function Amp\Promise\all; -use Amp\Socket\Server; + +use Amp\Socket\Socket; use Amp\Socket\SocketException; +use function Amp\async; +use function Amp\delay; +use function Amp\Future\awaitAll; +use function Amp\Socket\connect; +use function Amp\Socket\listen; + class BeanstalkClientConnectionClosedTest extends AsyncTestCase { - /** @var Server */ + /** @var Socket */ private $server; /** * @throws SocketException */ - public function setUp() { + public function setUp(): void + { parent::setUp(); - $this->server = Server::listen("tcp://127.0.0.1:0"); + $this->server = listen("tcp://127.0.0.1:0", ); } - public function tearDown() { + public function tearDown(): void + { parent::tearDown(); $this->server->close(); } @@ -34,17 +45,17 @@ public function tearDown() { * @param $reserveTimeout int|null Seconds * @param $connectionCloseTimeout int Milliseconds * @param $testFailTimeout int Milliseconds - * @return \Generator */ - public function testReserve($reserveTimeout, $connectionCloseTimeout, $testFailTimeout) { - $beanstalk = new BeanstalkClient("tcp://". $this->server->getAddress()); - $connectionClosePromise = call(function ($connectionCloseTimeout) { - yield new Delayed($connectionCloseTimeout); + public function testReserve(?int $reserveTimeout, int $connectionCloseTimeout, int $testFailTimeout) { + $beanstalk = new BeanstalkClient("tcp://". $this->server->getAddress()->toString()); + + $connectionClosePromise = async(function ($connectionCloseTimeout) { + delay($connectionCloseTimeout); $this->server->close(); - }, $connectionCloseTimeout); + },$connectionCloseTimeout); $this->setTimeout($testFailTimeout); $this->expectException(ConnectionClosedException::class); - yield all([ + Future\await([ $beanstalk->reserve($reserveTimeout), $connectionClosePromise ]); @@ -52,8 +63,8 @@ public function testReserve($reserveTimeout, $connectionCloseTimeout, $testFailT public function dataProviderReserve(): array { return [ - "no timeout" => [null, 500, 600], - "one second timeout" => [1, 900, 1100], + "no timeout" => [null, 2, 3], + "one second timeout" => [1, 2, 5], ]; } } diff --git a/test/IntegrationTest.php b/test/IntegrationTest.php index 3d63f0b..9e6fc82 100644 --- a/test/IntegrationTest.php +++ b/test/IntegrationTest.php @@ -3,128 +3,150 @@ namespace Amp\Beanstalk\Test; use Amp\Beanstalk\BeanstalkClient; +use Amp\Beanstalk\NotFoundException; use Amp\Beanstalk\Stats\Job; use Amp\Beanstalk\Stats\System; -use function Amp\call; -use function Amp\Promise\wait; + use PHPUnit\Framework\TestCase; -class IntegrationTest extends TestCase { +use function getenv; + +class IntegrationTest extends TestCase +{ /** @var BeanstalkClient */ - private $beanstalk; + private $beanstalk = null; + + private $tubeName = 'tests'; - public function setUp() { - if (!\getenv("AMP_TEST_BEANSTALK_INTEGRATION") && !\getenv("TRAVIS")) { - $this->markTestSkipped("You need to set AMP_TEST_BEANSTALK_INTEGRATION=1 in order to run the integration tests."); + public function setUp(): void + { + if (!getenv("AMP_TEST_BEANSTALK_INTEGRATION") && !getenv("TRAVIS")) { + $this->markTestSkipped("You need to set AMP_TEST_BEANSTALK_INTEGRATION=1 in order to run the integration tests."); } + $this->beanstalk?->quit(); + $this->beanstalk = new BeanstalkClient("tcp://127.0.0.1:11300"); + $this->beanstalk->use($this->tubeName); + $this->beanstalk->watch($this->tubeName); + + /** @var System $stats */ - wait(call(function () { - /** @var System $stats */ - $stats = yield $this->beanstalk->getSystemStats(); - for ($jobId = 1; $jobId <= $stats->totalJobs; $jobId++) { - yield $this->beanstalk->delete($jobId); - } - })); + try { + do { + $jobId = $this->beanstalk->peekReady(true)->await(); + $this->beanstalk->delete($jobId)->await(); + } while (true); + } catch (NotFoundException) { + } + try { + do { + $jobId = $this->beanstalk->peekDelayed(true)->await(); + $this->beanstalk->delete($jobId)->await(); + } while (true); + } catch (NotFoundException) { + } + + try { + do { + $jobId = $this->beanstalk->peekBuried(true)->await(); + $this->beanstalk->delete($jobId)->await(); + } while (true); + } catch (NotFoundException) { + } } - public function testPut() { - wait(call(function () { - /** @var System $statsBefore */ - $statsBefore = yield $this->beanstalk->getSystemStats(); + public function testPut() + { + /** @var System $statsBefore */ + $statsBefore = $this->beanstalk->getSystemStats()->await(); - $jobId = yield $this->beanstalk->put("hi"); - $this->assertInternalType("int", $jobId); + $jobId = $this->beanstalk->put("hi")->await(); - /** @var Job $jobStats */ - $jobStats = yield $this->beanstalk->getJobStats($jobId); + $this->assertIsInt($jobId); - $this->assertSame($jobId, $jobStats->id); - $this->assertSame(0, $jobStats->priority); - $this->assertSame(0, $jobStats->delay); + /** @var Job $jobStats */ + $jobStats = $this->beanstalk->getJobStats($jobId)->await(); - /** @var System $statsAfter */ - $statsAfter = yield $this->beanstalk->getSystemStats(); + $this->assertSame($jobId, $jobStats->id); + $this->assertSame(0, $jobStats->priority); + $this->assertSame(0, $jobStats->delay); - $this->assertSame($statsBefore->cmdPut + 1, $statsAfter->cmdPut); - })); + /** @var System $statsAfter */ + $statsAfter = $this->beanstalk->getSystemStats()->await(); + + $this->assertSame($statsBefore->cmdPut + 1, $statsAfter->cmdPut); } - public function testPeek() { - wait(call(function () { - $jobId = yield $this->beanstalk->put('I am ready'); - $this->assertInternalType("int", $jobId); + public function testPeek() + { + $jobId = $this->beanstalk->put('I am ready')->await(); + $this->assertIsInt($jobId); - $peekedJob = yield $this->beanstalk->peek($jobId); - $this->assertEquals('I am ready', $peekedJob); + $peekedJob = $this->beanstalk->peek($jobId)->await(); + $this->assertEquals('I am ready', $peekedJob); - $peekedJob = yield $this->beanstalk->peekReady(); - $this->assertEquals('I am ready', $peekedJob); + $peekedJob = $this->beanstalk->peekReady()->await(); + $this->assertEquals('I am ready', $peekedJob); - list($jobId) = yield $this->beanstalk->reserve(); - $buried = yield $this->beanstalk->bury($jobId); - $this->assertEquals(1, $buried); - $peekedJob = yield $this->beanstalk->peekBuried(); - $this->assertEquals('I am ready', $peekedJob); + [$jobId] = $this->beanstalk->reserve()->await(); + $buried = $this->beanstalk->bury($jobId)->await(); + $this->assertEquals(1, $buried); + $peekedJob = $this->beanstalk->peekBuried()->await(); + $this->assertEquals('I am ready', $peekedJob); - $jobId = yield $this->beanstalk->put('I am delayed', 60, 60); - $peekedJob = yield $this->beanstalk->peekDelayed(); - $this->assertEquals('I am delayed', $peekedJob); - })); + $jobId = $this->beanstalk->put('I am delayed', 60, 60)->await(); + $peekedJob = $this->beanstalk->peekDelayed()->await(); + $this->assertEquals('I am delayed', $peekedJob); } - public function testKickJob() { - wait(call(function () { - $jobId = yield $this->beanstalk->put("hi"); - $this->assertInternalType("int", $jobId); + public function testKickJob() + { + $jobId = $this->beanstalk->put("hi")->await(); + $this->assertIsInt($jobId); - $kicked = yield $this->beanstalk->kickJob($jobId); - $this->assertFalse($kicked); + $kicked = $this->beanstalk->kickJob($jobId)->await(); + $this->assertFalse($kicked); - list($jobId, ) = yield $this->beanstalk->reserve(); - $buried = yield $this->beanstalk->bury($jobId); - $this->assertEquals(1, $buried); - /** @var Job $jobStats */ - $jobStats = yield $this->beanstalk->getJobStats($jobId); - $this->assertEquals('buried', $jobStats->state); + [$jobId,] = $this->beanstalk->reserve()->await(); + $buried = $this->beanstalk->bury($jobId)->await(); + $this->assertEquals(1, $buried); + /** @var Job $jobStats */ + $jobStats = $this->beanstalk->getJobStats($jobId)->await(); + $this->assertEquals('buried', $jobStats->state); - $kicked = yield $this->beanstalk->kickJob($jobId); - $this->assertTrue($kicked); - })); + $kicked = $this->beanstalk->kickJob($jobId)->await(); + $this->assertTrue($kicked); } - public function testKick() { - wait(call(function () { - for ($i = 0; $i < 10; $i++) { - yield $this->beanstalk->put("Job $i"); - } - for ($i = 0; $i < 8; $i++) { - list($jobId, ) = yield $this->beanstalk->reserve(); - $buried = yield $this->beanstalk->bury($jobId); - $this->assertEquals(1, $buried); - } - - $kicked = yield $this->beanstalk->kick(4); - $this->assertEquals(4, $kicked); - - $kicked = yield $this->beanstalk->kick(10); - $this->assertEquals(4, $kicked); - - $kicked = yield $this->beanstalk->kick(1); - $this->assertEquals(0, $kicked); - })); + public function testKick() + { + for ($i = 0; $i < 10; $i++) { + $this->beanstalk->put("Job $i")->await(); + } + for ($i = 0; $i < 8; $i++) { + [$jobId,] = $this->beanstalk->reserve()->await(); + $buried = $this->beanstalk->bury($jobId)->await(); + $this->assertEquals(1, $buried); + } + + $kicked = $this->beanstalk->kick(4)->await(); + $this->assertEquals(4, $kicked); + + $kicked = $this->beanstalk->kick(10)->await(); + $this->assertEquals(4, $kicked); + + $kicked = $this->beanstalk->kick(1)->await(); + $this->assertEquals(0, $kicked); } - public function testReservedJobShouldHaveTheSamePayloadAsThePutPayload() { - wait(call(function () { - $jobId = yield $this->beanstalk->put(str_repeat('*', 65535)); + public function testReservedJobShouldHaveTheSamePayloadAsThePutPayload() + { + $jobId = $this->beanstalk->put(str_repeat('*', 65535))->await(); - yield $this->beanstalk->watch('default'); - list($reservedJobId, $reservedJobPayload) = yield $this->beanstalk->reserve(); + [$reservedJobId, $reservedJobPayload] = $this->beanstalk->reserve()->await(); - $this->assertSame($jobId, $reservedJobId); - $this->assertSame(65535, strlen($reservedJobPayload)); - })); + $this->assertSame($jobId, $reservedJobId); + $this->assertSame(65535, strlen($reservedJobPayload)); } } diff --git a/test/ParserTest.php b/test/ParserTest.php index ebf5d24..c63f35a 100644 --- a/test/ParserTest.php +++ b/test/ParserTest.php @@ -10,7 +10,8 @@ class ParserTest extends TestCase { protected $parsedElements; - public function setUp() { + public function setUp(): void + { $this->parserToTest = new Parser(function ($result) { $this->parsedElements = $result; });