Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 64 additions & 34 deletions src/pushi/net/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,41 @@ def unsubscribe(self, callback = None):
def latest(self, skip = 0, count = 10, callback = None):
self.owner.latest_pushi(self.name, skip = skip, count = count, callback = callback)

class PushiConnection(netius.clients.WSConnection):
class PushiProtocol(netius.clients.WSProtocol):

PUXIAPP_URL = "wss://puxiapp.com/"
""" The default PuxiApp URL that is going to be used
to establish new client's connections """

def __init__(self, *args, **kwargs):
netius.clients.WSConnection.__init__(self, *args, **kwargs)
netius.clients.WSProtocol.__init__(self, *args, **kwargs)
self.base_url = None
self.client_key = None
self.api = None
self.url = None
self.state = "disconnected"
self.socket_id = None
self.channels = dict()

def connect_pushi(
self,
url = None,
client_key = None,
api = None,
callback = None,
loop = None
):
cls = self.__class__

self.base_url = url or cls.PUXIAPP_URL
self.client_key = client_key
self.api = api
self.url = self.base_url + self.client_key

if callback: self.bind("connect_pushi", callback)

return self.connect_ws(self.url, loop = loop)

def receive_ws(self, data):
data = data.decode("utf-8")
data_j = json.loads(data)
Expand Down Expand Up @@ -273,8 +300,8 @@ def _subscribe_public(self, channel):
))

def _subscribe_private(self, channel, channel_data = None):
if not self.owner.api: raise RuntimeError("No private app available")
auth = self.owner.api.authenticate(channel, self.socket_id)
if not self.api: raise RuntimeError("No private app available")
auth = self.api.authenticate(channel, self.socket_id)
self.send_event("pusher:subscribe", dict(
channel = channel,
auth = auth,
Expand All @@ -300,31 +327,26 @@ def _is_private(self, channel):

class PushiClient(netius.clients.WSClient):

PUXIAPP_URL = "wss://puxiapp.com/"
""" The default puxiapp url that is going to be used
to establish new client's connections """

def __init__(self, url = None, client_key = None, api = None, *args, **kwargs):
netius.clients.WSClient.__init__(self, *args, **kwargs)
self.base_url = url or self.__class__.PUXIAPP_URL
self.client_key = client_key
self.api = api
self.url = self.base_url + self.client_key
protocol = PushiProtocol

def new_connection(self, socket, address, ssl = False):
return PushiConnection(
owner = self,
socket = socket,
address = address,
ssl = ssl
@classmethod
def connect_pushi_s(
cls,
url = None,
client_key = None,
api = None,
callback = None,
loop = None
):
protocol = cls.protocol()
return protocol.connect_pushi(
url = url,
client_key = client_key,
api = api,
callback = callback,
loop = loop
)

def connect_pushi(self, callback = None):
connection = self.connect_ws(self.url)
if not callback: return
connection.bind("connect_pushi", callback)
return connection

if __name__ == "__main__":
def on_message(channel, data, mid = None, timestamp = None):
print("Received %s" % data)
Expand All @@ -345,19 +367,27 @@ def on_subscribe(channel, data):
channel.bind("message", on_message)
channel.latest(count = 20, callback = on_latest)

def on_connect(connection):
connection.subscribe_pushi("global", callback = on_subscribe)
def on_connect(protocol):
protocol.subscribe_pushi("global", callback = on_subscribe)

def register_timer(client):
def register_timer(protocol):
def timer():
print("Waiting for events(s) on channel global ...")
client.delay(timer, timeout = 5)
client.delay(timer, timeout = 5)
protocol.delay(timer, timeout = 5)
protocol.delay(timer, timeout = 5)

url = netius.conf("PUSHI_URL")
client_key = netius.conf("PUSHI_KEY")
client = PushiClient(url = url, client_key = client_key)
client.connect_pushi(callback = on_connect)
register_timer(client)

loop, protocol = PushiClient.connect_pushi_s(
url = url,
client_key = client_key,
callback = on_connect
)

register_timer(protocol)

loop.run_forever()
loop.close()
else:
__path__ = []