diff --git a/DependencyInjection/ClankExtensionClass.php b/DependencyInjection/ClankExtensionClass.php old mode 100644 new mode 100755 index 989b06e..26ff9a5 --- a/DependencyInjection/ClankExtensionClass.php +++ b/DependencyInjection/ClankExtensionClass.php @@ -58,6 +58,11 @@ public function load(array $configs, ContainerBuilder $container) $this->setupSessionHandler($config['session_handler']); } + if (isset($config['zmq']) && $config['zmq']) + { + $this->setupZMQ($config['zmq']); + } + } private function setupWebSocketServer($config) @@ -96,6 +101,14 @@ private function setupPeriodicServices($config) $this->container->setParameter('jdare_clank.periodic_services', $config); } + private function setupZMQ($config) { + $config = array_merge(array( + 'port' => 5555, // Default ZMQ port. + 'enabled' => false + ), $config); + $this->container->setParameter('jdare_clank.zmq_configuration', $config); + } + public function getAlias() { return "clank"; diff --git a/README.md b/README.md old mode 100644 new mode 100755 index 691fb06..c86a237 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Resources * [Periodic Services](Resources/docs/PeriodicSetup.md)(functions to be run every x seconds with the IO loop.) * [Session Management](Resources/docs/SessionSetup.md) * [Clank Server Events](Resources/docs/ClankEvents.md) +* [ZMQ Integration](Resources/docs/ZMQIntegration.md) Code Cookbook -------------- diff --git a/Resources/config/services.xml b/Resources/config/services.xml old mode 100644 new mode 100755 index 284a934..37f74d3 --- a/Resources/config/services.xml +++ b/Resources/config/services.xml @@ -6,9 +6,11 @@ JDare\ClankBundle\Server\Type\WebSocketServerType + JDare\ClankBundle\Zmq\ZmqDispatcher + @@ -30,6 +32,11 @@ null + + null + 5555 + + jdare_clank.web_socket_server @@ -62,6 +69,13 @@ + + + + + + + @@ -92,6 +106,7 @@ + diff --git a/Resources/docs/ZMQIntegration.md b/Resources/docs/ZMQIntegration.md new file mode 100755 index 0000000..e84d691 --- /dev/null +++ b/Resources/docs/ZMQIntegration.md @@ -0,0 +1,138 @@ +# Integration with ZMQ + +Sometimes you would want to publish events from a controller to the Websocket server, for example a notification system. +See more of how it's integrated [here](http://socketo.me/docs/push). + + +## Server setup + +First, you should Install ZMQ, and the PHP bindings. This should be installed on your server. + +Download ZMQ: + +- http://zeromq.org/area:download + +And the php bindings: + +- http://zeromq.org/bindings:php + + + + +## Configuration + + +Add `"react/zmq": "@stable"` to the `require` section of your composer.json file, and run +`composer update react/zmq`. + + + +Add ZMQ as enabled: + +```yaml +clank: + topic: + # Example topic service: + - + name: "notification" + service: "acme.topic.notification" + # Enable ZMQ + zmq: + enabled: true + port: 5555 # This should not be the same port as the Clank(Ratchet) server. Default is 5555 +``` + +When starting the server with `clank:server` you should now see somthing like this: + +```bash +peec@dev:$ php app/console clank:server +Starting Clank +Launching Ratchet WS Server on: 0.0.0.0:8088 + +Listening to ZMQ messages on tcp://127.0.0.1:5555 +``` + + + + +## Sending messages from the controller + +From any controller, we can now send messages like this: + +```php +$this->container->get('jdare_clank.zmq.dispatcher')->send(new \JDare\ClankBundle\Zmq\ZmqMessage( + "notification", // Reference the topic service associated with this message. + ["hello" => "World"] // Some data +)); +``` + +## Subscribing to ZMQ Messages. + +```php +use JDare\ClankBundle\Topic\TopicInterface; +use JDare\ClankBundle\Zmq\ZMQMessageReciever; +use Ratchet\ConnectionInterface as Conn; +use Ratchet\Wamp\Topic; + + +class NotificationTopic implements TopicInterface, ZMQMessageReciever +{ + + + // .. other onX functions here.. + + /** + * When a ZMQ message is recieved for this Topic, this handler is called. + * @param Topic $topic The Topic instance + * @param $data + * @return mixed + */ + public function onZMQMessage(Topic $topic, $data) { + print_r($data); // Prints the data recieved in the console. Only if someone has subscribed to this channel. + $topic->broadcast($data); + } + +} +``` + +## Example simple client + +- You must include the JS files, they are in the bundles Resources folder. +- Change `ws://127.0.0.1:8088` to your clank server definition. + +```html + + + + + + + + + + +``` + + +Now open a controller where you use `jdare_clank.zmq.dispatcher` and you should see the message in the client (console.log). \ No newline at end of file diff --git a/Server/App/ClankApp.php b/Server/App/ClankApp.php old mode 100644 new mode 100755 index c213a10..a5f609a --- a/Server/App/ClankApp.php +++ b/Server/App/ClankApp.php @@ -2,6 +2,7 @@ namespace JDare\ClankBundle\Server\App; +use JDare\ClankBundle\Zmq\ZmqMessage; use Ratchet\ConnectionInterface as Conn; use Ratchet\Wamp\WampServerInterface; @@ -55,4 +56,14 @@ public function onError(Conn $conn, \Exception $e) { $this->eventDispatcher->dispatch("clank.client.error", $event); } + + public function onZMQMessage ($entry) { + $zmqMessage = json_decode($entry, true); + + if (isset($zmqMessage['name']) && isset($zmqMessage['data'])) { + + $this->topicHandler->onZMQMessage(new ZmqMessage($zmqMessage['name'], $zmqMessage['data'])); + } + } + } \ No newline at end of file diff --git a/Server/App/Handler/TopicHandler.php b/Server/App/Handler/TopicHandler.php old mode 100644 new mode 100755 index 8834252..d557ffb --- a/Server/App/Handler/TopicHandler.php +++ b/Server/App/Handler/TopicHandler.php @@ -2,6 +2,8 @@ namespace JDare\ClankBundle\Server\App\Handler; +use JDare\ClankBundle\Zmq\ZmqMessage; +use JDare\ClankBundle\Zmq\ZMQMessageReciever; use Ratchet\ConnectionInterface as Conn; use Ratchet\Wamp\Topic; @@ -10,6 +12,9 @@ class TopicHandler implements TopicHandlerInterface { protected $topicServices, $container; + protected $subscribedTopics = array(); + + public function setTopicServices($topicServices) { $this->topicServices = $topicServices; @@ -36,7 +41,36 @@ public function getContainer() public function onSubscribe(Conn $conn, $topic) { //if topic service exists, notify it - $this->dispatch(__METHOD__, $conn, $topic); + if ($this->dispatch(__METHOD__, $conn, $topic)) { + $serviceMatch = $this->getTopicNamespace($topic); + if ($serviceMatch && !isset($this->subscribedTopics[$serviceMatch])) { + $this->subscribedTopics[$serviceMatch] = $topic; + } + } + } + + public function onZMQMessage (ZmqMessage $message) { + $serviceMatch = $message->getName(); + $handler = null; + + // If the lookup topic object isn't set there is no one to publish to + if (!array_key_exists($serviceMatch, $this->subscribedTopics)) { + return; + } + $topic = $this->subscribedTopics[$serviceMatch]; + + foreach($this->getTopicServices() as $topicService) + { + if ($topicService['name'] === $serviceMatch) + $handler = $this->getContainer()->get($topicService['service']); + } + + if ($handler) { + if ($handler instanceof ZMQMessageReciever) { + call_user_func(array($handler, 'onZMQMessage'), $topic, $message->getData()); + } + } + } public function onUnSubscribe(Conn $conn, $topic) @@ -79,8 +113,8 @@ public function dispatch($event, Conn $conn, Topic $topic, $payload = null, $exc return false; } - public function getTopicHandler(Topic $topic) - { + + private function getTopicNamespace (Topic $topic) { //get network namespace to see if its valid $parts = explode("/", $topic->getId()); if ($parts <= 0) @@ -89,7 +123,15 @@ public function getTopicHandler(Topic $topic) } $serviceMatch = $parts[0]; + return $serviceMatch; + } + + public function getTopicHandler(Topic $topic) + { + + $serviceMatch = $this->getTopicNamespace($topic); + if (!$serviceMatch) return; foreach($this->getTopicServices() as $topicService) diff --git a/Server/Type/WebSocketServerType.php b/Server/Type/WebSocketServerType.php old mode 100644 new mode 100755 index 369b2a5..379912f --- a/Server/Type/WebSocketServerType.php +++ b/Server/Type/WebSocketServerType.php @@ -46,7 +46,23 @@ private function setupServer() /** @var $loop \React\EventLoop\LoopInterface */ $this->loop = \React\EventLoop\Factory::create(); - + + // Enable ZMQ Listener. + // Requires , "react/zmq" package. + $zmq = $this->getContainer()->getParameter('jdare_clank.zmq_configuration'); + if ($zmq['enabled']) { + if (!class_exists('\ZMQContext')) { + throw new \Exception("Could not find ZMQContext, did you install the zmq bindings for PHP?"); + } + // Listen for the web server to make a ZeroMQ push after an ajax request + $context = new \React\ZMQ\Context($this->loop); + $pull = $context->getSocket(\ZMQ::SOCKET_PULL); + $bind = "tcp://127.0.0.1:{$zmq['port']}"; + echo "\nListening to ZMQ messages on $bind\n"; + $pull->bind($bind); // Binding to 127.0.0.1 means the only client that can connect is itself + $pull->on('message', array($this->getContainer()->get("jdare_clank.clank_app"), 'onZMQMessage')); + } + $this->socket = new \React\Socket\Server($this->loop); if ($this->host) @@ -120,7 +136,7 @@ public function setPeriodicServices($services) public function getAddress() { - return (($this->host)?$this->host:"*") . ":" . $this->port; + return (($this->host)?$this->host:"0.0.0.0") . ":" . $this->port; } public function getName() diff --git a/Zmq/ZMQMessageReciever.php b/Zmq/ZMQMessageReciever.php new file mode 100755 index 0000000..4acb8da --- /dev/null +++ b/Zmq/ZMQMessageReciever.php @@ -0,0 +1,24 @@ +container->getParameter('jdare_clank.zmq_configuration'); + + if (!$zmq['enabled']) { + throw new \Exception("ZMQ is not enabled. Add zmq:\nenabled: true\n to the configuration for clank. in config.yml"); + } + + foreach($this->container->get("jdare_clank.clank_handler_topic")->getTopicServices() as $service) { + if ($message->getName() == $service['name']) { + $found = true; + } + } + + if (false === $found) { + throw new \Exception("Could not find topic service with name {$message->getName()}. Name should be equal to the topic: - name in clank configuration (config.yml)."); + } + + $context = new \ZMQContext(); + $socket = $context->getSocket(\ZMQ::SOCKET_PUSH, 'my pusher'); + $socket->connect("tcp://localhost:{$zmq['port']}"); + + $socket->send(json_encode(array( + 'name' => $message->getName(), + 'data' => $message->getData() + ))); + } + +} \ No newline at end of file diff --git a/Zmq/ZmqMessage.php b/Zmq/ZmqMessage.php new file mode 100755 index 0000000..64f9aba --- /dev/null +++ b/Zmq/ZmqMessage.php @@ -0,0 +1,39 @@ +name = $name; + $this->data = $data; + } + + /** + * @return mixed Custom data. + */ + public function getData() { + return $this->data; + } + + /** + * @return string Name of the service for the Topic. + */ + public function getName() { + return $this->name; + } + + + +} \ No newline at end of file