diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..afad818 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.11.0 diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..306f58e --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": true + } + ] +} \ No newline at end of file diff --git a/Taskfile.yml b/Taskfile.yml new file mode 100644 index 0000000..95eaa04 --- /dev/null +++ b/Taskfile.yml @@ -0,0 +1,9 @@ +version: 3 + +tasks: + test::debug: + cmds: + - pytest -s --log-cli-level=DEBUG ${@:1} + test: + cmds: + - pytest -s --log-cli-level=INFO ${@:1} diff --git a/demo_server.py b/demo_server.py index 76f9d03..4aef3bf 100644 --- a/demo_server.py +++ b/demo_server.py @@ -11,26 +11,28 @@ class Counter: - count = 0 + count: int _node: RemoteNode + def __init__(self): + self.count = 0 + def increment(self): self.count += 1 # notify all registered clients - RemoteNode.notify_property_change('demo.Counter/count', self.count) + if self._node: + self._node.notify_property_changed("demo.Counter/count", self.count) class CounterAdapter(IObjectSource): - node: RemoteNode = None + _node: RemoteNode def __init__(self, impl): self.impl = impl - # need to register this source with the registry - RemoteNode.register_source(self) def olink_object_name(self): # name this source is registered under - return 'demo.Counter' + return "demo.Counter" def olink_invoke(self, name: str, args: list[Any]) -> Any: # called on incoming invoke message @@ -48,7 +50,7 @@ def olink_linked(self, name: str, node: "RemoteNode"): self.impl._node = node def olink_collect_properties(self) -> object: - return {k: getattr(self.impl, k) for k in ['count']} + return {k: getattr(self.impl, k) for k in ["count"]} counter = Counter() @@ -61,26 +63,27 @@ class RemoteEndpoint(WebSocketEndpoint): queue = Queue() async def sender(self, ws): - print('start sender') + print("start sender") while True: - print('001') + print("001") msg = await self.queue.get() - print('send', msg) + print("send", msg) await ws.send_text(msg) self.queue.task_done() async def on_connect(self, ws: WebSocket): - print('on_connect') + print("on_connect") asyncio.create_task(self.sender(ws)) def writer(msg: str): - print('writer', msg) + print("writer", msg) self.queue.put_nowait(msg) + self.node.on_write(writer) await super().on_connect(ws) async def on_receive(self, ws: WebSocket, data: Any) -> None: - print('on_receive', data) + print("on_receive", data) self.node.handle_message(data) async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None: @@ -89,8 +92,6 @@ async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None: await self.queue.join() -routes = [ - WebSocketRoute("/ws", RemoteEndpoint) -] +routes = [WebSocketRoute("/ws", RemoteEndpoint)] app = Starlette(routes=routes) diff --git a/examples/server.py b/examples/server.py index 19a082c..842d9ec 100644 --- a/examples/server.py +++ b/examples/server.py @@ -16,7 +16,7 @@ class CounterService: def increment(self): self.count += 1 - self._node.notify_property_change('demo.Counter/count', self.count) + self._node.notify_property_changed("demo.Counter/count", self.count) class CounterWebsocketAdapter(IObjectSource): @@ -30,7 +30,7 @@ def __init__(self, impl): def olink_object_name(self): # return service name - return 'demo.Counter' + return "demo.Counter" def olink_invoke(self, name: str, args: list[Any]) -> Any: # handle the remote call from client node @@ -42,7 +42,7 @@ def olink_invoke(self, name: str, args: list[Any]) -> Any: result = func(**args) except Exception as e: # need to have proper exception handling here - print('error: %s' % e) + print("error: %s" % e) result = None # results will be send back to calling client node return result @@ -58,7 +58,7 @@ def olink_linked(self, name: str, node: "RemoteNode"): def olink_collect_properties(self) -> object: # collect properties from implementation to send back to client node initially - return {k: getattr(self.impl, k) for k in ['count']} + return {k: getattr(self.impl, k) for k in ["count"]} # create the service implementation @@ -77,23 +77,24 @@ class RemoteEndpoint(WebSocketEndpoint): async def sender(self, ws): # sender coroutine, messages from queue are send to client - print('start sender') + print("start sender") while True: msg = await self.queue.get() - print('send', msg) + print("send", msg) await ws.send_text(msg) self.queue.task_done() async def on_connect(self, ws: WebSocket): # handle a socket connection - print('on_connect') + print("on_connect") # register a sender to the connection asyncio.create_task(self.sender(ws)) # a writer function to queue messages def writer(msg: str): - print('write to queue:', msg) + print("write to queue:", msg) self.queue.put_nowait(msg) + # register the writer function to the node self.node.on_write(writer) # call the super connection handler @@ -101,7 +102,7 @@ def writer(msg: str): async def on_receive(self, ws: WebSocket, data: Any) -> None: # handle a message from a client socket - print('on_receive', data) + print("on_receive", data) self.node.handle_message(data) async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None: @@ -114,9 +115,7 @@ async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None: # see https://www.starlette.io/routing/ -routes = [ - WebSocketRoute("/ws", RemoteEndpoint) -] +routes = [WebSocketRoute("/ws", RemoteEndpoint)] # call with `uvicorn server:app --port 8282` diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..7a447b5 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,6 @@ +[pytest] +minversion = 6.0 +addopts = -ra -q +testpaths = + tests +pythonpath = src diff --git a/setup.cfg b/setup.cfg index d30f298..f3473b4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [metadata] -name = olink-core +name = olink version = 0.0.1 author = AppiGear author_email = info@apigear.io diff --git a/src/olink/client/__init__.py b/src/olink/client/__init__.py new file mode 100644 index 0000000..24b25f3 --- /dev/null +++ b/src/olink/client/__init__.py @@ -0,0 +1,8 @@ +from .node import ( + ClientNode as ClientNode, + InvokeReplyArg as InvokeReplyArg, + InvokeReplyFunc as InvokeReplyFunc, +) +from .registry import ClientRegistry as ClientRegistry +from .types import IObjectSink as IObjectSink +from .sink import AbstractSink as AbstractSink diff --git a/src/olink/client/node.py b/src/olink/client/node.py new file mode 100644 index 0000000..c8f1c94 --- /dev/null +++ b/src/olink/client/node.py @@ -0,0 +1,109 @@ +from re import A +from typing import Any, Callable, Optional +from olink.core import LogLevel, MsgType, BaseNode, Protocol +from .types import IObjectSink +from .registry import ClientRegistry +import logging + + +class InvokeReplyArg: + def __init__(self, name: str, value: Any): + self.name = name + self.value = value + + +InvokeReplyFunc = Callable[[InvokeReplyArg], None] + + +class ClientNode(BaseNode): + def __init__(self, registry: ClientRegistry): + super().__init__() + self._invokes_pending: dict[int, InvokeReplyFunc] = {} + self._requestId: int = 0 + self._registry = registry + + def registry(self) -> ClientRegistry: + return self._registry + + def detach(self) -> None: + self.registry().remove_node(self) + + def next_request_id(self) -> int: + self._requestId += 1 + return self._requestId + + def invoke_remote( + self, name: str, args: list[Any], func: Optional[InvokeReplyFunc] + ) -> None: + self.emit_log(LogLevel.DEBUG, f"ClientNode.invoke_remote: {name} {args}") + request_id = self.next_request_id() + if func: + self._invokes_pending[request_id] = func + self.emit_write(Protocol.invoke_message(request_id, name, args)) + + def set_remote_property(self, name: str, value: Any) -> None: + # send remote property message + self.emit_log(LogLevel.DEBUG, f"ClientNode.set_remote_property: {name} {value}") + self.emit_write(Protocol.set_property_message(name, value)) + + def link_node(self, name: str): + # register this node to sink + logging.debug(f"ClientNode.linkNode: {name}") + self.registry().add_node(name, self) + + def unlink_node(self, name: str) -> None: + # unregister this node from sink + self.registry().remove_node_from_sink(name, self) + + def link_remote(self, name: str): + # register this node from sink and send a link message + self.emit_log(LogLevel.DEBUG, f"ClientNode.linkRemote: {name}") + self.registry().add_node(name, self) + self.emit_write(Protocol.link_message(name)) + + def unlink_remote(self, name: str): + # unlink this node from sink and send an unlink message + self.emit_log(LogLevel.DEBUG, f"ClientNode.unlink_remote: {name}") + self.emit_write(Protocol.unlink_message(name)) + self.registry().remove_node_from_sink(name, self) + + def handle_init(self, name: str, props: object): + # handle init message from source + self.emit_log(LogLevel.DEBUG, f"ClientNode.handle_init: {name}") + sink = self.registry().get_sink(name) + if sink: + sink.olink_on_init(name, props, self) + + def handle_property_change(self, name: str, value: Any) -> None: + # handle property change message from source + self.emit_log(LogLevel.DEBUG, f"ClientNode.handle_property_change: {name}") + sink = self.registry().get_sink(name) + if sink: + sink.olink_on_property_changed(name, value) + + def handle_invoke_reply(self, id: int, name: str, value: Any) -> None: + # handle invoke reply message from source + self.emit_log( + LogLevel.DEBUG, f"ClientNode.handle_invoke_reply: {id} {name} {value}" + ) + if id in self._invokes_pending: + func = self._invokes_pending[id] + if func: + arg = InvokeReplyArg(name, value) + func(arg) + del self._invokes_pending[id] + else: + self.emit_log(LogLevel.DEBUG, f"no pending invoke: {id} {name}") + + def handle_signal(self, name: str, args: list[Any]) -> None: + # handle signal message from source + self.emit_log(LogLevel.DEBUG, f"ClientNode.handle_signal: {name} {args}") + sink = self.registry().get_sink(name) + if sink: + sink.olink_on_signal(name, args) + + def handle_error(self, msgType: MsgType, id: int, error: str): + # handle error message from source + self.emit_log( + LogLevel.DEBUG, f"ClientNode.handle_error: {msgType} {id} {error}" + ) diff --git a/src/olink/client/registry.py b/src/olink/client/registry.py new file mode 100644 index 0000000..45cfb71 --- /dev/null +++ b/src/olink/client/registry.py @@ -0,0 +1,74 @@ +from .types import IObjectSink +from olink.core import Name, Base, LogLevel +from typing import Optional, TYPE_CHECKING +import logging + +if TYPE_CHECKING: + from .node import ClientNode + + +class SinkToClientEntry: + def __init__(self, sink=None): + self.sink: IObjectSink = sink + self.node: "ClientNode" = None + + +class ClientRegistry(Base): + def __init__(self) -> None: + super().__init__() + logging.debug("ClientRegistry.__init__") + self.entries: dict[str, SinkToClientEntry] = {} + + def remove_node(self, node: "ClientNode"): + logging.debug("ClientRegistry.removeNode") + # remove node from all sinks + for entry in self.entries.values(): + if entry.node is node: + entry.node = None + + def add_node(self, name: str, node: "ClientNode"): + # add not to named sink + self._entry(name).node = node + + def remove_node_from_sink(self, name: str, node: "ClientNode"): + # remove node from named sink + resource = Name.resource_from_name(name) + if resource in self.entries: + if self.entries[resource].node is node: + self.entries[resource].node = None + else: + self.emit_log( + LogLevel.DEBUG, f"unlink node failed, not the same node: {resource}" + ) + + def add_sink(self, sink: IObjectSink) -> "ClientNode": + # register sink using object name + name = sink.olink_object_name() + entry = self._entry(name) + entry.sink = sink + + def remove_sink(self, sink: IObjectSink): + # unregister sink using object name + name = sink.olink_object_name() + self._remove_entry(name) + + def get_sink(self, name: str) -> Optional[IObjectSink]: + # get sink using name + return self._entry(name).sink + + def get_node(self, name: str) -> Optional["ClientNode"]: + # get node using name + return self._entry(name).node + + def _entry(self, name: str) -> SinkToClientEntry: + # get an entry by name + resource = Name.resource_from_name(name) + if not resource in self.entries: + self.emit_log(LogLevel.DEBUG, f"add new resource: {resource}") + self.entries[resource] = SinkToClientEntry() + return self.entries[resource] + + def _remove_entry(self, name: str) -> None: + # remove an entry by name + resource = Name.resource_from_name(name) + del self.entries[resource] diff --git a/src/olink/client/sink.py b/src/olink/client/sink.py new file mode 100644 index 0000000..cdd3075 --- /dev/null +++ b/src/olink/client/sink.py @@ -0,0 +1,44 @@ +import asyncio +from typing import Any +from olink.core import Name +from .node import IObjectSink, ClientNode +from olink.core.hook import EventHook + + +class AbstractSink(IObjectSink): + def __init__(self, object_id: str): + self._object_id = object_id + self.on_property_changed = EventHook() + self._node: ClientNode = None + + async def _invoke(self, name, args): + future = asyncio.get_running_loop().create_future() + + def func(args): + return future.set_result(args.value) + + self.get_node().invoke_remote(f"{self._object_id}/{name}", args, func) + return await asyncio.wait_for(future, 500) + + def olink_object_name(self): + return self._object_id + + def olink_on_init(self, name: str, props: object, node: ClientNode): + self._node = node + for k in props: + setattr(self, k, props[k]) + + def olink_on_property_changed(self, name: str, value: Any) -> None: + path = Name.path_from_name(name) + setattr(self, name, value) + self.on_property_changed.fire(path, value) + + def olink_on_signal(self, name: str, args: list[Any]): + path = Name.path_from_name(name) + hook = getattr(self, f"on_{path}") + hook.fire(*args) + + def get_node(self) -> ClientNode: + if self._node is None: + raise Exception("Sink not linked to node") + return self._node diff --git a/src/olink/client/types.py b/src/olink/client/types.py new file mode 100644 index 0000000..fb151c6 --- /dev/null +++ b/src/olink/client/types.py @@ -0,0 +1,23 @@ +from typing import Any, Protocol as ProtocolType + +class IObjectSink(ProtocolType): + # interface for object sinks + def olink_object_name() -> str: + # return object name + raise NotImplementedError() + + def olink_on_signal(self, name: str, args: list[Any]) -> None: + # called on signal message + raise NotImplementedError() + + def olink_on_property_changed(self, name: str, value: Any) -> None: + # called on property changed message + raise NotImplementedError() + + def olink_on_init(self, name: str, props: object, node: "ClientNode"): + # called on init message + raise NotImplementedError() + + def olink_on_release(self) -> None: + # called when sink is released + raise NotImplementedError() \ No newline at end of file diff --git a/src/olink/clientnode.py b/src/olink/clientnode.py deleted file mode 100644 index 9ff6c79..0000000 --- a/src/olink/clientnode.py +++ /dev/null @@ -1,222 +0,0 @@ -from re import A -from typing import Any, Callable, Optional, Protocol as ProtocolType -from olink.core.types import Base, LogLevel, MsgType, Name -from olink.core.node import BaseNode -from olink.core.protocol import Protocol - - -class InvokeReplyArg: - def __init__(self, name: str, value: Any): - self.name = name - self.value = value - name: str - value: Any - - -InvokeReplyFunc = Callable[[InvokeReplyArg], None] - - -class IObjectSink(ProtocolType): - # interface for object sinks - def olink_object_name() -> str: - # return object name - raise NotImplementedError() - - def olink_on_signal(self, name: str, args: list[Any]) -> None: - # called on signal message - raise NotImplementedError() - - def olink_on_property_changed(self, name: str, value: Any) -> None: - # called on property changed message - raise NotImplementedError() - - def olink_on_init(self, name: str, props: object, node: "ClientNode"): - # called on init message - raise NotImplementedError() - - def olink_on_release(self) -> None: - # called when sink is released - raise NotImplementedError() - - -class SinkToClientEntry: - # entry in the client registry - sink: IObjectSink = None - node: "ClientNode" = None - - def __init__(self, sink=None): - self.sink = sink - self.node = None - - -class ClientRegistry(Base): - # client side registry to link sinks to nodes - entries: dict[str, SinkToClientEntry] = {} - - def remove_node(self, node: "ClientNode"): - # remove node from all sinks - for entry in self.entries.values(): - if entry.node is node: - entry.node = None - - def add_node_to_sink(self, name: str, node: "ClientNode"): - # add not to named sink - self._entry(name).node = node - - def remove_node_from_sink(self, name: str, node: "ClientNode"): - # remove node from named sink - resource = Name.resource_from_name(name) - if resource in self.entries: - if self.entries[resource].node is node: - self.entries[resource].node = None - else: - self.emit_log( - LogLevel.DEBUG, f"unlink node failed, not the same node: {resource}") - - def register_sink(self, sink: IObjectSink) -> "ClientNode": - # register sink using object name - name = sink.olink_object_name() - entry = self._entry(name) - entry.sink = sink - return entry.node - - def unregister_sink(self, sink: IObjectSink): - # unregister sink using object name - name = sink.olink_object_name() - self._remove_entry(name) - - def get_sink(self, name: str) -> Optional[IObjectSink]: - # get sink using name - return self._entry(name).sink - - def get_node(self, name: str) -> Optional["ClientNode"]: - # get node using name - return self._entry(name).node - - def _entry(self, name: str) -> SinkToClientEntry: - # get an entry by name - resource = Name.resource_from_name(name) - if not resource in self.entries: - self.emit_log(LogLevel.DEBUG, f"add new resource: {resource}") - self.entries[resource] = SinkToClientEntry() - return self.entries[resource] - - def _remove_entry(self, name: str) -> None: - # remove an entry by name - resource = Name.resource_from_name(name) - del self.entries[resource] - - -# global client registry -_registry = ClientRegistry() - - -def get_client_registry() -> ClientRegistry: - # get global client registry - return _registry - - -class ClientNode(BaseNode): - # client side node - invokes_pending: dict[int, InvokeReplyFunc] = {} - requestId = 0 - - def registry(self) -> ClientRegistry: - return get_client_registry() - - def detach(self) -> None: - self.registry().remove_node(self) - - def next_request_id(self) -> int: - self.requestId += 1 - return self.requestId - - def invoke_remote(self, name: str, args: list[Any], func: Optional[InvokeReplyFunc]) -> None: - self.emit_log(LogLevel.DEBUG, - f"ClientNode.invoke_remote: {name} {args}") - request_id = self.next_request_id() - if func: - self.invokes_pending[request_id] = func - self.emit_write(Protocol.invoke_message(request_id, name, args)) - - def set_remote_property(self, name: str, value: Any) -> None: - # send remote propertymessage - self.emit_log(LogLevel.DEBUG, - f"ClientNode.set_remote_property: {name} {value}") - self.emit_write(Protocol.set_property_message(name, value)) - - def link_node(self, name: str): - # register this node to sink - self.registry().add_node_to_sink(name, self) - - def unlink_node(self, name: str) -> None: - # unregister this node from sink - self.registry().remove_node_from_sink(name, self) - - @staticmethod - def register_sink(sink: IObjectSink) -> Optional["ClientNode"]: - # register sink to registry - return get_client_registry().register_sink(sink) - - @staticmethod - def unregister_sink(sink: IObjectSink) -> None: - # unregister sink from registry - return get_client_registry().unregister_sink(sink) - - @staticmethod - def get_sink(name: str) -> Optional[IObjectSink]: - # get sink from registry - return get_client_registry().get_sink(name) - - def link_remote(self, name: str): - # register this node from sink and send a link message - self.emit_log(LogLevel.DEBUG, f"ClientNode.linkRemote: {name}") - self.registry().add_node_to_sink(name, self) - self.emit_write(Protocol.link_message(name)) - - def unlink_remote(self, name: str): - # unlink this node froom sink and send an unlink message - self.emit_log(LogLevel.DEBUG, f"ClientNode.unlink_remote: {name}") - self.emit_write(Protocol.unlink_message(name)) - self.registry().remove_node_from_sink(name, self) - - def handle_init(self, name: str, props: object): - # handle init message from source - self.emit_log(LogLevel.DEBUG, f"ClientNode.handle_init: {name}") - sink = self.get_sink(name) - if sink: - sink.olink_on_init(name, props, self) - - def handle_property_change(self, name: str, value: Any) -> None: - # handle property change message from source - self.emit_log(LogLevel.DEBUG, - f"ClientNode.handle_property_change: {name}") - sink = self.get_sink(name) - if sink: - sink.olink_on_property_changed(name, value) - - def handle_invoke_reply(self, id: int, name: str, value: Any) -> None: - # handle invoke reply message from source - self.emit_log(LogLevel.DEBUG, - f"ClientNode.handle_invoke_reply: {id} {name} {value}") - if id in self.invokes_pending: - func = self.invokes_pending[id] - if func: - arg = InvokeReplyArg(name, value) - func(arg) - del self.invokes_pending[id] - else: - self.emit_log(LogLevel.DEBUG, f"no pending invoke: {id} {name}") - - def handle_signal(self, name: str, args: list[Any]) -> None: - # handle signal message from source - self.emit_log(LogLevel.DEBUG, - f"ClientNode.handle_signal: {name} {args}") - sink = self.get_sink(name) - if sink: - sink.olink_on_signal(name, args) - - def handle_error(self, msgType: MsgType, id: int, error: str): - # handle error message from source - self.emit_log(LogLevel.DEBUG, - f"ClientNode.handle_error: {msgType} {id} {error}") diff --git a/src/olink/core/__init__.py b/src/olink/core/__init__.py index e69de29..c253fa5 100644 --- a/src/olink/core/__init__.py +++ b/src/olink/core/__init__.py @@ -0,0 +1,4 @@ +from .hook import EventHook as EventHook +from .types import Name as Name, Base as Base, LogLevel as LogLevel, MsgType as MsgType +from .protocol import IProtocolListener as IProtocolListener, Protocol as Protocol +from .node import BaseNode as BaseNode diff --git a/src/olink/core/hook.py b/src/olink/core/hook.py new file mode 100644 index 0000000..a702554 --- /dev/null +++ b/src/olink/core/hook.py @@ -0,0 +1,15 @@ +class EventHook(object): + def __init__(self): + self._handlers: list[callable] = [] + + def __iadd__(self, handler): + self._handlers.append(handler) + return self + + def __isub__(self, handler): + self._handlers.remove(handler) + return self + + def fire(self, *args, **kwargs): + for handler in self._handlers: + handler(*args, **kwargs) diff --git a/src/olink/core/node.py b/src/olink/core/node.py index 63658a4..477b0a4 100644 --- a/src/olink/core/node.py +++ b/src/olink/core/node.py @@ -1,32 +1,34 @@ from typing import Any from olink.core.protocol import IProtocolListener, Protocol -from olink.core.types import Base, LogLevel, MessageConverter, MessageFormat, WriteMessageFunc +from olink.core.types import ( + Base, + LogLevel, + MessageConverter, + MessageFormat, + WriteMessageFunc, +) class BaseNode(Base, IProtocolListener): - # base node class - write_func: WriteMessageFunc = None - converter: MessageConverter = None - protocol: Protocol = None - def __init__(self): - super() - self.protocol = Protocol(self) - self.converter = MessageConverter(MessageFormat.JSON) + super().__init__() + self._protocol = Protocol(self) + self._converter = MessageConverter(MessageFormat.JSON) + self._write_func: WriteMessageFunc = None def on_write(self, func: WriteMessageFunc) -> None: # set the write function - self.write_func = func + self._write_func = func def emit_write(self, msg: list[Any]) -> None: # emit a message using the write function - if self.write_func: - data = self.converter.to_string(msg) - self.write_func(data) + if self._write_func: + data = self._converter.to_string(msg) + self._write_func(data) else: self.emit_log(LogLevel.DEBUG, f"write not set on protocol: {msg}") def handle_message(self, data: str) -> None: # handle a message and pass is on to the protocol - msg = self.converter.from_string(data) - self.protocol.handle_message(msg) + msg = self._converter.from_string(data) + self._protocol.handle_message(msg) diff --git a/src/olink/core/protocol.py b/src/olink/core/protocol.py index 38a5da6..f7c2a04 100644 --- a/src/olink/core/protocol.py +++ b/src/olink/core/protocol.py @@ -1,9 +1,9 @@ from typing import Any -from typing import Protocol as ProptocolType +from typing import Protocol as ProtocolType from .types import Base, LogLevel, MsgType -class IProtocolListener(ProptocolType): +class IProtocolListener(ProtocolType): # interface for protocol listeners def handle_link(self, name: str) -> None: # called when a link is created @@ -43,11 +43,9 @@ def handle_error(self, msgType: int, id: int, error: str) -> None: class Protocol(Base): - listener: IProtocolListener = None - def __init__(self, listener: IProtocolListener): super() - self.listener = listener + self._listener = listener @staticmethod def link_message(name: str) -> list[Any]: @@ -69,7 +67,7 @@ def set_property_message(name: str, value: Any) -> list[Any]: return [MsgType.SET_PROPERTY, name, value] @staticmethod - def property_change_message(name: str, value: Any) -> list[Any]: + def property_changed_message(name: str, value: Any) -> list[Any]: """signal property change to the client linked to the remote objects""" return [MsgType.PROPERTY_CHANGE, name, value] @@ -92,39 +90,38 @@ def error_message(msgType: MsgType, id: int, error: str) -> list[Any]: return [MsgType.ERROR, msgType, id, error] def handle_message(self, msg: list[Any]) -> bool: - if not self.listener: + if not self._listener: self.emit_log(LogLevel.DEBUG, "no listener installed") return False msgType = msg[0] if msgType == MsgType.LINK: _, name = msg - self.listener.handle_link(name) + self._listener.handle_link(name) elif msgType == MsgType.INIT: _, name, props = msg - self.listener.handle_init(name, props) + self._listener.handle_init(name, props) elif msgType == MsgType.UNLINK: _, name = msg - self.listener.handle_unlink(name) + self._listener.handle_unlink(name) elif msgType == MsgType.SET_PROPERTY: _, name, value = msg - self.listener.handle_set_property(name, value) + self._listener.handle_set_property(name, value) elif msgType == MsgType.PROPERTY_CHANGE: _, name, value = msg - self.listener.handle_property_change(name, value) + self._listener.handle_property_change(name, value) elif msgType == MsgType.INVOKE: _, id, name, args = msg - self.listener.handle_invoke(id, name, args) + self._listener.handle_invoke(id, name, args) elif msgType == MsgType.INVOKE_REPLY: _, id, name, value = msg - self.listener.handle_invoke_reply(id, name, value) + self._listener.handle_invoke_reply(id, name, value) elif msgType == MsgType.SIGNAL: _, name, args = msg - self.listener.handle_signal(name, args) + self._listener.handle_signal(name, args) elif msgType == MsgType.ERROR: _, msgType, id, error = msg - self.listener.handle_error(msgType, id, error) + self._listener.handle_error(msgType, id, error) else: - self.emit_log(LogLevel.DEBUG, - f"not supported message type: {msgType}") + self.emit_log(LogLevel.DEBUG, f"not supported message type: {msgType}") return False return True diff --git a/src/olink/core/types.py b/src/olink/core/types.py index 1b11b48..f433c7f 100644 --- a/src/olink/core/types.py +++ b/src/olink/core/types.py @@ -1,26 +1,26 @@ from enum import IntEnum from typing import Any, Callable -from typing import Protocol as ProptocolType +from typing import Protocol as ProtocolType import json class MsgType(IntEnum): - LINK = 10, - INIT = 11, - UNLINK = 12, - SET_PROPERTY = 20, - PROPERTY_CHANGE = 21, - INVOKE = 30, - INVOKE_REPLY = 31, - SIGNAL = 40, - ERROR = 90, + LINK = (10,) + INIT = (11,) + UNLINK = (12,) + SET_PROPERTY = (20,) + PROPERTY_CHANGE = (21,) + INVOKE = (30,) + INVOKE_REPLY = (31,) + SIGNAL = (40,) + ERROR = (90,) class MessageFormat(IntEnum): - JSON = 1, - BSON = 2, - MSGPACK = 3, - CBOR = 4, + JSON = (1,) + BSON = (2,) + MSGPACK = (3,) + CBOR = (4,) class Name: @@ -29,30 +29,27 @@ class Name: @staticmethod def resource_from_name(name: str) -> str: # return the resource name from a name - return name.split('/')[0] + return name.split("/")[0] @staticmethod def path_from_name(name: str) -> str: # return the path from a name - return name.split('/')[-1] + return name.split("/")[-1] @staticmethod def has_path(name: str) -> bool: # return true if name has a path - return '/' in name + return "/" in name @staticmethod def create_name(resource: str, path: str) -> str: # create a name from a resource and a path - return f'{resource}/{path}' + return f"{resource}/{path}" class MessageConverter: - # convert a message from/to a string - format: MessageFormat = MessageFormat.JSON - - def __init__(self, format: MessageFormat): - self.format = format + def __init__(self, format: MessageFormat = MessageFormat.JSON): + self._format = format def from_string(self, message: str) -> list[Any]: return json.loads(message) @@ -65,22 +62,23 @@ def to_string(self, data: list[Any]) -> str: class LogLevel: - DEBUG = 1, - INFO = 2, - WARNING = 3, - ERROR = 4, + DEBUG = (1,) + INFO = (2,) + WARNING = (3,) + ERROR = (4,) WriteLogFunc = Callable[[LogLevel, str], None] -class ILogger(ProptocolType): +class ILogger(ProtocolType): def log(level: LogLevel, msg: str) -> None: raise NotImplementedError() class Base: - log_func: WriteLogFunc = None + def __init__(self) -> None: + self.log_func: WriteLogFunc = None def on_log(self, func: WriteLogFunc): self.log_func = func diff --git a/src/olink/mocks/mocksink.py b/src/olink/mocks/mocksink.py index cc11322..7e2e6f4 100644 --- a/src/olink/mocks/mocksink.py +++ b/src/olink/mocks/mocksink.py @@ -1,46 +1,46 @@ from olink.core.types import Name from typing import Any, Optional -from olink.clientnode import ClientNode, IObjectSink, InvokeReplyArg +from olink.client import ClientNode, IObjectSink, InvokeReplyArg + class MockSink(IObjectSink): - name: str - events: list[Any] = [] - node: Optional[ClientNode] = None - properties: dict[str, Any] = {} def __init__(self, name: str): self.name = name - self.node = ClientNode.register_sink(self) + self.events: list[Any] = [] + self.node: Optional[ClientNode] = None + self.properties: dict[str, Any] = {} def invoke(self, name: str, args: list[Any]): if self.node: + def func(arg: InvokeReplyArg): - self.events.append({'type': 'invoke-reply', 'name': arg.name, 'value': arg.value}) + self.events.append( + {"type": "invoke-reply", "name": arg.name, "value": arg.value} + ) + self.node.invoke_remote(name, args, func) - + def olink_object_name(self) -> str: return self.name def olink_on_signal(self, name: str, args: list[Any]) -> None: - self.events.append({'type': 'signal', 'name': name, 'args': args}) + self.events.append({"type": "signal", "name": name, "args": args}) def olink_on_property_changed(self, name: str, value: Any) -> None: path = Name.path_from_name(name) - self.events.append({ 'type': 'property_change', 'name': name, 'value': value}) + self.events.append({"type": "property_change", "name": name, "value": value}) self.properties[path] = value def olink_on_init(self, name: str, props: object, node: "ClientNode"): - self.events.append({'type': 'init', 'name': name, 'props': props}) + self.events.append({"type": "init", "name": name, "props": props}) self.node = node self.properties = props def olink_on_release(self) -> None: - self.events.append({'type': 'release'}) + self.events.append({"type": "release"}) self.node = None def clear(self): self.events = [] self.properties = {} self.node = None - - - diff --git a/src/olink/mocks/mocksource.py b/src/olink/mocks/mocksource.py index 4d65094..25c22db 100644 --- a/src/olink/mocks/mocksource.py +++ b/src/olink/mocks/mocksource.py @@ -1,50 +1,48 @@ from olink.core.types import Name from typing import Any -from olink.remotenode import IObjectSource, RemoteNode +from olink.remote import IObjectSource, RemoteNode -class MockSource(IObjectSource): - name: str - events: list[Any] = [] - properties: dict[str, Any] = {} - node: RemoteNode = None +class MockSource(IObjectSource): def __init__(self, name: str): self.name = name + self.events: list[Any] = [] + self.properties: dict[str, Any] = {} + self.node: RemoteNode = None def set_property(self, name: str, value: Any): - RemoteNode.notify_property_change(name, value) + path = Name.path_from_name(name) + if self.properties.get(path) != value: + self.properties[path] = value + self.get_node().notify_property_changed(name, value) def notify_signal(self, name: str, args: list[Any]): - RemoteNode.notify_signal(name, args) + self.get_node().notify_signal(name, args) def olink_object_name(self) -> str: return self.name def olink_invoke(self, name: str, args: list[Any]): - self.events.append({ 'type': 'invole', 'name': name, 'args': args }) + self.events.append({"type": "invoke", "name": name, "args": args}) return name def olink_set_property(self, name: str, value: Any): - path = Name.path_from_name(name) - self.events.append({'type': 'set_property', 'name': name, 'value': value}) - if not path in self.properties: - # assign new value - self.properties[path] = value - RemoteNode.notify_property_change(name, value) - else: - # update existing value - if not self.properties[path] == value: - self.properties[path] = value - RemoteNode.notify_property_change(name, value) - + self.events.append({"type": "set_property", "name": name, "value": value}) + self.set_property(name, value) + def olink_linked(self, name: str, node: RemoteNode): - self.events.append({'type': 'linked', 'name': name}) self.node = node + self.events.append({"type": "linked", "name": name}) def olink_collect_properties(self) -> object: return self.properties + def get_node(self) -> RemoteNode: + if not self.node: + raise Exception("Node not set") + return self.node + def clear(self): self.events = [] self.properties = {} - self.node = None \ No newline at end of file + self.node = None diff --git a/src/olink/remote/__init__.py b/src/olink/remote/__init__.py new file mode 100644 index 0000000..5ca01a0 --- /dev/null +++ b/src/olink/remote/__init__.py @@ -0,0 +1,4 @@ +from .registry import RemoteRegistry as RemoteRegistry +from .node import RemoteNode as RemoteNode +from .types import IObjectSource as IObjectSource +from .adapter import SourceAdapter as SourceAdapter diff --git a/src/olink/remote/adapter.py b/src/olink/remote/adapter.py new file mode 100644 index 0000000..82777e9 --- /dev/null +++ b/src/olink/remote/adapter.py @@ -0,0 +1,50 @@ +from typing import Any +from olink.core.types import Name +from .node import RemoteNode +from .types import IObjectSource +import logging + + +class SourceAdapter(IObjectSource): + def __init__(self, objectId: str, impl) -> None: + self._object_id = objectId + self.impl = impl + self.impl._change += self.on_change + self.impl._emit += self.on_emit + self._node: RemoteNode = None + + def on_emit(self, path: str, args: list[Any]): + name = Name.create_name(self._object_id, path) + RemoteNode.notify_signal(name, args) + + def on_change(self, name: str, value: Any): + name = Name.create_name(self._object_id, name) + RemoteNode.notify_property_changed(name, value) + + def olink_object_name(self) -> str: + return self._object_id + + def olink_invoke(self, name: str, args: list[Any]) -> Any: + path = Name.path_from_name(name) + func = getattr(self.impl, path) + try: + result = func(*args) + except Exception as e: + logging.exception(e) + print("error: %s" % e) + result = None + raise e + return result + + def olink_set_property(self, name: str, value: Any): + # set property value on implementation + path = Name.path_from_name(name) + setattr(self.impl, path, value) + + def olink_linked(self, name: str, node: "RemoteNode"): + # called when the source is linked to a client node + self._node = node + + def olink_collect_properties(self) -> object: + # collect properties from implementation to send back to client node initially + return {k: getattr(self.impl, k) for k in ["count"]} diff --git a/src/olink/remote/node.py b/src/olink/remote/node.py new file mode 100644 index 0000000..98da84f --- /dev/null +++ b/src/olink/remote/node.py @@ -0,0 +1,59 @@ +from olink.core import BaseNode, Name, Protocol +from typing import Any +from .registry import RemoteRegistry +from .types import IObjectSource + + +class RemoteNode(BaseNode): + def __init__(self, registry: RemoteRegistry): + # initialise node and attaches this node to registry + super().__init__() + self._registry = registry + + def detach(self): + # detach this node from registry + self.registry().remove_node(self) + + def handle_link(self, name: str) -> None: + # handle link message from client node + # sends init message to client node + source = self.get_source(name) + if source: + self.registry().add_node(name, self) + source.olink_linked(name, self) + props = source.olink_collect_properties() + self.emit_write(Protocol.init_message(name, props)) + + def handle_unlink(self, name: str): + # unlinks names source from registry + source = self.get_source(name) + if source: + self.registry().remove_node_from_source(name, self) + + def handle_set_property(self, name: str, value: Any): + # handle set property message from client node + # calls set property on source + source = self.get_source(name) + if source: + source.olink_set_property(name, value) + + def handle_invoke(self, id: int, name: str, args: list[Any]) -> None: + # handle invoke message from client node + # calls invoke on source + # returns invoke reply message to client node + source = self.get_source(name) + if source: + value = source.olink_invoke(name, args) + self.emit_write(Protocol.invoke_reply_message(id, name, value)) + + def registry(self) -> RemoteRegistry: + return self._registry + + def get_source(self, name: str) -> IObjectSource: + return self.registry().get_source(name) + + def notify_property_changed(self, name: str, value: Any) -> None: + self.registry().notify_property_changed(name, value) + + def notify_signal(self, name: str, args: list[Any]) -> None: + self.registry().notify_signal(name, args) diff --git a/src/olink/remote/registry.py b/src/olink/remote/registry.py new file mode 100644 index 0000000..17b67f6 --- /dev/null +++ b/src/olink/remote/registry.py @@ -0,0 +1,101 @@ +from olink.core import Name, Base, LogLevel, Protocol +from typing import Any, TYPE_CHECKING +from .types import IObjectSource + +if TYPE_CHECKING: + from .node import RemoteNode + + +class SourceToNodeEntry: + def __init__(self, source=None): + self.source: IObjectSource = source + self.nodes: set["RemoteNode"] = set() + + +class RemoteRegistry(Base): + def __init__(self) -> None: + super().__init__() + self._entries: dict[str, SourceToNodeEntry] = {} + + def add_source(self, source: IObjectSource): + # register a new source in the registry + name = source.olink_object_name() + self.emit_log(LogLevel.DEBUG, f"RemoteRegistry.add_object_source: {name}") + self._entry(name).source = source + + def remove_source(self, source: IObjectSource): + # remove the given source from the registry + name = source.olink_object_name() + self._remove_entry(name) + + def get_source(self, name: str): + # return the source for the given name + return self._entry(name).source + + def get_nodes(self, name: str): + # return nodes attached to the named source + return self._entry(name).nodes + + def remove_node(self, node: "RemoteNode"): + # remove the given node from the registry + self.emit_log(LogLevel.DEBUG, "RemoteRegistry.detach_remote_node") + for entry in self._entries.values(): + if node in entry.nodes: + entry.nodes.remove(node) + + def add_node(self, name: str, node: "RemoteNode"): + # add a node to the named source + entry = self._entry(name) + if not node in entry.nodes: + entry.nodes.add(node) + + def remove_node_from_source(self, name: str, node: "RemoteNode"): + # remove the given node from the named source + self._entry(name).nodes.remove(node) + + def _entry(self, name: str) -> SourceToNodeEntry: + # returns the entry for the given resource part of the name + resource = Name.resource_from_name(name) + if not resource in self._entries: + self.emit_log(LogLevel.DEBUG, f"add new resource: {resource}") + self._entries[resource] = SourceToNodeEntry() + return self._entries[resource] + + def _remove_entry(self, name: str) -> None: + # remove entry from registry + resource = Name.resource_from_name(name) + if resource in self._entries: + del self._entries[resource] + else: + self.emit_log( + LogLevel.DEBUG, + f"remove resource failed, resource not exists: {resource}", + ) + + def _has_entry(self, name: str) -> SourceToNodeEntry: + # checks if the registry has an entry for the given name + resource = Name.resource_from_name(name) + return resource in self._entries + + def init_entry(self, name: str): + # init a new entry for the given name + resource = Name.resource_from_name(name) + if resource in self._entries: + self._entries[resource] = SourceToNodeEntry() + + def clear(self): + self._entries.clear() + + def notify_property_changed(self, name: str, value: Any): + # notify property change to all named client nodes + resource = Name.resource_from_name(name) + for node in self.get_nodes(resource): + msg = Protocol.property_changed_message(name, value) + node.emit_write(msg) + + def notify_signal(self, name: str, args: tuple): + # notify signal to all named client nodes + resource = Name.resource_from_name(name) + for node in self.get_nodes(resource): + msg = Protocol.signal_message(name, args) + node.emit_write(msg) diff --git a/src/olink/remote/types.py b/src/olink/remote/types.py new file mode 100644 index 0000000..8d1ba75 --- /dev/null +++ b/src/olink/remote/types.py @@ -0,0 +1,24 @@ +from typing import Any, Protocol as ProtocolType + +class IObjectSource(ProtocolType): + # interface for object sources + def olink_object_name() -> str: + # returns the object name + raise NotImplementedError() + + def olink_invoke(self, name: str, args: list[Any]): + # called on incoming invoke message + # returns resulting value + raise NotImplementedError() + + def olink_set_property(self, name: str, value: Any): + # called on incoming set property message + raise NotImplementedError() + + def olink_linked(self, name: str, node: "RemoteNode"): + # called when a remote node is linked to this node + raise NotImplementedError() + + def olink_collect_properties(self) -> object: + # returns a dictionary of all properties + raise NotImplementedError() \ No newline at end of file diff --git a/src/olink/remotenode.py b/src/olink/remotenode.py deleted file mode 100644 index b475d45..0000000 --- a/src/olink/remotenode.py +++ /dev/null @@ -1,193 +0,0 @@ -from olink.core.protocol import Protocol -from olink.core.types import Base, LogLevel, Name -from olink.core.node import BaseNode - -from typing import Any, Protocol as ProtocolType - - -class IObjectSource(ProtocolType): - # interface for object sources - def olink_object_name() -> str: - # returns the object name - raise NotImplementedError() - - def olink_invoke(self, name: str, args: list[Any]): - # called on incoming invoke message - # returns resulting value - raise NotImplementedError() - - def olink_set_property(self, name: str, value: Any): - # called on incoming set property message - raise NotImplementedError() - - def olink_linked(self, name: str, node: "RemoteNode"): - # called when a remote node is linked to this node - raise NotImplementedError() - - def olink_collect_properties(self) -> object: - # returns a dictionary of all properties - raise NotImplementedError() - - -class SourceToNodeEntry: - # entry in the remote registry - source: IObjectSource = None - nodes: set["RemoteNode"] = set() - - def __init__(self, source=None): - self.source = source - self.nodes = set() - - -class RemoteRegistry(Base): - # registry of remote sources - # links sources to nodes - entries: dict[str, SourceToNodeEntry] = {} - - def add_source(self, source: IObjectSource): - # add a source to registry by object name - name = source.olink_object_name() - self.emit_log(LogLevel.DEBUG, - f"RemoteRegistry.add_object_source: {name}") - self._entry(name).source = source - - def remove_source(self, source: IObjectSource): - # remove the given source from the registry - name = source.olink_object_name() - self._remove_entry(name) - - def get_source(self, name: str): - # return the source for the given name - return self._entry(name).source - - def get_nodes(self, name: str): - # return nodes attached to the named source - return self._entry(name).nodes - - def remove_node(self, node: "RemoteNode"): - # remove the given node from the registry - self.emit_log(LogLevel.DEBUG, "RemoteRegistry.detach_remote_node") - for entry in self.entries.values(): - if node in entry.nodes: - entry.nodes.remove(node) - - def add_node_to_source(self, name: str, node: "RemoteNode"): - # add a node to the named source - self._entry(name).nodes.add(node) - - def remove_node_from_source(self, name: str, node: "RemoteNode"): - # remove the given node from the named source - self._entry(name).nodes.remove(node) - - def _entry(self, name: str) -> SourceToNodeEntry: - # returns the entry for the given resource part of the name - resource = Name.resource_from_name(name) - if not resource in self.entries: - self.emit_log(LogLevel.DEBUG, f"add new resource: {resource}") - self.entries[resource] = SourceToNodeEntry() - return self.entries[resource] - - def _remove_entry(self, name: str) -> None: - # remove entry from registry - resource = Name.resource_from_name(name) - if resource in self.entries: - del self.entries[resource] - else: - self.emit_log( - LogLevel.DEBUG, f'remove resource failed, resource not exists: {resource}') - - def _has_entry(self, name: str) -> SourceToNodeEntry: - # checks if the registry has an entry for the given name - resource = Name.resource_from_name(name) - return resource in self.entries - - def init_entry(self, name: str): - # init a new entry for the given name - resource = Name.resource_from_name(name) - if resource in self.entries: - self.entries[resource] = SourceToNodeEntry() - - def clear(self): - self.entries = {} - - -_registry = RemoteRegistry() - - -def get_remote_registry() -> RemoteRegistry: - # returns the remote registry - return _registry - - -class RemoteNode(BaseNode): - # a remote node is a node that is linked to a remote source - def __init__(self): - # initialise node and attaches this node to registry - super().__init__() - - def detach(self): - # detach this node from registry - self.registry().remove_node(self) - - def handle_link(self, name: str) -> None: - # handle link message from client node - # sends init message to client node - source = RemoteNode.get_source(name) - if source: - self.registry().add_node_to_source(name, self) - source.olink_linked(name, self) - props = source.olink_collect_properties() - self.emit_write(Protocol.init_message(name, props)) - - def handle_unlink(self, name: str): - # unlinks names source from registry - source = self.get_source(name) - if source: - self.registry().remove_node_from_source(name, self) - - def handle_set_property(self, name: str, value: Any): - # handle set property message from client node - # calls set property on source - source = self.get_source(name) - if source: - source.olink_set_property(name, value) - - def handle_invoke(self, id: int, name: str, args: list[Any]) -> None: - # handle invoke message from client node - # calls invoke on source - # returns invoke reply message to client node - source = self.get_source(name) - if source: - value = source.olink_invoke(name, args) - self.emit_write(Protocol.invoke_reply_message(id, name, value)) - - def registry(self) -> RemoteRegistry: - # returns global registry - return get_remote_registry() - - @staticmethod - def get_source(name) -> IObjectSource: - # get object source from registry - return get_remote_registry().get_source(name) - - @staticmethod - def register_source(source: IObjectSource): - # add object source to registry - return get_remote_registry().add_source(source) - - @staticmethod - def unregister_source(source: IObjectSource): - # remove object source from registry - return get_remote_registry().remove_source(source) - - @staticmethod - def notify_property_change(name: str, value: Any) -> None: - # notify property change to all named client nodes - for node in get_remote_registry().get_nodes(name): - node.emit_write(Protocol.property_change_message(name, value)) - - @staticmethod - def notify_signal(name: str, args: list[Any]): - # notify signal to all named client nodes - for node in get_remote_registry().get_nodes(name): - node.emit_write(Protocol.signal_message(name, args)) diff --git a/src/olink/ws/__init__.py b/src/olink/ws/__init__.py new file mode 100644 index 0000000..5341ff3 --- /dev/null +++ b/src/olink/ws/__init__.py @@ -0,0 +1,2 @@ +from .conn import Connection as Connection +from .server import Server as Server diff --git a/src/olink/ws/conn.py b/src/olink/ws/conn.py new file mode 100644 index 0000000..4a431fc --- /dev/null +++ b/src/olink/ws/conn.py @@ -0,0 +1,79 @@ +import asyncio +from pyee.asyncio import AsyncIOEventEmitter +import logging +import websockets as ws + +log = logging.getLogger(__name__) + + +class Connection(AsyncIOEventEmitter): + def __init__( + self, cancel: asyncio.Event, socket: ws.WebSocketServerProtocol = None + ): + super().__init__() + self._cancel = cancel + self._socket = socket + self._send_queue = asyncio.Queue() + self._receiver_task = None + self._sender_task = None + if socket: # we are server + log.info("server conn init") + self._receiver_task = asyncio.create_task( + self._receiver(), name="server conn receiver" + ) + self._sender_task = asyncio.create_task( + self._sender(), name="server conn sender" + ) + + async def connect(self, addr: str): + log.info("client connected") + async with ws.connect(addr) as self._socket: + self._receiver_task = asyncio.create_task( + self._receiver(), name="client receiver" + ) + self._sender_task = asyncio.create_task( + self._sender(), name="client sender" + ) + + async def _sender(self): + while not self._cancel.is_set(): + try: + msg = await self._send_queue.get() + if not msg: + log.info("sender got None, closing") + break + await self._socket.send(msg) + self._send_queue.task_done() + except Exception as e: + log.info("connection sender closing: %s", e) + break + log.info("sender done") + + async def _receiver(self): + while not self._cancel.is_set(): + try: + msg = await self._socket.recv() + self.emit("message", msg) + except Exception: + log.info("Connection closed") + break + log.info("receiver done") + + def send(self, data): + self._send_queue.put_nowait(data) + + async def send_async(self, data): + await self._send_queue.put(data) + + async def cancel(self): + self._cancel.set() + if self._socket: + await self._socket.close() + self._socket = None + self._send_queue.put_nowait(None) + if self._receiver_task: + await self._receiver_task + if self._sender_task: + await self._sender_task + self._receiver_task = None + self._sender_task = None diff --git a/src/olink/ws/server.py b/src/olink/ws/server.py new file mode 100644 index 0000000..af3b6fd --- /dev/null +++ b/src/olink/ws/server.py @@ -0,0 +1,51 @@ +import websockets as ws +from typing import Any +import asyncio +from olink.remote import RemoteNode, RemoteRegistry +import logging +from .conn import Connection + + +class RemoteHandler: + def __init__(self, conn: Connection, node: RemoteNode) -> None: + self.conn = conn + self.node = node + self.conn.on("message", self.on_recv_message) + self.node.on_write(self.on_send_message) + + def on_recv_message(self, msg: str): + self.node.handle_message(msg) + + def on_send_message(self, msg: str): + self.conn.send(msg) + + async def cancel(self): + await self.conn.cancel() + self.node.detach() + + +class Server: + def __init__(self, cancel: asyncio.Event) -> None: + logging.info("server init") + self._cancel = cancel + self._handlers: list[RemoteHandler] = [] + self._registry = RemoteRegistry() + + async def _handler(self, socket: ws.WebSocketServerProtocol): + logging.info("server handle new connection %s", socket) + node = RemoteNode(self._registry) + conn = Connection(self._cancel, socket) + handler = RemoteHandler(conn, node) + self._handlers.append(handler) + + async def serve(self, host: str, port: int): + logging.info("server serve") + async with ws.serve(self._handler, host, port): + await self._cancel.wait() + logging.info("server cancel wait done") + + async def cancel(self): + logging.info("server cancel") + for handler in self._handlers: + await handler.cancel() + self._cancel.set() diff --git a/src/olink/ws/test_client.py b/src/olink/ws/test_client.py new file mode 100644 index 0000000..5b84cbb --- /dev/null +++ b/src/olink/ws/test_client.py @@ -0,0 +1,26 @@ +import pytest +import websockets as ws +import asyncio + +test_host = "localhost" +test_port = 8152 + + +async def setup_server( + done: asyncio.Future, queue: asyncio.Queue, host: str, port: int +): + async def handler(socket: ws.WebSocketServerProtocol): + async for msg in socket: + await queue.put(msg) + done.set_result(True) + + await ws.serve(handler, host, port) + await done + await ws.close() + + +@pytest.mark.asyncio +async def test_client_link(): + assert False + queue = asyncio.Queue() + await setup_server(queue, test_host, test_port) diff --git a/tests/test_clientnode.py b/tests/test_clientnode.py index 87cae39..46feb57 100644 --- a/tests/test_clientnode.py +++ b/tests/test_clientnode.py @@ -1,37 +1,67 @@ -from olink.clientnode import ClientNode +from olink.client import ClientNode, ClientRegistry from olink.mocks.mocksink import MockSink +import pytest -name = 'demo.Counter' -sink = MockSink(name) -client = ClientNode() -r = client.registry() +name = "demo.Counter" -def test_add_sink(): - ClientNode.register_sink(sink) - assert(r.get_sink(name) == sink) - assert(r.get_node(name) == None) +@pytest.fixture +def node(): # type: () -> ClientNode + sink = MockSink(name) + registry = ClientRegistry() + registry.add_sink(sink) + node = ClientNode(registry) + registry.add_node(name, node) + assert registry.get_sink(name) == sink + assert registry.get_node(name) == node + return node -def test_remove_sink(): - ClientNode.unregister_sink(sink) - assert(r.get_sink(name) == None) +@pytest.fixture +def registry(): # type: () -> ClientRegistry + name = "demo.Counter" + sink = MockSink(name) + registry = ClientRegistry() + registry.add_sink(sink) + return registry -def test_link_node_to_sink(): - assert(r.get_node(name) == None) - client.link_remote(name) - assert(r.get_node(name) == client) +def test_add_sink(node: ClientNode): + name = "demo.Counter" + registry = node.registry() + sink = registry.get_sink(name) + assert registry.get_sink(name) == sink + assert registry.get_node(name) == node + registry.remove_sink(sink) + assert registry.get_sink(name) == None + assert registry.get_node(name) == None -def test_unlink_node_from_sink(): - assert(r.get_node(name) == client) - client.unlink_remote(name) - assert(r.get_node(name) == None) +def test_remove_sink(node: ClientNode): + registry = node.registry() + sink = registry.get_sink(name) + registry.remove_sink(sink) + assert registry.get_sink(name) == None + assert registry.get_node(name) == None -def test_detach_node_from_all_sinks(): - client.link_remote(name) - assert(r.get_node(name) == client) - client.detach() - assert(r.get_node(name) == None) +def test_link_node_to_sink(node: ClientNode): + registry = node.registry() + assert registry.get_node(name) == node + node.link_remote(name) + assert registry.get_node(name) == node + + +def test_unlink_node_from_sink(node: ClientNode): + registry = node.registry() + assert registry.get_node(name) == node + node.unlink_remote(name) + assert registry.get_node(name) == None + + +def test_detach_node_from_all_sinks(node: ClientNode): + registry = node.registry() + node.link_remote(name) + assert registry.get_node(name) == node + node.detach() + assert registry.get_node(name) == None diff --git a/tests/test_comms.py b/tests/test_comms.py index 21982f7..6ed0134 100644 --- a/tests/test_comms.py +++ b/tests/test_comms.py @@ -1,75 +1,132 @@ -from olink.clientnode import ClientNode -from olink.remotenode import RemoteNode +from olink.client import ClientNode, ClientRegistry +from olink.remote import RemoteNode, RemoteRegistry from olink.mocks.mocksink import MockSink from olink.mocks.mocksource import MockSource +from typing import Tuple +import pytest -name = 'demo.Calc' -propName = 'demo.Calc/total' +name = "demo.Calc" +propName = "demo.Calc/total" propValue = 1 -invokeName = 'demo.Calc/add' +invokeName = "demo.Calc/add" invokeArgs = [1] -sigName = 'demo.Calc/down' +sigName = "demo.Calc/down" sigArgs = [5] -client = ClientNode() -remote = RemoteNode() -client.on_log(lambda level, msg: print(msg)) -remote.on_log(lambda level, msg: print(msg)) -client.on_write(lambda msg: remote.handle_message(msg)) -remote.on_write(lambda msg: client.handle_message(msg)) -sink = MockSink(name) -source = MockSource(name) -RemoteNode.register_source(source) +# remote_registry = RemoteRegistry() +# client_registry = ClientRegistry() +# client = ClientNode(client_registry) +# remote = RemoteNode(remote_registry) +# client.on_log(lambda level, msg: print(msg)) +# remote.on_log(lambda level, msg: print(msg)) +# client.on_write(lambda msg: remote.handle_message(msg)) +# remote.on_write(lambda msg: client.handle_message(msg)) +# sink = MockSink(name) +# source = MockSource(name) +# remote_registry.add_source(source) -def reset(): - sink.clear() - source.clear() +@pytest.fixture +def client(): + registry = ClientRegistry() + client = ClientNode(registry) + client.on_log(lambda level, msg: print(msg)) + sink = MockSink(name) + registry.add_sink(sink) + registry.add_node(name, client) + assert registry.get_sink(name) == sink + assert registry.get_node(name) == client + return client -def test_client_link(): +@pytest.fixture +def remote(): + registry = RemoteRegistry() + remote = RemoteNode(registry) + remote.on_log(lambda level, msg: print(msg)) + source = MockSource(name) + registry.add_source(source) + registry.add_node(name, remote) + assert registry.get_source(name) == source + assert len(registry.get_nodes(name)) == 1 + assert remote in registry.get_nodes(name) + return remote + + +@pytest.fixture +def conn(client, remote): + client.on_write(lambda msg: remote.handle_message(msg)) + remote.on_write(lambda msg: client.handle_message(msg)) + print("conn: remote node", hash(remote)) + return client, remote + + +def test_client_link(conn: Tuple[ClientNode, RemoteNode]): + client, _ = conn client.detach() - assert client.registry().get_node(name) == None + registry = client.registry() + sink = registry.get_sink(name) # type: MockSink + assert registry.get_node(name) == None client.link_remote(name) - assert client.registry().get_node(name) == client + assert registry.get_node(name) == client assert len(sink.events) == 1 - assert sink.events[0] == {'type': 'init', 'name': name, 'props': {}} + assert sink.events[0] == {"type": "init", "name": name, "props": {}} -def test_client_set_property(): - reset() +def test_client_set_property(conn: Tuple[ClientNode, RemoteNode]): + client, _ = conn + registry = client.registry() + sink = registry.get_sink(name) # type: MockSink client.link_remote(name) assert len(sink.events) == 1 client.set_remote_property(propName, propValue) assert len(sink.events) == 2 assert sink.events[1] == { - 'type': 'property_change', 'name': propName, 'value': propValue} + "type": "property_change", + "name": propName, + "value": propValue, + } -def test_client_invoke(): - reset() +def test_client_invoke(conn: Tuple[ClientNode, RemoteNode]): + client, _ = conn + registry = client.registry() + sink = registry.get_sink(name) # type: MockSink client.link_remote(name) assert len(sink.events) == 1 sink.invoke(invokeName, invokeArgs) assert len(sink.events) == 2 - assert sink.events[1] == {'type': 'invoke-reply', - 'name': invokeName, 'value': invokeName} + assert sink.events[1] == { + "type": "invoke-reply", + "name": invokeName, + "value": invokeName, + } -def test_remote_signal(): - reset() +def test_remote_signal(conn: Tuple[ClientNode, RemoteNode]): + client, remote = conn client.link_remote(name) + client_registry = client.registry() + sink = client_registry.get_sink(name) # type: MockSink assert len(sink.events) == 1 + remote_registry = remote.registry() + source = remote_registry.get_source(name) # type: MockSource source.notify_signal(sigName, sigArgs) assert len(sink.events) == 2 - assert sink.events[1] == {'type': 'signal', - 'name': sigName, 'args': sigArgs} + assert sink.events[1] == {"type": "signal", "name": sigName, "args": sigArgs} -def test_remote_set_property(): - reset() +def test_remote_set_property(conn: Tuple[ClientNode, RemoteNode]): + client, remote = conn + client_registry = client.registry() + sink = client_registry.get_sink(name) # type: MockSink + remote_registry = remote.registry() + source = remote_registry.get_source(name) # type: MockSource client.link_remote(name) assert len(sink.events) == 1 source.set_property(propName, propValue) assert len(sink.events) == 2 assert sink.events[1] == { - 'type': 'property_change', 'name': propName, 'value': propValue} + "type": "property_change", + "name": propName, + "value": propValue, + } diff --git a/tests/test_protocol.py b/tests/test_protocol.py index d29cbac..243171b 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -1,8 +1,8 @@ from olink.core.protocol import Protocol from olink.core.types import MsgType -name = 'demo.Calc' -props = {'count': 1} +name = "demo.Calc" +props = {"count": 1} value = 1 id = 1 args = [1, 2] @@ -23,7 +23,7 @@ def test_messages(): msg = Protocol.set_property_message(name, value) assert msg == [MsgType.SET_PROPERTY, name, value] - msg = Protocol.property_change_message(name, value) + msg = Protocol.property_changed_message(name, value) assert msg == [MsgType.PROPERTY_CHANGE, name, value] msg = Protocol.invoke_message(id, name, args) diff --git a/tests/test_remotenode.py b/tests/test_remotenode.py index 51246fd..b9ad080 100644 --- a/tests/test_remotenode.py +++ b/tests/test_remotenode.py @@ -1,60 +1,60 @@ -from olink.remotenode import RemoteNode, get_remote_registry +from olink.remote import RemoteNode, RemoteRegistry from olink.mocks.mocksource import MockSource -name = 'demo.Counter' +name = "demo.Counter" source = MockSource(name) -remote = RemoteNode() -remote_registry = get_remote_registry() +registry = RemoteRegistry() +remote = RemoteNode(registry) def reset(): source.clear() - remote_registry.clear() + registry.clear() def test_add_source(): reset() - assert len(remote_registry.get_nodes(name)) == 0 - remote.register_source(source) - assert remote_registry.get_source(name) == source - assert len(remote_registry.get_nodes(name)) == 0 + assert len(registry.get_nodes(name)) == 0 + registry.add_source(source) + assert registry.get_source(name) == source + assert len(registry.get_nodes(name)) == 0 def test_remove_source(): reset() - RemoteNode.register_source(source) - assert remote_registry.get_source(name) == source - RemoteNode.unregister_source(source) - assert(remote_registry.get_source(name) == None) + registry.add_source(source) + assert registry.get_source(name) == source + registry.remove_source(source) + assert registry.get_source(name) == None def test_link_node_to_source(): reset() - RemoteNode.register_source(source) - assert remote_registry.get_nodes(name) == set() - remote_registry.add_node_to_source(name, remote) - assert remote_registry.get_nodes(name) == {remote} + registry.add_source(source) + assert registry.get_nodes(name) == set() + registry.add_node(name, remote) + assert registry.get_nodes(name) == {remote} def test_unlink_node_from_source(): reset() - RemoteNode.register_source(source) - remote_registry.add_node_to_source(name, remote) - assert remote_registry.get_nodes(name) == set([remote]) - remote_registry.remove_node_from_source(name, remote) - assert remote_registry.get_nodes(name) == set() + registry.add_source(source) + registry.add_node(name, remote) + assert remote in registry.get_nodes(name) + registry.remove_node_from_source(name, remote) + assert registry.get_nodes(name) == set() def test_detach_node_from_all_sources(): reset() - RemoteNode.register_source(source) - remote_registry.add_node_to_source(name, remote) - assert remote_registry.get_nodes(name) == set([remote]) + registry.add_source(source) + registry.add_node(name, remote) + assert registry.get_nodes(name) == set([remote]) remote.detach() - assert remote_registry.get_nodes(name) == set() + assert registry.get_nodes(name) == set() def test_get_registry(): reset() reg = remote.registry() - assert reg == remote_registry + assert reg == registry diff --git a/tests/test_ws_client.py b/tests/test_ws_client.py new file mode 100644 index 0000000..54d421f --- /dev/null +++ b/tests/test_ws_client.py @@ -0,0 +1,48 @@ +import asyncio +import pytest +from olink.ws import Connection, Server +from olink.client import ClientNode, ClientRegistry +import logging + +test_host = "localhost" +test_port = 8152 +test_url = f"ws://{test_host}:{test_port}" +object_id = "demo.Calc" + + +async def delay(coro, delay: float): + await asyncio.sleep(delay) + await coro + + +async def delay_cancel(cancel: asyncio.Future, delay: float): + await asyncio.sleep(delay) + if not cancel.is_set(): + cancel.set() + + +@pytest.mark.asyncio +async def test_client_link(): + logging.info("test_client_link") + cancel = asyncio.Event() + server = Server(cancel) + server_task = asyncio.create_task(server.serve(test_host, test_port), name="server") + + client_registry = ClientRegistry() + client_node = ClientNode(client_registry) + conn = Connection(cancel) + client_node.on_write(conn.send) + conn.on("message", client_node.handle_message) + + async def run_client(conn: Connection, cancel: asyncio.Event): + logging.info("run_client") + logging.info("connecting to %s", test_url) + await conn.connect(test_url) + await cancel.wait() + + await asyncio.sleep(1) + client_task = asyncio.create_task(run_client(conn, cancel), name="client") + await delay_cancel(cancel, 0) + await server.cancel() + await conn.cancel() + await asyncio.wait([server_task, client_task])