Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.11.0
16 changes: 16 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true
}
]
}
9 changes: 9 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: 3

tasks:
test::debug:
cmds:
- pytest -s --log-cli-level=DEBUG ${@:1}
test:
cmds:
- pytest -s --log-cli-level=INFO ${@:1}
33 changes: 17 additions & 16 deletions demo_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,28 @@


class Counter:
count = 0
count: int
_node: RemoteNode

def __init__(self):
self.count = 0

def increment(self):
self.count += 1
# notify all registered clients
RemoteNode.notify_property_change('demo.Counter/count', self.count)
if self._node:
self._node.notify_property_changed("demo.Counter/count", self.count)


class CounterAdapter(IObjectSource):
node: RemoteNode = None
_node: RemoteNode

def __init__(self, impl):
self.impl = impl
# need to register this source with the registry
RemoteNode.register_source(self)

def olink_object_name(self):
# name this source is registered under
return 'demo.Counter'
return "demo.Counter"

def olink_invoke(self, name: str, args: list[Any]) -> Any:
# called on incoming invoke message
Expand All @@ -48,7 +50,7 @@ def olink_linked(self, name: str, node: "RemoteNode"):
self.impl._node = node

def olink_collect_properties(self) -> object:
return {k: getattr(self.impl, k) for k in ['count']}
return {k: getattr(self.impl, k) for k in ["count"]}


counter = Counter()
Expand All @@ -61,26 +63,27 @@ class RemoteEndpoint(WebSocketEndpoint):
queue = Queue()

async def sender(self, ws):
print('start sender')
print("start sender")
while True:
print('001')
print("001")
msg = await self.queue.get()
print('send', msg)
print("send", msg)
await ws.send_text(msg)
self.queue.task_done()

async def on_connect(self, ws: WebSocket):
print('on_connect')
print("on_connect")
asyncio.create_task(self.sender(ws))

def writer(msg: str):
print('writer', msg)
print("writer", msg)
self.queue.put_nowait(msg)

self.node.on_write(writer)
await super().on_connect(ws)

async def on_receive(self, ws: WebSocket, data: Any) -> None:
print('on_receive', data)
print("on_receive", data)
self.node.handle_message(data)

async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
Expand All @@ -89,8 +92,6 @@ async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
await self.queue.join()


routes = [
WebSocketRoute("/ws", RemoteEndpoint)
]
routes = [WebSocketRoute("/ws", RemoteEndpoint)]

app = Starlette(routes=routes)
23 changes: 11 additions & 12 deletions examples/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class CounterService:

def increment(self):
self.count += 1
self._node.notify_property_change('demo.Counter/count', self.count)
self._node.notify_property_changed("demo.Counter/count", self.count)


class CounterWebsocketAdapter(IObjectSource):
Expand All @@ -30,7 +30,7 @@ def __init__(self, impl):

def olink_object_name(self):
# return service name
return 'demo.Counter'
return "demo.Counter"

def olink_invoke(self, name: str, args: list[Any]) -> Any:
# handle the remote call from client node
Expand All @@ -42,7 +42,7 @@ def olink_invoke(self, name: str, args: list[Any]) -> Any:
result = func(**args)
except Exception as e:
# need to have proper exception handling here
print('error: %s' % e)
print("error: %s" % e)
result = None
# results will be send back to calling client node
return result
Expand All @@ -58,7 +58,7 @@ def olink_linked(self, name: str, node: "RemoteNode"):

def olink_collect_properties(self) -> object:
# collect properties from implementation to send back to client node initially
return {k: getattr(self.impl, k) for k in ['count']}
return {k: getattr(self.impl, k) for k in ["count"]}


# create the service implementation
Expand All @@ -77,31 +77,32 @@ class RemoteEndpoint(WebSocketEndpoint):

async def sender(self, ws):
# sender coroutine, messages from queue are send to client
print('start sender')
print("start sender")
while True:
msg = await self.queue.get()
print('send', msg)
print("send", msg)
await ws.send_text(msg)
self.queue.task_done()

async def on_connect(self, ws: WebSocket):
# handle a socket connection
print('on_connect')
print("on_connect")
# register a sender to the connection
asyncio.create_task(self.sender(ws))

# a writer function to queue messages
def writer(msg: str):
print('write to queue:', msg)
print("write to queue:", msg)
self.queue.put_nowait(msg)

# register the writer function to the node
self.node.on_write(writer)
# call the super connection handler
await super().on_connect(ws)

async def on_receive(self, ws: WebSocket, data: Any) -> None:
# handle a message from a client socket
print('on_receive', data)
print("on_receive", data)
self.node.handle_message(data)

async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
Expand All @@ -114,9 +115,7 @@ async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:


# see https://www.starlette.io/routing/
routes = [
WebSocketRoute("/ws", RemoteEndpoint)
]
routes = [WebSocketRoute("/ws", RemoteEndpoint)]


# call with `uvicorn server:app --port 8282`
Expand Down
6 changes: 6 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[pytest]
minversion = 6.0
addopts = -ra -q
testpaths =
tests
pythonpath = src
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[metadata]
name = olink-core
name = olink
version = 0.0.1
author = AppiGear
author_email = info@apigear.io
Expand Down
8 changes: 8 additions & 0 deletions src/olink/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .node import (
ClientNode as ClientNode,
InvokeReplyArg as InvokeReplyArg,
InvokeReplyFunc as InvokeReplyFunc,
)
from .registry import ClientRegistry as ClientRegistry
from .types import IObjectSink as IObjectSink
from .sink import AbstractSink as AbstractSink
109 changes: 109 additions & 0 deletions src/olink/client/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from re import A
from typing import Any, Callable, Optional
from olink.core import LogLevel, MsgType, BaseNode, Protocol
from .types import IObjectSink
from .registry import ClientRegistry
import logging


class InvokeReplyArg:
def __init__(self, name: str, value: Any):
self.name = name
self.value = value


InvokeReplyFunc = Callable[[InvokeReplyArg], None]


class ClientNode(BaseNode):
def __init__(self, registry: ClientRegistry):
super().__init__()
self._invokes_pending: dict[int, InvokeReplyFunc] = {}
self._requestId: int = 0
self._registry = registry

def registry(self) -> ClientRegistry:
return self._registry

def detach(self) -> None:
self.registry().remove_node(self)

def next_request_id(self) -> int:
self._requestId += 1
return self._requestId

def invoke_remote(
self, name: str, args: list[Any], func: Optional[InvokeReplyFunc]
) -> None:
self.emit_log(LogLevel.DEBUG, f"ClientNode.invoke_remote: {name} {args}")
request_id = self.next_request_id()
if func:
self._invokes_pending[request_id] = func
self.emit_write(Protocol.invoke_message(request_id, name, args))

def set_remote_property(self, name: str, value: Any) -> None:
# send remote property message
self.emit_log(LogLevel.DEBUG, f"ClientNode.set_remote_property: {name} {value}")
self.emit_write(Protocol.set_property_message(name, value))

def link_node(self, name: str):
# register this node to sink
logging.debug(f"ClientNode.linkNode: {name}")
self.registry().add_node(name, self)

def unlink_node(self, name: str) -> None:
# unregister this node from sink
self.registry().remove_node_from_sink(name, self)

def link_remote(self, name: str):
# register this node from sink and send a link message
self.emit_log(LogLevel.DEBUG, f"ClientNode.linkRemote: {name}")
self.registry().add_node(name, self)
self.emit_write(Protocol.link_message(name))

def unlink_remote(self, name: str):
# unlink this node from sink and send an unlink message
self.emit_log(LogLevel.DEBUG, f"ClientNode.unlink_remote: {name}")
self.emit_write(Protocol.unlink_message(name))
self.registry().remove_node_from_sink(name, self)

def handle_init(self, name: str, props: object):
# handle init message from source
self.emit_log(LogLevel.DEBUG, f"ClientNode.handle_init: {name}")
sink = self.registry().get_sink(name)
if sink:
sink.olink_on_init(name, props, self)

def handle_property_change(self, name: str, value: Any) -> None:
# handle property change message from source
self.emit_log(LogLevel.DEBUG, f"ClientNode.handle_property_change: {name}")
sink = self.registry().get_sink(name)
if sink:
sink.olink_on_property_changed(name, value)

def handle_invoke_reply(self, id: int, name: str, value: Any) -> None:
# handle invoke reply message from source
self.emit_log(
LogLevel.DEBUG, f"ClientNode.handle_invoke_reply: {id} {name} {value}"
)
if id in self._invokes_pending:
func = self._invokes_pending[id]
if func:
arg = InvokeReplyArg(name, value)
func(arg)
del self._invokes_pending[id]
else:
self.emit_log(LogLevel.DEBUG, f"no pending invoke: {id} {name}")

def handle_signal(self, name: str, args: list[Any]) -> None:
# handle signal message from source
self.emit_log(LogLevel.DEBUG, f"ClientNode.handle_signal: {name} {args}")
sink = self.registry().get_sink(name)
if sink:
sink.olink_on_signal(name, args)

def handle_error(self, msgType: MsgType, id: int, error: str):
# handle error message from source
self.emit_log(
LogLevel.DEBUG, f"ClientNode.handle_error: {msgType} {id} {error}"
)
Loading