diff --git a/composer.json b/composer.json index 0cedb6e..8bc68a0 100644 --- a/composer.json +++ b/composer.json @@ -47,6 +47,7 @@ "phpunit/phpunit": "~4.7", "fabpot/php-cs-fixer": "~2.0@dev", "couchbase/php-ext-couchbase": "dev-master", + "php-amqplib/php-amqplib": "2.*@stable", "neutron/sphinxsearch-api": "0.9.9" }, diff --git a/src/DataFormat/Hash/RabbitMQ/Wait.php b/src/DataFormat/Hash/RabbitMQ/Wait.php new file mode 100644 index 0000000..c039747 --- /dev/null +++ b/src/DataFormat/Hash/RabbitMQ/Wait.php @@ -0,0 +1,84 @@ +channel = $channel; + $this->channel->basic_consume($this->getQueue(), '', false, false, false, false, [$this, 'setMessage']); + + return $this; + } + + public function setMessage(AMQPMessage $message) + { + $this->message = $message; + + return $this; + } + + /** + * @inheritdoc + */ + public function formatData() + { + if ($this->message) { + $this->message->delivery_info['channel']->basic_ack($this->message->delivery_info['delivery_tag']); + unset($this->message); + } + + $this->channel->wait(); + + return json_decode($this->message->getBody()); + } + + /** + * @inheritdoc + */ + public function formatValue() + { + return null; + } + + /** + * @return mixed + */ + public function getQueue() + { + return $this->queue; + } + + /** + * @param mixed $queue + * @return $this + */ + public function setQueue($queue) + { + $this->queue = $queue; + + return $this; + } +} diff --git a/src/Query/RabbitMQ/Get.php b/src/Query/RabbitMQ/Get.php new file mode 100644 index 0000000..3acb47d --- /dev/null +++ b/src/Query/RabbitMQ/Get.php @@ -0,0 +1,74 @@ +error) { + return self::STATUS_ERROR; + } + + return self::STATUS_OK; + } + + /** + * @return string JSON + * @throws \Exception + */ + public function execute() + { + try { + $this->getResource()->basic_qos(null, 1, null); + + return $this->getResource(); + } catch (\Exception $error) { + $this->error = $error; + } + + return false; + } + + /** + * @return AMQPChannel + * @throws \Exception + */ + protected function getResource() + { + return parent::getResource(); + } + + public function getCountTotal() + { + } + public function getCount() + { + } + public function getLastId() + { + } +} diff --git a/src/Query/RabbitMQ/Set.php b/src/Query/RabbitMQ/Set.php new file mode 100644 index 0000000..5162c1d --- /dev/null +++ b/src/Query/RabbitMQ/Set.php @@ -0,0 +1,104 @@ +error) { + return self::STATUS_ERROR; + } + + return self::STATUS_OK; + } + + public function setMessage($message) + { + $this->message = $message; + + return $this; + } + + /** + * @return boolean + */ + public function execute() + { + try { + if ($this->message === null) { + throw new \Exception('Не указано сообщение'); + } + + $message = new AMQPMessage(); + $message->setBody(json_encode($this->message)); + $this->getResource()->basic_publish($message, '', $this->getQueue()); + + return true; + } catch (\Exception $error) { + $this->error = $error; + } + + return false; + } + + /** + * @return string + */ + public function getQueue() + { + return $this->queue; + } + + /** + * @param string $queue + * @return $this + */ + public function setQueue($queue) + { + $this->queue = $queue; + + return $this; + } + + /** + * @return AMQPChannel + * @throws \Exception + */ + protected function getResource() + { + return parent::getResource(); + } + + public function getCountTotal() + { + } + public function getCount() + { + } + public function getLastId() + { + } +} diff --git a/src/Resource/RabbitMQ.php b/src/Resource/RabbitMQ.php new file mode 100644 index 0000000..3298c60 --- /dev/null +++ b/src/Resource/RabbitMQ.php @@ -0,0 +1,149 @@ +host = $host; + + return $this; + } + + /** + * @param string|int $port + * @return self + */ + public function setPort($port) + { + $this->port = $port; + + return $this; + } + + /** + * @param string $user + * @return self + */ + public function setUser($user) + { + $this->user = $user; + + return $this; + } + + /** + * @param string $password + * @return self + */ + public function setPassword($password) + { + $this->password = $password; + + return $this; + } + + /** + * @return string + */ + public function getHost() + { + return $this->host; + } + + /** + * @return string|int + */ + public function getPort() + { + return $this->port; + } + + /** + * @return string + */ + public function getUser() + { + return $this->user; + } + + /** + * @return string + */ + public function getPassword() + { + return $this->password; + } + + /** + * @return AMQPChannel + */ + public function getHandle() + { + if (!$this->channel) { + $this->channel = $this->getConnect()->channel(); + } + + return $this->channel; + } + + /** + * @param string $queue + * @return $this + */ + public function queueDeclare($queue) + { + $this->getHandle()->queue_declare($queue, false, false, false, false); + + return $this; + } + + /** + * @return AMQPStreamConnection + */ + protected function getConnect() + { + if (!$this->connect) { + $this->connect = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); + } + + return $this->connect; + } + + public function disconnect() + { + } + public function setDatabase($database) + { + } + public function getDatabase() + { + } +}