From 4cbf44808a6915986854ba7d89be96312e6f7448 Mon Sep 17 00:00:00 2001 From: szolotuhin Date: Mon, 15 Aug 2016 16:53:22 +0300 Subject: [PATCH 1/3] create rabbitMQ connection --- composer.json | 1 + src/DataFormat/Hash/RabbitMQ/Wait.php | 83 +++++++++++++++ src/Query/RabbitMQ/Get.php | 67 ++++++++++++ src/Query/RabbitMQ/Set.php | 96 +++++++++++++++++ src/Resource/RabbitMQ.php | 143 ++++++++++++++++++++++++++ 5 files changed, 390 insertions(+) create mode 100644 src/DataFormat/Hash/RabbitMQ/Wait.php create mode 100644 src/Query/RabbitMQ/Get.php create mode 100644 src/Query/RabbitMQ/Set.php create mode 100644 src/Resource/RabbitMQ.php diff --git a/composer.json b/composer.json index 0cedb6e..8bc68a0 100644 --- a/composer.json +++ b/composer.json @@ -47,6 +47,7 @@ "phpunit/phpunit": "~4.7", "fabpot/php-cs-fixer": "~2.0@dev", "couchbase/php-ext-couchbase": "dev-master", + "php-amqplib/php-amqplib": "2.*@stable", "neutron/sphinxsearch-api": "0.9.9" }, diff --git a/src/DataFormat/Hash/RabbitMQ/Wait.php b/src/DataFormat/Hash/RabbitMQ/Wait.php new file mode 100644 index 0000000..4ab016e --- /dev/null +++ b/src/DataFormat/Hash/RabbitMQ/Wait.php @@ -0,0 +1,83 @@ +channel = $channel; + $this->channel->basic_consume($this->getQueue(), '', false, false, false, false, [$this, 'setMessage']); + + return $this; + } + + public function setMessage(AMQPMessage $message) { + $this->message = $message; + + return $this; + } + + /** + * @inheritdoc + */ + public function formatData() + { + if ($this->message) { + $this->message->delivery_info['channel']->basic_ack($this->message->delivery_info['delivery_tag']); + unset($this->message); + } + + $this->channel->wait(); + + return json_decode($this->message->getBody()); + } + + /** + * @inheritdoc + */ + public function formatValue() + { + return null; + } + + /** + * @return mixed + */ + public function getQueue() + { + return $this->queue; + } + + /** + * @param mixed $queue + * @return $this + */ + public function setQueue($queue) + { + $this->queue = $queue; + + return $this; + } +} diff --git a/src/Query/RabbitMQ/Get.php b/src/Query/RabbitMQ/Get.php new file mode 100644 index 0000000..9191ee2 --- /dev/null +++ b/src/Query/RabbitMQ/Get.php @@ -0,0 +1,67 @@ +error) { + return self::STATUS_ERROR; + } + + return self::STATUS_OK; + } + + /** + * @return string JSON + * @throws \Exception + */ + public function execute() + { + try { + $this->getResource()->basic_qos(null, 1, null); + + return $this->getResource(); + } catch (\Exception $error) { + $this->error = $error; + } + + return false; + } + + /** + * @return AMQPChannel + * @throws \Exception + */ + protected function getResource() { + return parent::getResource(); + } + + public function getCountTotal(){} + public function getCount(){} + public function getLastId(){} +} \ No newline at end of file diff --git a/src/Query/RabbitMQ/Set.php b/src/Query/RabbitMQ/Set.php new file mode 100644 index 0000000..4064c2e --- /dev/null +++ b/src/Query/RabbitMQ/Set.php @@ -0,0 +1,96 @@ +error) { + return self::STATUS_ERROR; + } + + return self::STATUS_OK; + } + + public function setMessage($message) + { + $this->message = $message; + + return $this; + } + + /** + * @return boolean + */ + public function execute() + { + try { + if ($this->message === null) throw new \Exception('Не указано сообщение'); + + $message = new AMQPMessage(); + $message->setBody(json_encode($this->message)); + $this->getResource()->basic_publish($message, '', $this->getQueue()); + + return true; + } catch (\Exception $error) { + $this->error = $error; + } + + return false; + } + + /** + * @return string + */ + public function getQueue() + { + return $this->queue; + } + + /** + * @param string $queue + * @return $this + */ + public function setQueue($queue) + { + $this->queue = $queue; + + return $this; + } + + /** + * @return AMQPChannel + * @throws \Exception + */ + protected function getResource() { + return parent::getResource(); + } + + public function getCountTotal(){} + public function getCount(){} + public function getLastId(){} +} \ No newline at end of file diff --git a/src/Resource/RabbitMQ.php b/src/Resource/RabbitMQ.php new file mode 100644 index 0000000..7b91e58 --- /dev/null +++ b/src/Resource/RabbitMQ.php @@ -0,0 +1,143 @@ +host = $host; + + return $this; + } + + /** + * @param string|int $port + * @return self + */ + public function setPort($port) + { + $this->port = $port; + + return $this; + } + + /** + * @param string $user + * @return self + */ + public function setUser($user) + { + $this->user = $user; + + return $this; + } + + /** + * @param string $password + * @return self + */ + public function setPassword($password) + { + $this->password = $password; + + return $this; + } + + /** + * @return string + */ + public function getHost() + { + return $this->host; + } + + /** + * @return string|int + */ + public function getPort() + { + return $this->port; + } + + /** + * @return string + */ + public function getUser() + { + return $this->user; + } + + /** + * @return string + */ + public function getPassword() + { + return $this->password; + } + + /** + * @return AMQPChannel + */ + public function getHandle() + { + if (!$this->channel) { + $this->channel = $this->getConnect()->channel(); + } + + return $this->channel; + } + + /** + * @param string $queue + * @return $this + */ + public function queueDeclare($queue) + { + $this->getHandle()->queue_declare($queue, false, false, false, false); + + return $this; + } + + /** + * @return AMQPStreamConnection + */ + protected function getConnect() { + if (!$this->connect) { + $this->connect = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); + } + + return $this->connect; + } + + public function disconnect(){} + public function setDatabase($database){} + public function getDatabase(){} +} From 8be6c466998126bb6a00ee8667e9bc51cb77ad49 Mon Sep 17 00:00:00 2001 From: szolotuhin Date: Tue, 16 Aug 2016 11:08:02 +0300 Subject: [PATCH 2/3] code style --- src/DataFormat/Hash/RabbitMQ/Wait.php | 3 ++- src/Query/RabbitMQ/Get.php | 18 +++++++++++++----- src/Query/RabbitMQ/Set.php | 19 ++++++++++++++----- src/Resource/RabbitMQ.php | 15 +++++++++++---- 4 files changed, 40 insertions(+), 15 deletions(-) diff --git a/src/DataFormat/Hash/RabbitMQ/Wait.php b/src/DataFormat/Hash/RabbitMQ/Wait.php index 4ab016e..c039747 100644 --- a/src/DataFormat/Hash/RabbitMQ/Wait.php +++ b/src/DataFormat/Hash/RabbitMQ/Wait.php @@ -33,7 +33,8 @@ public function setData($channel) return $this; } - public function setMessage(AMQPMessage $message) { + public function setMessage(AMQPMessage $message) + { $this->message = $message; return $this; diff --git a/src/Query/RabbitMQ/Get.php b/src/Query/RabbitMQ/Get.php index 9191ee2..a2e5a5a 100644 --- a/src/Query/RabbitMQ/Get.php +++ b/src/Query/RabbitMQ/Get.php @@ -28,7 +28,8 @@ class Get extends Query /** * @return int bitmask of Imhonet\Connection\Query\IQuery::STATUS_* */ - public function getErrorCode(){ + public function getErrorCode() + { if ($this->error) { return self::STATUS_ERROR; } @@ -57,11 +58,18 @@ public function execute() * @return AMQPChannel * @throws \Exception */ - protected function getResource() { + protected function getResource() + { return parent::getResource(); } - public function getCountTotal(){} - public function getCount(){} - public function getLastId(){} + public function getCountTotal() + { + } + public function getCount() + { + } + public function getLastId() + { + } } \ No newline at end of file diff --git a/src/Query/RabbitMQ/Set.php b/src/Query/RabbitMQ/Set.php index 4064c2e..18725bb 100644 --- a/src/Query/RabbitMQ/Set.php +++ b/src/Query/RabbitMQ/Set.php @@ -49,7 +49,9 @@ public function setMessage($message) public function execute() { try { - if ($this->message === null) throw new \Exception('Не указано сообщение'); + if ($this->message === null) { + throw new \Exception('Не указано сообщение'); + } $message = new AMQPMessage(); $message->setBody(json_encode($this->message)); @@ -86,11 +88,18 @@ public function setQueue($queue) * @return AMQPChannel * @throws \Exception */ - protected function getResource() { + protected function getResource() + { return parent::getResource(); } - public function getCountTotal(){} - public function getCount(){} - public function getLastId(){} + public function getCountTotal() + { + } + public function getCount() + { + } + public function getLastId() + { + } } \ No newline at end of file diff --git a/src/Resource/RabbitMQ.php b/src/Resource/RabbitMQ.php index 7b91e58..85261aa 100644 --- a/src/Resource/RabbitMQ.php +++ b/src/Resource/RabbitMQ.php @@ -129,7 +129,8 @@ public function queueDeclare($queue) /** * @return AMQPStreamConnection */ - protected function getConnect() { + protected function getConnect() + { if (!$this->connect) { $this->connect = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); } @@ -137,7 +138,13 @@ protected function getConnect() { return $this->connect; } - public function disconnect(){} - public function setDatabase($database){} - public function getDatabase(){} + public function disconnect() + { + } + public function setDatabase($database) + { + } + public function getDatabase() + { + } } From aed77045cb8a9835d55e01d91cd0b71dd923547e Mon Sep 17 00:00:00 2001 From: szolotuhin Date: Tue, 16 Aug 2016 11:18:03 +0300 Subject: [PATCH 3/3] code style --- src/Query/RabbitMQ/Get.php | 3 +-- src/Query/RabbitMQ/Set.php | 3 +-- src/Resource/RabbitMQ.php | 1 - 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/Query/RabbitMQ/Get.php b/src/Query/RabbitMQ/Get.php index a2e5a5a..3acb47d 100644 --- a/src/Query/RabbitMQ/Get.php +++ b/src/Query/RabbitMQ/Get.php @@ -8,7 +8,6 @@ */ namespace Imhonet\Connection\Query\RabbitMQ; - use Imhonet\Connection\Query\Query; use Imhonet\Connection\Resource\RabbitMQ; use PhpAmqpLib\Channel\AMQPChannel; @@ -72,4 +71,4 @@ public function getCount() public function getLastId() { } -} \ No newline at end of file +} diff --git a/src/Query/RabbitMQ/Set.php b/src/Query/RabbitMQ/Set.php index 18725bb..5162c1d 100644 --- a/src/Query/RabbitMQ/Set.php +++ b/src/Query/RabbitMQ/Set.php @@ -8,7 +8,6 @@ namespace Imhonet\Connection\Query\RabbitMQ; - use Imhonet\Connection\Query\Query; use Imhonet\Connection\Resource\RabbitMQ; use PhpAmqpLib\Channel\AMQPChannel; @@ -102,4 +101,4 @@ public function getCount() public function getLastId() { } -} \ No newline at end of file +} diff --git a/src/Resource/RabbitMQ.php b/src/Resource/RabbitMQ.php index 85261aa..3298c60 100644 --- a/src/Resource/RabbitMQ.php +++ b/src/Resource/RabbitMQ.php @@ -8,7 +8,6 @@ namespace Imhonet\Connection\Resource; - use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPStreamConnection;