Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions DependencyInjection/ClankExtensionClass.php
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public function load(array $configs, ContainerBuilder $container)
$this->setupSessionHandler($config['session_handler']);
}

if (isset($config['zmq']) && $config['zmq'])
{
$this->setupZMQ($config['zmq']);
}

}

private function setupWebSocketServer($config)
Expand Down Expand Up @@ -96,6 +101,14 @@ private function setupPeriodicServices($config)
$this->container->setParameter('jdare_clank.periodic_services', $config);
}

private function setupZMQ($config) {
$config = array_merge(array(
'port' => 5555, // Default ZMQ port.
'enabled' => false
), $config);
$this->container->setParameter('jdare_clank.zmq_configuration', $config);
}

public function getAlias()
{
return "clank";
Expand Down
1 change: 1 addition & 0 deletions README.md
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Resources
* [Periodic Services](Resources/docs/PeriodicSetup.md)(functions to be run every x seconds with the IO loop.)
* [Session Management](Resources/docs/SessionSetup.md)
* [Clank Server Events](Resources/docs/ClankEvents.md)
* [ZMQ Integration](Resources/docs/ZMQIntegration.md)

Code Cookbook
--------------
Expand Down
15 changes: 15 additions & 0 deletions Resources/config/services.xml
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

<parameters>
<parameter key="jdare_clank.web_socket_server.class">JDare\ClankBundle\Server\Type\WebSocketServerType</parameter>
<parameter key="jdare_clank.zmq_dispatcher.class">JDare\ClankBundle\Zmq\ZmqDispatcher</parameter>
<parameter key="jdare_clank.web_socket_server.port" />
<parameter key="jdare_clank.web_socket_server.host" />


<parameter key="jdare_clank.periodic_services" type="collection" />
<!-- Sample Periodic Service, being executed every 5000ms (5s). Uncomment to enable -->
<!-- <parameter key="jdare_clank.periodic_sample_service">5000</parameter> -->
Expand All @@ -30,6 +32,11 @@

<parameter key="jdare_clank.session_handler">null</parameter>

<parameter key="jdare_clank.zmq_configuration" type="collection">
<parameter key="enabled">null</parameter>
<parameter key="port">5555</parameter>
</parameter>

<parameter key="jdare_clank.servers" type="collection">
<parameter>jdare_clank.web_socket_server</parameter>
</parameter>
Expand Down Expand Up @@ -62,6 +69,13 @@
</call>
</service>

<service id="jdare_clank.zmq.dispatcher" class="%jdare_clank.zmq_dispatcher.class%">
<call method="setContainer">
<argument type="service" id="service_container" />
</call>
</service>


<service id="jdare_clank.clank_app" class="JDare\ClankBundle\Server\App\ClankApp">
<argument type="service" id="jdare_clank.clank_handler_rpc" />
<argument type="service" id="jdare_clank.clank_handler_topic" />
Expand Down Expand Up @@ -92,6 +106,7 @@
<tag name="kernel.event_listener" event="clank.client.error" method="onClientError" />
</service>


<!-- Sample Services examples -->
<service id="jdare_clank.topic_sample_service" class="JDare\ClankBundle\Topic\AcmeTopic" />
<service id="jdare_clank.rpc_sample_service" class="JDare\ClankBundle\RPC\AcmeService" />
Expand Down
138 changes: 138 additions & 0 deletions Resources/docs/ZMQIntegration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Integration with ZMQ

Sometimes you would want to publish events from a controller to the Websocket server, for example a notification system.
See more of how it's integrated [here](http://socketo.me/docs/push).


## Server setup

First, you should Install ZMQ, and the PHP bindings. This should be installed on your server.

Download ZMQ:

- http://zeromq.org/area:download

And the php bindings:

- http://zeromq.org/bindings:php




## Configuration


Add `"react/zmq": "@stable"` to the `require` section of your composer.json file, and run
`composer update react/zmq`.



Add ZMQ as enabled:

```yaml
clank:
topic:
# Example topic service:
-
name: "notification"
service: "acme.topic.notification"
# Enable ZMQ
zmq:
enabled: true
port: 5555 # This should not be the same port as the Clank(Ratchet) server. Default is 5555
```

When starting the server with `clank:server` you should now see somthing like this:

```bash
peec@dev:$ php app/console clank:server
Starting Clank
Launching Ratchet WS Server on: 0.0.0.0:8088

Listening to ZMQ messages on tcp://127.0.0.1:5555
```




## Sending messages from the controller

From any controller, we can now send messages like this:

```php
$this->container->get('jdare_clank.zmq.dispatcher')->send(new \JDare\ClankBundle\Zmq\ZmqMessage(
"notification", // Reference the topic service associated with this message.
["hello" => "World"] // Some data
));
```

## Subscribing to ZMQ Messages.

```php
use JDare\ClankBundle\Topic\TopicInterface;
use JDare\ClankBundle\Zmq\ZMQMessageReciever;
use Ratchet\ConnectionInterface as Conn;
use Ratchet\Wamp\Topic;


class NotificationTopic implements TopicInterface, ZMQMessageReciever
{


// .. other onX functions here..

/**
* When a ZMQ message is recieved for this Topic, this handler is called.
* @param Topic $topic The Topic instance
* @param $data
* @return mixed
*/
public function onZMQMessage(Topic $topic, $data) {
print_r($data); // Prints the data recieved in the console. Only if someone has subscribed to this channel.
$topic->broadcast($data);
}

}
```

## Example simple client

- You must include the JS files, they are in the bundles Resources folder.
- Change `ws://127.0.0.1:8088` to your clank server definition.

```html
<!doctype html>
<html>
<head>
<script src="Clank.js"></script>
<script src="autobahn.min.js"></script>
</head>
<body>
<script>
var myClank = Clank.connect("ws://127.0.0.1:8088");

myClank.on("socket/connect", function(session){
//session is an Autobahn JS WAMP session.

console.log("Successfully Connected!");

//the callback function in "subscribe" is called everytime an event is published in that channel.
session.subscribe("notification/channel", function(uri, payload){
console.log("Received message", payload);
});
})

myClank.on("socket/disconnect", function(error){
//error provides us with some insight into the disconnection: error.reason and error.code

console.log("Disconnected for " + error.reason + " with code " + error.code);
})


</script>
</body>
</html>
```


Now open a controller where you use `jdare_clank.zmq.dispatcher` and you should see the message in the client (console.log).
11 changes: 11 additions & 0 deletions Server/App/ClankApp.php
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace JDare\ClankBundle\Server\App;

use JDare\ClankBundle\Zmq\ZmqMessage;
use Ratchet\ConnectionInterface as Conn;
use Ratchet\Wamp\WampServerInterface;

Expand Down Expand Up @@ -55,4 +56,14 @@ public function onError(Conn $conn, \Exception $e) {
$this->eventDispatcher->dispatch("clank.client.error", $event);
}


public function onZMQMessage ($entry) {
$zmqMessage = json_decode($entry, true);

if (isset($zmqMessage['name']) && isset($zmqMessage['data'])) {

$this->topicHandler->onZMQMessage(new ZmqMessage($zmqMessage['name'], $zmqMessage['data']));
}
}

}
48 changes: 45 additions & 3 deletions Server/App/Handler/TopicHandler.php
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace JDare\ClankBundle\Server\App\Handler;

use JDare\ClankBundle\Zmq\ZmqMessage;
use JDare\ClankBundle\Zmq\ZMQMessageReciever;
use Ratchet\ConnectionInterface as Conn;
use Ratchet\Wamp\Topic;

Expand All @@ -10,6 +12,9 @@ class TopicHandler implements TopicHandlerInterface
{
protected $topicServices, $container;

protected $subscribedTopics = array();


public function setTopicServices($topicServices)
{
$this->topicServices = $topicServices;
Expand All @@ -36,7 +41,36 @@ public function getContainer()
public function onSubscribe(Conn $conn, $topic)
{
//if topic service exists, notify it
$this->dispatch(__METHOD__, $conn, $topic);
if ($this->dispatch(__METHOD__, $conn, $topic)) {
$serviceMatch = $this->getTopicNamespace($topic);
if ($serviceMatch && !isset($this->subscribedTopics[$serviceMatch])) {
$this->subscribedTopics[$serviceMatch] = $topic;
}
}
}

public function onZMQMessage (ZmqMessage $message) {
$serviceMatch = $message->getName();
$handler = null;

// If the lookup topic object isn't set there is no one to publish to
if (!array_key_exists($serviceMatch, $this->subscribedTopics)) {
return;
}
$topic = $this->subscribedTopics[$serviceMatch];

foreach($this->getTopicServices() as $topicService)
{
if ($topicService['name'] === $serviceMatch)
$handler = $this->getContainer()->get($topicService['service']);
}

if ($handler) {
if ($handler instanceof ZMQMessageReciever) {
call_user_func(array($handler, 'onZMQMessage'), $topic, $message->getData());
}
}

}

public function onUnSubscribe(Conn $conn, $topic)
Expand Down Expand Up @@ -79,8 +113,8 @@ public function dispatch($event, Conn $conn, Topic $topic, $payload = null, $exc
return false;
}

public function getTopicHandler(Topic $topic)
{

private function getTopicNamespace (Topic $topic) {
//get network namespace to see if its valid
$parts = explode("/", $topic->getId());
if ($parts <= 0)
Expand All @@ -89,7 +123,15 @@ public function getTopicHandler(Topic $topic)
}

$serviceMatch = $parts[0];
return $serviceMatch;
}

public function getTopicHandler(Topic $topic)
{

$serviceMatch = $this->getTopicNamespace($topic);

if (!$serviceMatch) return;


foreach($this->getTopicServices() as $topicService)
Expand Down
20 changes: 18 additions & 2 deletions Server/Type/WebSocketServerType.php
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,23 @@ private function setupServer()
/** @var $loop \React\EventLoop\LoopInterface */

$this->loop = \React\EventLoop\Factory::create();


// Enable ZMQ Listener.
// Requires , "react/zmq" package.
$zmq = $this->getContainer()->getParameter('jdare_clank.zmq_configuration');
if ($zmq['enabled']) {
if (!class_exists('\ZMQContext')) {
throw new \Exception("Could not find ZMQContext, did you install the zmq bindings for PHP?");
}
// Listen for the web server to make a ZeroMQ push after an ajax request
$context = new \React\ZMQ\Context($this->loop);
$pull = $context->getSocket(\ZMQ::SOCKET_PULL);
$bind = "tcp://127.0.0.1:{$zmq['port']}";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the possibility in the config to listen on something else than 127.0.0.1 ? For example tcp://{$zmq['listen_to_host']}:{$zmq['port']}with listen_to_host set to *

Thanks !

echo "\nListening to ZMQ messages on $bind\n";
$pull->bind($bind); // Binding to 127.0.0.1 means the only client that can connect is itself
$pull->on('message', array($this->getContainer()->get("jdare_clank.clank_app"), 'onZMQMessage'));
}

$this->socket = new \React\Socket\Server($this->loop);

if ($this->host)
Expand Down Expand Up @@ -120,7 +136,7 @@ public function setPeriodicServices($services)

public function getAddress()
{
return (($this->host)?$this->host:"*") . ":" . $this->port;
return (($this->host)?$this->host:"0.0.0.0") . ":" . $this->port;
}

public function getName()
Expand Down
24 changes: 24 additions & 0 deletions Zmq/ZMQMessageReciever.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php
/**
* Created by PhpStorm.
* User: peecdesktop
* Date: 18.09.14
* Time: 19:56
*/

namespace JDare\ClankBundle\Zmq;


use Ratchet\Wamp\Topic;

interface ZMQMessageReciever {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a typo here : Shoud'nt it be ZMQMessageReceiver ?


/**
* When a ZMQ message is recieved for this Topic, this handler is called.
* @param Topic $topic The Topic instance
* @param $data
* @return mixed
*/
public function onZMQMessage (Topic $topic, $data);

}
Loading