Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"phpunit/phpunit": "~4.7",
"fabpot/php-cs-fixer": "~2.0@dev",
"couchbase/php-ext-couchbase": "dev-master",
"php-amqplib/php-amqplib": "2.*@stable",
"neutron/sphinxsearch-api": "0.9.9"
},

Expand Down
84 changes: 84 additions & 0 deletions src/DataFormat/Hash/RabbitMQ/Wait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php

namespace Imhonet\Connection\DataFormat\Hash\RabbitMQ;

use Imhonet\Connection\DataFormat\IArr;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class Wait implements IArr
{
/**
* @var AMQPChannel
*/
protected $channel;
/**
* @var array
*/
protected $cache;
/**
* @var AMQPMessage
*/
protected $message;
private $queue;

/**
* @inheritdoc
*/
public function setData($channel)
{
$this->channel = $channel;
$this->channel->basic_consume($this->getQueue(), '', false, false, false, false, [$this, 'setMessage']);

return $this;
}

public function setMessage(AMQPMessage $message)
{
$this->message = $message;

return $this;
}

/**
* @inheritdoc
*/
public function formatData()
{
if ($this->message) {
$this->message->delivery_info['channel']->basic_ack($this->message->delivery_info['delivery_tag']);
unset($this->message);
}

$this->channel->wait();

return json_decode($this->message->getBody());
}

/**
* @inheritdoc
*/
public function formatValue()
{
return null;
}

/**
* @return mixed
*/
public function getQueue()
{
return $this->queue;
}

/**
* @param mixed $queue
* @return $this
*/
public function setQueue($queue)
{
$this->queue = $queue;

return $this;
}
}
74 changes: 74 additions & 0 deletions src/Query/RabbitMQ/Get.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

/**
* Created by PhpStorm.
* User: Zolotukhin Sergey
* Date: 14.06.16
* Time: 11:53
*/
namespace Imhonet\Connection\Query\RabbitMQ;

use Imhonet\Connection\Query\Query;
use Imhonet\Connection\Resource\RabbitMQ;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class Get extends Query
{
/**
* @var RabbitMQ
*/
protected $resource;
/**
* @var AMQPMessage
*/
protected $message;

/**
* @return int bitmask of Imhonet\Connection\Query\IQuery::STATUS_*
*/
public function getErrorCode()
{
if ($this->error) {
return self::STATUS_ERROR;
}

return self::STATUS_OK;
}

/**
* @return string JSON
* @throws \Exception
*/
public function execute()
{
try {
$this->getResource()->basic_qos(null, 1, null);

return $this->getResource();
} catch (\Exception $error) {
$this->error = $error;
}

return false;
}

/**
* @return AMQPChannel
* @throws \Exception
*/
protected function getResource()
{
return parent::getResource();
}

public function getCountTotal()
{
}
public function getCount()
{
}
public function getLastId()
{
}
}
104 changes: 104 additions & 0 deletions src/Query/RabbitMQ/Set.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?php
/**
* Created by PhpStorm.
* User: Zolotukhin Sergey
* Date: 15.07.16
* Time: 14:50
*/

namespace Imhonet\Connection\Query\RabbitMQ;

use Imhonet\Connection\Query\Query;
use Imhonet\Connection\Resource\RabbitMQ;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class Set extends Query
{
protected $message;
private $queue;

/**
* @var RabbitMQ
*/
protected $resource;

/**
* @return int bitmask of Imhonet\Connection\Query\IQuery::STATUS_*
*/
public function getErrorCode()
{
if ($this->error) {
return self::STATUS_ERROR;
}

return self::STATUS_OK;
}

public function setMessage($message)
{
$this->message = $message;

return $this;
}

/**
* @return boolean
*/
public function execute()
{
try {
if ($this->message === null) {
throw new \Exception('Не указано сообщение');
}

$message = new AMQPMessage();
$message->setBody(json_encode($this->message));
$this->getResource()->basic_publish($message, '', $this->getQueue());

return true;
} catch (\Exception $error) {
$this->error = $error;
}

return false;
}

/**
* @return string
*/
public function getQueue()
{
return $this->queue;
}

/**
* @param string $queue
* @return $this
*/
public function setQueue($queue)
{
$this->queue = $queue;

return $this;
}

/**
* @return AMQPChannel
* @throws \Exception
*/
protected function getResource()
{
return parent::getResource();
}

public function getCountTotal()
{
}
public function getCount()
{
}
public function getLastId()
{
}
}
Loading