diff --git a/.gitignore b/.gitignore index 8731ea4..3637d4a 100644 --- a/.gitignore +++ b/.gitignore @@ -105,4 +105,7 @@ venv.bak/ .mypy_cache/ # editor configs and cache -.vscode \ No newline at end of file +.vscode + +# ide stuff +/.idea \ No newline at end of file diff --git a/senseme/senseme.py b/senseme/senseme.py index 3c17e40..8dcfdcc 100644 --- a/senseme/senseme.py +++ b/senseme/senseme.py @@ -12,6 +12,7 @@ import re import socket import time +from typing import Tuple from .lib import MWT, BackgroundLoop from .lib.xml import data_to_xml @@ -66,7 +67,6 @@ def __init__(self, ip="", name="", model="", series="", mac="", **kwargs): self.ip = ip self.name = name self.mac = mac - self.details = "" self.model = model self.series = series self.monitor_frequency = kwargs.get("monitor_frequency", 45) @@ -826,6 +826,47 @@ def listen(cycles=30): m = sock.recvfrom(1024) LOGGER.info(m) + + def discover_single_device(self): + """Discover a single device. + + Called during __init__ if the device name or IP address is missing. + + This function will discover only the first device to respond if both + name and IP were not provided on instantiation. If there is only one + device in the home this will work well. Otherwise, use the discover + function of the module rather than this one. + """ + data = "".encode("utf-8") + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + LOGGER.debug("Sending broadcast.") + s.sendto(data, ("", self.PORT)) + + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + LOGGER.debug("Listening...") + try: + s.bind(("", self.PORT)) + except OSError as e: + # Address already in use + LOGGER.exception( + "Port is in use or could not be opened." "Is another instance running?" + ) + raise OSError + else: + try: + m = s.recvfrom(1024) + LOGGER.info(m) + if not m: + LOGGER.error("Didn't receive response.") + else: + self.name, self.mac, self.model, self.series, self.ip = decode_discovery(m) + LOGGER.info(self.name, self.mac, self.model, self.series) + except OSError as e: + LOGGER.critical("No device was found.\n%s" % e) + raise OSError + def _send_command(self, msg): sock = socket.socket() sock.settimeout(5) @@ -1103,6 +1144,18 @@ def stop_monitor(self): """Stop the monitor.""" self._monitoring = False self._background_monitor.stop() + + @staticmethod + def listen(cycles=30): + """Listen for broadcasts and logs them for debugging purposes. + + Listens for cycles iterations + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind(("", 31415)) + for x in range(1, cycles): + m = sock.recvfrom(1024) + LOGGER.info(m) def discover(devices_to_find=6, time_to_wait=5): @@ -1133,14 +1186,7 @@ def discover(devices_to_find=6, time_to_wait=5): message = b"" if message: LOGGER.info("Received a message") - message_decoded = message[0].decode("utf-8") - res = re.match("\((.*);DEVICE;ID;(.*);(.*),(.*)\)", message_decoded) - # TODO: Parse this properly rather than regex - name = res.group(1) - mac = res.group(2) - model = res.group(3) - series = res.group(4) - ip = message[1][0] + name, mac, model, series, ip = decode_discovery(message) devices.append( SenseMe(ip=ip, name=name, model=model, series=series, mac=mac) ) @@ -1159,62 +1205,14 @@ def discover(devices_to_find=6, time_to_wait=5): finally: s.close() - @staticmethod - def listen(cycles=30): - """Listen for broadcasts and logs them for debugging purposes. - - Listens for cycles iterations - """ - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.bind(("", 31415)) - for x in range(1, cycles): - m = sock.recvfrom(1024) - LOGGER.info(m) - - def discover_single_device(self): - """Discover a single device. - - Called during __init__ if the device name or IP address is missing. - - This function will discover only the first device to respond if both - name and IP were not provided on instantiation. If there is only one - device in the home this will work well. Otherwise, use the discover - function of the module rather than this one. - """ - data = "".encode("utf-8") - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.bind(("", 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - LOGGER.debug("Sending broadcast.") - s.sendto(data, ("", self.PORT)) - - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - LOGGER.debug("Listening...") - try: - s.bind(("", self.PORT)) - except OSError as e: - # Address already in use - LOGGER.exception( - "Port is in use or could not be opened." "Is another instance running?" - ) - raise OSError - else: - try: - m = s.recvfrom(1024) - LOGGER.info(m) - if not m: - LOGGER.error("Didn't receive response.") - else: - self.details = m[0].decode("utf-8") - res = re.match("\((.*);DEVICE;ID;(.*);(.*),(.*)\)", self.details) - # TODO: Parse this properly rather than regex - self.name = res.group(1) - self.mac = res.group(2) - self.model = res.group(3) - self.series = res.group(4) - self.ip = m[1][0] - - LOGGER.info(self.name, self.mac, self.model, self.series) - except OSError as e: - LOGGER.critical("No device was found.\n%s" % e) - raise OSError +def decode_discovery(message: Tuple[bytes, Tuple[str, str]]) -> Tuple: + """ Takes a message in the return format of socket.recvfrom and returns decoded SenseMe discovery information """ + message_decoded = message[0].decode("utf-8") + res = re.match("\((.*);DEVICE;ID;(.*);(.*),(.*)\)", message_decoded) + # TODO: Parse this properly rather than regex + name = res.group(1) + mac = res.group(2) + model = res.group(3) + series = res.group(4) + ip = message[1][0] + return name, mac, model, series, ip \ No newline at end of file diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..9af7e6f --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[aliases] +test=pytest \ No newline at end of file diff --git a/setup.py b/setup.py index 2ac2d0b..19a0b54 100644 --- a/setup.py +++ b/setup.py @@ -49,5 +49,7 @@ ], keywords="HaikuHome SenseMe fan light home automation bigassfans", python_requires=">=3.6", - scripts=['bin/senseme_cli'] + scripts=['bin/senseme_cli'], + tests_require=["pytest"], + setup_requires=["pytest-runner"] ) diff --git a/tests/test_senseme.py b/tests/test_senseme.py new file mode 100644 index 0000000..56f7db1 --- /dev/null +++ b/tests/test_senseme.py @@ -0,0 +1,7 @@ +from senseme.senseme import decode_discovery + + +def test_discovery_message_decoding(): + test_message = ("(NAME;DEVICE;ID;MAC;MODEL,SERIES)".encode("utf-8"), ("127.0.0.1", "3101")) + name, mac, model, series, ip = decode_discovery(test_message) + assert ("NAME", "MAC", "MODEL", "SERIES", "127.0.0.1") == (name, mac, model, series, ip)