diff --git a/.pylintrc b/.pylintrc index c94cddf2..28934b6e 100644 --- a/.pylintrc +++ b/.pylintrc @@ -7,7 +7,7 @@ # Add files or directories matching the regex patterns to the blacklist. The # regex matches against base names, not paths. -ignore-patterns=mqtt_pb2.py,channel_pb2.py,telemetry_pb2.py,admin_pb2.py,config_pb2.py,deviceonly_pb2.py,apponly_pb2.py,remote_hardware_pb2.py,portnums_pb2.py,mesh_pb2.py,storeforward_pb2.py,cannedmessages_pb2.py,module_config_pb2.py,localonly_pb2.py,node.py,device_metadata_pb2.py,nanopb_pb2.py +ignore-patterns=xmodem_pb2.py,rtttl_pb2.py,powermon_pb2.py,paxcount_pb2.py,interdevice_pb2.py,atak_pb2,clientonly_pb2.py,connection_status_pb2.py,device_ui_pb2.py,mqtt_pb2.py,channel_pb2.py,telemetry_pb2.py,admin_pb2.py,config_pb2.py,deviceonly_pb2.py,apponly_pb2.py,remote_hardware_pb2.py,portnums_pb2.py,mesh_pb2.py,storeforward_pb2.py,cannedmessages_pb2.py,module_config_pb2.py,localonly_pb2.py,node.py,device_metadata_pb2.py,nanopb_pb2.py diff --git a/exampleConfig.yaml b/exampleConfig.yaml index a546e5a3..d68b6f46 100644 --- a/exampleConfig.yaml +++ b/exampleConfig.yaml @@ -1,16 +1,46 @@ # example config using camelCase keys owner: Bob TBeam ownerShort: BOB +isUnmessagable: true -channelUrl: https://www.meshtastic.org/d/#CgUYAyIBAQ +channelUrl: https://www.meshtastic.org/e/#CgQ6AggNEg8IATgBQANIAVAeaAHABgE + +cannedMessages: Hi|Bye|Yes|No|Ok +ringtone: 24:d=32,o=5,b=565:f6,p,f6,4p,p,f6,p,f6,2p,p,b6,p,b6,p,b6,p,b6,p,b,p,b,p,b,p,b,p,b,p,b,p,b,p,b,1p.,2p.,p location: lat: 35.88888 lon: -93.88888 alt: 304 - -userPrefs: - region: 1 - isAlwaysPowered: "true" - screenOnSecs: 31536000 - waitBluetoothSecs: 31536000 +config: + bluetooth: + enabled: true + fixedPin: 123456 + device: + serialEnabled: true + display: + screenOnSecs: 781 + lora: + region: US + hopLimit: 3 + txEnabled: true + txPower: 30 + network: + ntpServer: 0.pool.ntp.org + position: + gpsAttemptTime: 900 + gpsEnabled: true + gpsUpdateInterval: 120 + positionBroadcastSecs: 900 + positionBroadcastSmartEnabled: true + positionFlags: 3 + power: + lsSecs: 300 + meshSdsTimeoutSecs: 7200 + minWakeSecs: 10 + sdsSecs: 4294967295 + waitBluetoothSecs: 344 +moduleConfig: + telemetry: + deviceUpdateInterval: 900 + environmentUpdateInterval: 900 diff --git a/example_config.yaml b/example_config.yaml index acbdc332..aff184ec 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -1,8 +1,9 @@ # example configuration file with snake_case keys owner: Bob TBeam owner_short: BOB +is_unmessagable: true -channel_url: https://www.meshtastic.org/e/#CgMSAQESCDgBQANIAVAe +channel_url: https://www.meshtastic.org/e/#CgQ6AggNEg8IATgBQANIAVAeaAHABgE canned_messages: Hi|Bye|Yes|No|Ok ringtone: 24:d=32,o=5,b=565:f6,p,f6,4p,p,f6,p,f6,2p,p,b6,p,b6,p,b6,p,b6,p,b,p,b,p,b,p,b,p,b,p,b,p,b,p,b,1p.,2p.,p @@ -19,7 +20,7 @@ config: device: serialEnabled: true display: - screenOnSecs: 600 + screenOnSecs: 781 lora: region: US hopLimit: 3 @@ -39,7 +40,7 @@ config: meshSdsTimeoutSecs: 7200 minWakeSecs: 10 sdsSecs: 4294967295 - + waitBluetoothSecs: 344 module_config: telemetry: deviceUpdateInterval: 900 diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 16854002..8c3dcc86 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -1,6 +1,6 @@ """ Main Meshtastic """ - +from pathlib import Path # We just hit the 1600 line limit for main.py, but I currently have a huge set of powermon/structured logging changes # later we can have a separate changelist to refactor main.py into smaller files # pylint: disable=R0917,C0302 @@ -21,6 +21,7 @@ import platform import sys import time +import datetime as dt try: import pyqrcode # type: ignore[import-untyped] @@ -290,6 +291,7 @@ def setPref(config, comp_name, raw_val) -> bool: def onConnected(interface): """Callback invoked when we connect to a radio""" + logger.debug("--- onConnected Entry") closeNow = False # Should we drop the connection after we finish? waitForAckNak = ( False # Should we wait for an acknowledgment if we send to a remote node? @@ -671,115 +673,98 @@ def onConnected(interface): printConfig(node.moduleConfig) if args.configure: - with open(args.configure[0], encoding="utf8") as file: - configuration = yaml.safe_load(file) - closeNow = True - - interface.getNode(args.dest, False, **getNode_kwargs).beginSettingsTransaction() - - if "owner" in configuration: - # Validate owner name before setting - owner_name = str(configuration["owner"]).strip() - if not owner_name: - meshtastic.util.our_exit("ERROR: Long Name cannot be empty or contain only whitespace characters") - print(f"Setting device owner to {configuration['owner']}") - waitForAckNak = True - interface.getNode(args.dest, False, **getNode_kwargs).setOwner(configuration["owner"]) - time.sleep(0.5) - - if "owner_short" in configuration: - # Validate owner short name before setting - owner_short_name = str(configuration["owner_short"]).strip() - if not owner_short_name: - meshtastic.util.our_exit("ERROR: Short Name cannot be empty or contain only whitespace characters") - print( - f"Setting device owner short to {configuration['owner_short']}" - ) - waitForAckNak = True - interface.getNode(args.dest, False, **getNode_kwargs).setOwner( - long_name=None, short_name=configuration["owner_short"] - ) - time.sleep(0.5) - - if "ownerShort" in configuration: - # Validate owner short name before setting - owner_short_name = str(configuration["ownerShort"]).strip() - if not owner_short_name: - meshtastic.util.our_exit("ERROR: Short Name cannot be empty or contain only whitespace characters") - print( - f"Setting device owner short to {configuration['ownerShort']}" - ) - waitForAckNak = True - interface.getNode(args.dest, False, **getNode_kwargs).setOwner( - long_name=None, short_name=configuration["ownerShort"] - ) - time.sleep(0.5) - - if "channel_url" in configuration: - print("Setting channel url to", configuration["channel_url"]) - interface.getNode(args.dest, **getNode_kwargs).setURL(configuration["channel_url"]) + def fixEntry(cfg: dict, oldKey: str, newKey: str): + """fix configuration entries so only one structure must be handled. + This will be done by copying the previous used key to the now used one. + Abort if both old and new key are present upon entry.""" + if oldKey in cfg: + if newKey in cfg: + meshtastic.util.our_exit( + f"ERROR: Inconsistent settings in configuration: {oldKey} and {newKey} are both present.") + else: + cfg[newKey] = cfg[oldKey] + + def entryToStr(entry) -> Union[str | None]: + """Ensures conversion of an entry to string if it is not None + Failure of conversion will be trapped upstream""" + if entry is not None: + return str(entry).strip() + return None + + def writeSectionConfig(newConfig: dict, actConfig: dict, actNode: meshtastic.node.Node): + """Traverses a configuration dict and writes each part to the node""" + for sectionName, sectionElements in newConfig.items(): + logger.debug(f"writeSectionConfig for {sectionName} on node {actNode}") + traverseConfig(sectionName, sectionElements, actConfig) + actNode.writeConfig(meshtastic.util.camel_to_snake(sectionName)) time.sleep(0.5) - if "channelUrl" in configuration: - print("Setting channel url to", configuration["channelUrl"]) - interface.getNode(args.dest, **getNode_kwargs).setURL(configuration["channelUrl"]) - time.sleep(0.5) + configuration = yaml.safe_load(Path(args.configure[0]).read_text(encoding="utf8")) + closeNow = True + if not configuration or not isinstance(configuration, dict) or len(configuration) == 0: + logger.debug(f"Importing configuration failed: {configuration}") + print("command failed, no valid configuration could be loaded. Please check your file.") + return - if "canned_messages" in configuration: - print("Setting canned message messages to", configuration["canned_messages"]) - interface.getNode(args.dest, **getNode_kwargs).set_canned_message(configuration["canned_messages"]) - time.sleep(0.5) + # fix configuration structure: older version had entries "ownerShort" and "channelUrl"? + fixEntry(configuration, "ownerShort", "owner_short") + fixEntry(configuration, "channelUrl", "channel_url") - if "ringtone" in configuration: - print("Setting ringtone to", configuration["ringtone"]) - interface.getNode(args.dest, **getNode_kwargs).set_ringtone(configuration["ringtone"]) - time.sleep(0.5) + # keep always the same node when applying settings + actualNode: meshtastic.node.Node = interface.getNode(args.dest, True, **getNode_kwargs) + actualNode.beginSettingsTransaction() + waitForAckNak = True - if "location" in configuration: - alt = 0 - lat = 0.0 - lon = 0.0 - localConfig = interface.localNode.localConfig - - if "alt" in configuration["location"]: - alt = int(configuration["location"]["alt"] or 0) - print(f"Fixing altitude at {alt} meters") - if "lat" in configuration["location"]: - lat = float(configuration["location"]["lat"] or 0) - print(f"Fixing latitude at {lat} degrees") - if "lon" in configuration["location"]: - lon = float(configuration["location"]["lon"] or 0) - print(f"Fixing longitude at {lon} degrees") - print("Setting device position") + ownerName = entryToStr(configuration.get("owner", None)) + ownerShortName = entryToStr(configuration.get("owner_short", None)) + isUnmessagable = configuration.get("is_unmessagable", None) + print(f"Setting owner properties: {ownerName} - {ownerShortName} - {isUnmessagable}") + logger.debug(f"Setting owner properties: {ownerName} - {ownerShortName} - {isUnmessagable}") + actualNode.setOwner(long_name=ownerName, short_name=ownerShortName, is_unmessagable=isUnmessagable) + time.sleep(0.5) + + if "channel_url" in configuration: + print(f"Setting channel url to {configuration.get("channel_url","---")}") + logger.debug(f"Setting channel url to {configuration.get("channel_url","---")}") + actualNode.deleteAllSecondaryChannels() + actualNode.setURL(configuration["channel_url"]) + time.sleep(0.5) + + if "canned_messages" in configuration: + print(f"Setting canned message messages to {configuration.get("canned_messages", "---")}") + logger.debug(f"Setting canned message messages to {configuration.get("canned_messages", "---")}") + actualNode.set_canned_message(configuration["canned_messages"]) + time.sleep(0.5) + + if "ringtone" in configuration: + print(f"Setting ringtone to {configuration.get("ringtone", "---")}") + logger.debug(f"Setting ringtone to {configuration.get("ringtone", "---")}") + actualNode.set_ringtone(configuration["ringtone"]) + time.sleep(0.5) + + if "location" in configuration: + if configuration["location"] is None: + print("No position elements found, skipping") + else: + alt = int(configuration["location"].get("alt", 0)) + lat = float(configuration["location"].get("lat", 0.0)) + lon = float(configuration["location"].get("lon", 0.0)) + print(f"Setting fixed device position to lat {lat} lon {lon} alt {alt}") + logger.debug(f"Setting fixed device position to lat {lat} lon {lon} alt {alt}") interface.localNode.setFixedPosition(lat, lon, alt) time.sleep(0.5) - if "config" in configuration: - localConfig = interface.getNode(args.dest, **getNode_kwargs).localConfig - for section in configuration["config"]: - traverseConfig( - section, configuration["config"][section], localConfig - ) - interface.getNode(args.dest, **getNode_kwargs).writeConfig( - meshtastic.util.camel_to_snake(section) - ) - time.sleep(0.5) - - if "module_config" in configuration: - moduleConfig = interface.getNode(args.dest, **getNode_kwargs).moduleConfig - for section in configuration["module_config"]: - traverseConfig( - section, - configuration["module_config"][section], - moduleConfig, - ) - interface.getNode(args.dest, **getNode_kwargs).writeConfig( - meshtastic.util.camel_to_snake(section) - ) - time.sleep(0.5) + if "config" in configuration: + writeSectionConfig(configuration["config"], actualNode.localConfig, actualNode) + + if "module_config" in configuration: + writeSectionConfig(configuration["module_config"], actualNode.moduleConfig, actualNode) - interface.getNode(args.dest, False, **getNode_kwargs).commitSettingsTransaction() - print("Writing modified configuration to device") + print("Committing modified configuration to device") + logger.debug("Committing modified configuration to device") + actualNode.commitSettingsTransaction() + print(f"Configuration finished [{args.configure[0]}]") + logger.debug(f"Configuration finished [{args.configure[0]}]") if args.export_config: if args.dest != BROADCAST_ADDR: @@ -787,15 +772,14 @@ def onConnected(interface): return closeNow = True - config_txt = export_config(interface) + configTxt = export_config(interface) if args.export_config == "-": # Output to stdout (preserves legacy use of `> file.yaml`) - print(config_txt) + print(configTxt) else: try: - with open(args.export_config, "w", encoding="utf-8") as f: - f.write(config_txt) + Path(args.export_config).write_text(configTxt, encoding="utf-8") print(f"Exported configuration to {args.export_config}") except Exception as e: meshtastic.util.our_exit(f"ERROR: Failed to write config file: {e}") @@ -842,7 +826,7 @@ def onConnected(interface): n.writeChannel(ch.index) if channelIndex is None: print( - f"Setting newly-added channel's {ch.index} as '--ch-index' for further modifications" + f"Setting newly-added channel {ch.index} as '--ch-index' for further modifications" ) mt_config.channel_index = ch.index @@ -861,7 +845,7 @@ def onConnected(interface): ) else: print(f"Deleting channel {channelIndex}") - ch = interface.getNode(args.dest, **getNode_kwargs).deleteChannel(channelIndex) + interface.getNode(args.dest, **getNode_kwargs).deleteChannel(channelIndex) def setSimpleConfig(modem_preset): """Set one of the simple modem_config""" @@ -899,6 +883,10 @@ def setSimpleConfig(modem_preset): if args.ch_shortfast: setSimpleConfig(config_pb2.Config.LoRaConfig.ModemPreset.SHORT_FAST) + if args.ch_info: + node = interface.getNode(args.dest, **getNode_kwargs) + node.showChannels(True) + if args.ch_set or args.ch_enable or args.ch_disable: closeNow = True @@ -1013,12 +1001,16 @@ def setSimpleConfig(modem_preset): if args.dest != BROADCAST_ADDR: print("Showing node list of a remote node is not supported.") return - interface.showNodes(True, args.show_fields) + interface.showNodes(True, showFields=args.show_fields, printFmt=args.fmt) if args.show_fields and not args.nodes: print("--show-fields can only be used with --nodes") return + if args.fmt and not args.nodes: + print("--fmt can only be used with --nodes") + return + if args.qr or args.qr_all: closeNow = True url = interface.getNode(args.dest, True, **getNode_kwargs).getURL(includeAll=args.qr_all) @@ -1085,6 +1077,7 @@ def setSimpleConfig(modem_preset): print(f"Waiting {args.wait_to_disconnect} seconds before disconnecting") time.sleep(int(args.wait_to_disconnect)) + logger.debug("--- onConnected: Leaving") # if the user didn't ask for serial debugging output, we might want to exit after we've done our operation if (not args.seriallog) and closeNow: interface.close() # after running command then exit @@ -1094,6 +1087,7 @@ def setSimpleConfig(modem_preset): log_set.close() except Exception as ex: + logger.debug(f"Aborting due to: {ex}") print(f"Aborting due to: {ex}") interface.close() # close the connection now, so that our app exits sys.exit(1) @@ -1131,10 +1125,11 @@ def subscribe() -> None: # pub.subscribe(onNode, "meshtastic.node") -def set_missing_flags_false(config_dict: dict, true_defaults: set[tuple[str, str]]) -> None: + +def setMissingFlagsFalse(configDict: dict, trueDefaults: set[tuple[str, str]]) -> None: """Ensure that missing default=True keys are present in the config_dict and set to False.""" - for path in true_defaults: - d = config_dict + for path in trueDefaults: + d = configDict for key in path[:-1]: if key not in d or not isinstance(d[key], dict): d[key] = {} @@ -1142,12 +1137,13 @@ def set_missing_flags_false(config_dict: dict, true_defaults: set[tuple[str, str if path[-1] not in d: d[path[-1]] = False + def export_config(interface) -> str: - """used in --export-config""" - configObj = {} + """implements --export-config option""" + configObj: dict = {} # main object collecting all settings to be exported # A list of configuration keys that should be set to False if they are missing - config_true_defaults = { + configTrueDefaults: set[tuple] = { ("bluetooth", "enabled"), ("lora", "sx126xRxBoostedGain"), ("lora", "txEnabled"), @@ -1156,17 +1152,19 @@ def export_config(interface) -> str: ("security", "serialEnabled"), } - module_true_defaults = { + module_true_defaults: set[tuple] = { ("mqtt", "encryptionEnabled"), } - owner = interface.getLongName() - owner_short = interface.getShortName() - channel_url = interface.localNode.getURL() - myinfo = interface.getMyNodeInfo() - canned_messages = interface.getCannedMessage() - ringtone = interface.getRingtone() - pos = myinfo.get("position") + owner: str | None = interface.getLongName() + ownerShort: str | None = interface.getShortName() + channelUrl: str | None = interface.localNode.getURL() + isUnmessagable: bool | None = interface.getIsUnmessagable() + + myinfo: dict | None = interface.getMyNodeInfo() + cannedMessages: str | None = interface.getCannedMessage() + ringtone: str | None = interface.getRingtone() + pos: dict | None = myinfo.get("position") lat = None lon = None alt = None @@ -1177,15 +1175,17 @@ def export_config(interface) -> str: if owner: configObj["owner"] = owner - if owner_short: - configObj["owner_short"] = owner_short - if channel_url: + if ownerShort: + configObj["owner_short"] = ownerShort + if isUnmessagable: + configObj["is_unmessagable"] = isUnmessagable + if channelUrl: if mt_config.camel_case: - configObj["channelUrl"] = channel_url + configObj["channelUrl"] = channelUrl else: - configObj["channel_url"] = channel_url - if canned_messages: - configObj["canned_messages"] = canned_messages + configObj["channel_url"] = channelUrl + if cannedMessages: + configObj["canned_messages"] = cannedMessages if ringtone: configObj["ringtone"] = ringtone # lat and lon don't make much sense without the other (so fill with 0s), and alt isn't meaningful without both @@ -1194,8 +1194,7 @@ def export_config(interface) -> str: if alt: configObj["location"]["alt"] = alt - config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below - #was used as a string here and a Dictionary above + config: dict = MessageToDict(interface.localNode.localConfig) if config: # Convert inner keys to correct snake/camelCase prefs = {} @@ -1213,29 +1212,20 @@ def export_config(interface) -> str: if 'adminKey' in prefs[pref]: for i in range(len(prefs[pref]['adminKey'])): prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i] - if mt_config.camel_case: - configObj["config"] = config #Identical command here and 2 lines below? - else: - configObj["config"] = config - - set_missing_flags_false(configObj["config"], config_true_defaults) + configObj["config"] = config + setMissingFlagsFalse(configObj["config"], configTrueDefaults) - module_config = MessageToDict(interface.localNode.moduleConfig) - if module_config: + moduleConfig: dict = MessageToDict(interface.localNode.moduleConfig) + if moduleConfig: # Convert inner keys to correct snake/camelCase prefs = {} - for pref in module_config: - if len(module_config[pref]) > 0: - prefs[pref] = module_config[pref] - if mt_config.camel_case: - configObj["module_config"] = prefs - else: - configObj["module_config"] = prefs + for pref in moduleConfig: + if len(moduleConfig[pref]) > 0: + prefs[pref] = moduleConfig[pref] + configObj["module_config"] = prefs + setMissingFlagsFalse(configObj["module_config"], module_true_defaults) - set_missing_flags_false(configObj["module_config"], module_true_defaults) - - config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out) - #was used as a string here and a Dictionary above + config_txt = "# start of Meshtastic configure yaml\n" config_txt += yaml.dump(configObj) return config_txt @@ -1280,10 +1270,15 @@ def common(): logfile = None args = mt_config.args parser = mt_config.parser - logging.basicConfig( - level=logging.DEBUG if (args.debug or args.listen) else logging.INFO, - format="%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s", - ) + logConfig = { + "level": logging.DEBUG if (args.debug or args.listen) else logging.INFO, + "format": "%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s" + } + if args.logTo != "stdout": + logConfig["filename"] = args.logTo # define a file handle to log to + logConfig["encoding"] = "utf-8" + + logging.basicConfig(**logConfig) # set all meshtastic loggers to DEBUG if not (args.debug or args.listen) and args.debuglib: @@ -1297,6 +1292,8 @@ def common(): meshtastic.util.support_info() meshtastic.util.our_exit("", 0) + ts = dt.datetime.now().isoformat() + logger.debug(f"=== CMD: [{ts}] meshtastic {' '.join(sys.argv[1:])}") # Early validation for owner names before attempting device connection if hasattr(args, 'set_owner') and args.set_owner is not None: stripped_long_name = args.set_owner.strip() @@ -1457,6 +1454,7 @@ def common(): except KeyboardInterrupt: logger.info("Exiting due to keyboard interrupt") + logger.debug(f"--- Leaving...\n{60*'='}\n\n") # don't call exit, background threads might be running still # sys.exit(0) @@ -1763,6 +1761,14 @@ def addChannelConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPa default=False, ) + group.add_argument( + "--ch-info", + help="List all channels", + action="store_true", + dest="ch_info", + default=False, + ) + return parser def addPositionConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: @@ -1832,6 +1838,13 @@ def addLocalActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars default=None ) + group.add_argument( + "--fmt", + help="Specify format to show when using --nodes", + type=str, + default=None + ) + return parser def addRemoteActionArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: @@ -2059,7 +2072,16 @@ def initParser(): ) group.add_argument( - "--debug", help="Show API library debug log messages", action="store_true" + "--debug", + help="Show API library debug log messages'", + action="store_true" + ) + + group.add_argument( + "--logTo", + help="Defines where to log messages. Default is stdout", + default="stdout", + type=str ) group.add_argument( diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 7052bc5f..971c8e17 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -225,13 +225,14 @@ def showInfo(self, file=sys.stdout) -> str: # pylint: disable=W0613 return infos def showNodes( - self, includeSelf: bool = True, showFields: Optional[List[str]] = None + self, includeSelf: bool = True, showFields: Optional[List[str]] = None, printFmt: Optional[str] = None ) -> str: # pylint: disable=W0613 """Show table summary of nodes in mesh Args: includeSelf (bool): Include ourself in the output? showFields (List[str]): List of fields to show in output + printFmt (str): name of format to use """ def get_human_readable(name): @@ -261,7 +262,6 @@ def get_human_readable(name): else: return name - def formatFloat(value, precision=2, unit="") -> Optional[str]: """Format a float value with precision.""" return f"{value:.{precision}f}{unit}" if value else None @@ -296,7 +296,7 @@ def getNestedValue(node_dict: Dict[str, Any], key_path: str) -> Any: return value if showFields is None or len(showFields) == 0: - # The default set of fields to show (e.g., the status quo) + # The default set of fields to show (e.g., the status quo) showFields = ["N", "user.longName", "user.id", "user.shortName", "user.hwModel", "user.publicKey", "user.role", "position.latitude", "position.longitude", "position.altitude", "deviceMetrics.batteryLevel", "deviceMetrics.channelUtilization", @@ -372,7 +372,16 @@ def getNestedValue(node_dict: Dict[str, Any], key_path: str) -> Any: for i, row in enumerate(rows): row["N"] = i + 1 - table = tabulate(rows, headers="keys", missingval="N/A", tablefmt="fancy_grid") + if not printFmt or len(printFmt) == 0: + printFmt = "fancy_grid" + if printFmt.lower() == 'json': + headers = [] + if len(rows) > 0: + headers = list(rows[0].keys()) + outDict = {'headers': headers, 'nodes': rows} + table = json.dumps(outDict) + else: + table = tabulate(rows, headers="keys", missingval="N/A", tablefmt=printFmt) print(table) return table @@ -1061,27 +1070,34 @@ def getMyNodeInfo(self) -> Optional[Dict]: logger.debug(f"self.nodesByNum:{self.nodesByNum}") return self.nodesByNum.get(self.myInfo.my_node_num) - def getMyUser(self): + def getMyUser(self) -> dict | None: """Get user""" nodeInfo = self.getMyNodeInfo() if nodeInfo is not None: return nodeInfo.get("user") return None - def getLongName(self): + def getLongName(self) -> str | None: """Get long name""" user = self.getMyUser() if user is not None: return user.get("longName", None) return None - def getShortName(self): + def getShortName(self) -> str | None: """Get short name""" user = self.getMyUser() if user is not None: return user.get("shortName", None) return None + def getIsUnmessagable(self) -> bool | None: + """Get getIsUnmessagable property""" + user = self.getMyUser() + if user is not None: + return user.get("isUnmessagable", None) + return None + def getPublicKey(self): """Get Public Key""" user = self.getMyUser() @@ -1089,14 +1105,14 @@ def getPublicKey(self): return user.get("publicKey", None) return None - def getCannedMessage(self): + def getCannedMessage(self) -> str | None: """Get canned message""" node = self.localNode if node is not None: return node.get_canned_message() return None - def getRingtone(self): + def getRingtone(self) -> str | None: """Get ringtone""" node = self.localNode if node is not None: diff --git a/meshtastic/node.py b/meshtastic/node.py index afb5611a..bda243e8 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -75,17 +75,17 @@ def module_available(self, excluded_bit: int) -> bool: except Exception: return True - def showChannels(self): - """Show human readable description of our channels.""" + def showChannels(self, fullDisplay=False): + """Show human readable description of our channels, which can also easily parsed by machines""" print("Channels:") if self.channels: logger.debug(f"self.channels:{self.channels}") for c in self.channels: cStr = message_to_json(c.settings) # don't show disabled channels - if channel_pb2.Channel.Role.Name(c.role) != "DISABLED": + if fullDisplay or channel_pb2.Channel.Role.Name(c.role) != "DISABLED": print( - f" Index {c.index}: {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}" + f" Index {c.index}: {channel_pb2.Channel.Role.Name(c.role)}, psk: {pskToString(c.settings.psk)}, settings: {cStr}" ) publicURL = self.getURL(includeAll=False) adminURL = self.getURL(includeAll=True) @@ -273,6 +273,42 @@ def getChannelByChannelIndex(self, channelIndex): ch = self.channels[channelIndex] return ch + def _rewriteChannelList(self, idxStart: int, admIndex: int) -> None: + """write back current channels list to device starting from index idxStart + channels (before idxStart channels have not changed, so we don't need to rewrite those).""" + + for idx in range(len(self.channels)-1, idxStart-1, -1): + self.writeChannel(idx) + + # index = idxStart + # while index < 8: + # self.writeChannel(index, adminIndex=admIndex) + # index += 1 + # + # # if we are updating the local node, we might end up + # # *moving* the admin channel index as we are writing + # if (self.iface.localNode == self) and index >= admIndex: + # # We've now passed the old location for admin index + # # (and written it), so we can start finding it by name again + # admIndex = 0 + + def deleteAllSecondaryChannels(self) -> None: + """Remove all secondary or disabled channels in order to be able to rewrite channel config during + configuration, only keep primary channel""" + if self.channels: + adminIndex = self.iface.localNode._getAdminChannelIndex() + + idx2Delete = [c.index for c in self.channels + if c.role == channel_pb2.Channel.Role.SECONDARY and not c.index == adminIndex + ] + logger.debug(f"Deleting secondary channels. Idx found: {idx2Delete}") + if len(idx2Delete) > 0: + idx2Delete.reverse() + for idx in idx2Delete: + self.channels.pop(idx) + self._fixupChannels() + self._rewriteChannelList(idx2Delete[-1], adminIndex) + def deleteChannel(self, channelIndex): """Delete the specified channelIndex and shift other channels up""" ch = self.channels[channelIndex] @@ -289,17 +325,7 @@ def deleteChannel(self, channelIndex): self.channels.pop(channelIndex) self._fixupChannels() # expand back to 8 channels - index = channelIndex - while index < 8: - self.writeChannel(index, adminIndex=adminIndex) - index += 1 - - # if we are updating the local node, we might end up - # *moving* the admin channel index as we are writing - if (self.iface.localNode == self) and index >= adminIndex: - # We've now passed the old location for admin index - # (and written it), so we can start finding it by name again - adminIndex = 0 + self._rewriteChannelList(channelIndex, adminIndex) def getChannelByName(self, name): """Try to find the named channel or return None""" @@ -322,8 +348,8 @@ def _getAdminChannelIndex(self): return c.index return 0 - def setOwner(self, long_name: Optional[str]=None, short_name: Optional[str]=None, is_licensed: bool=False, is_unmessagable: Optional[bool]=None): - """Set device owner name""" + def setOwner(self, long_name: Optional[str] = None, short_name: Optional[str] = None, is_licensed: bool = False, is_unmessagable: Optional[bool] = None): + """Set device owner properties""" logger.debug(f"in setOwner nodeNum:{self.nodeNum}") self.ensureSessionKey() p = admin_pb2.AdminMessage() @@ -423,9 +449,14 @@ def setURL(self, url: str, addOnly: bool = False): ch.role = channel_pb2.Channel.Role.SECONDARY print(f"Adding new channel '{chs.name}' to device") self.writeChannel(ch.index) - else: - i = 0 - for chs in channelSet.settings: + else: # set new channel settings, starting from index 0. Keep previous settings otherwise + # The behavior is somewhat strange: if you load a URL, you might expect that your channel set is + # exactly as defined by the URL. So, when the URL has 2 channels and your radio has defined 5, after loading + # there should be 2 channels available but not 5 (2 newly set and 3 untouched) + # Publishing one URL for the primary channel and one for all together opens an ambiguity: the system + # cannot decide if you want to configure just one primary channel without any secondary channels or if you + # want to overwrite only the primary channel and keep the secondary channels as they are. + for i, chs in enumerate(channelSet.settings): ch = channel_pb2.Channel() ch.role = ( channel_pb2.Channel.Role.PRIMARY @@ -437,7 +468,6 @@ def setURL(self, url: str, addOnly: bool = False): self.channels[ch.index] = ch logger.debug(f"Channel i:{i} ch:{ch}") self.writeChannel(ch.index) - i = i + 1 p = admin_pb2.AdminMessage() p.set_config.lora.CopyFrom(channelSet.lora_config) @@ -462,7 +492,7 @@ def onResponseRequestRingtone(self, p): logger.debug(f"self.ringtonePart:{self.ringtonePart}") self.gotResponse = True - def get_ringtone(self): + def get_ringtone(self) -> str | None: """Get the ringtone. Concatenate all pieces together and return a single string.""" logger.debug(f"in get_ringtone()") if not self.module_available(mesh_pb2.EXTNOTIF_CONFIG): @@ -616,7 +646,8 @@ def reboot(self, secs: int = 10): self.ensureSessionKey() p = admin_pb2.AdminMessage() p.reboot_seconds = secs - logger.info(f"Telling node to reboot in {secs} seconds") + print(f"Telling node to reboot in {secs} seconds") + logger.debug(f"Telling node to reboot in {secs} seconds") # If sending to a remote node, wait for ACK/NAK if self == self.iface.localNode: @@ -630,7 +661,8 @@ def beginSettingsTransaction(self): self.ensureSessionKey() p = admin_pb2.AdminMessage() p.begin_edit_settings = True - logger.info(f"Telling open a transaction to edit settings") + print("Telling open a transaction to edit settings") + logger.debug("Telling open a transaction to edit settings") # If sending to a remote node, wait for ACK/NAK if self == self.iface.localNode: @@ -644,7 +676,8 @@ def commitSettingsTransaction(self): self.ensureSessionKey() p = admin_pb2.AdminMessage() p.commit_edit_settings = True - logger.info(f"Telling node to commit open transaction for editing settings") + print("Telling node to commit open transaction for editing settings") + logger.debug("Telling node to commit open transaction for editing settings") # If sending to a remote node, wait for ACK/NAK if self == self.iface.localNode: @@ -658,7 +691,8 @@ def rebootOTA(self, secs: int = 10): self.ensureSessionKey() p = admin_pb2.AdminMessage() p.reboot_ota_seconds = secs - logger.info(f"Telling node to reboot to OTA in {secs} seconds") + print(f"Telling node to reboot to OTA in {secs} seconds") + logger.debug(f"Telling node to reboot to OTA in {secs} seconds") # If sending to a remote node, wait for ACK/NAK if self == self.iface.localNode: @@ -672,7 +706,8 @@ def enterDFUMode(self): self.ensureSessionKey() p = admin_pb2.AdminMessage() p.enter_dfu_mode_request = True - logger.info(f"Telling node to enable DFU mode") + print("Telling node to enable DFU mode") + logger.debug("Telling node to enable DFU mode") # If sending to a remote node, wait for ACK/NAK if self == self.iface.localNode: @@ -686,7 +721,8 @@ def shutdown(self, secs: int = 10): self.ensureSessionKey() p = admin_pb2.AdminMessage() p.shutdown_seconds = secs - logger.info(f"Telling node to shutdown in {secs} seconds") + print(f"Telling node to shutdown in {secs} seconds") + logger.debug(f"Telling node to shutdown in {secs} seconds") # If sending to a remote node, wait for ACK/NAK if self == self.iface.localNode: @@ -712,10 +748,12 @@ def factoryReset(self, full: bool = False): p = admin_pb2.AdminMessage() if full: p.factory_reset_device = True - logger.info(f"Telling node to factory reset (full device reset)") + print("Telling node to factory reset (full device reset)") + logger.debug("Telling node to factory reset (full device reset)") else: p.factory_reset_config = True - logger.info(f"Telling node to factory reset (config reset)") + print("Telling node to factory reset (config reset)") + logger.debug("Telling node to factory reset (config reset)") # If sending to a remote node, wait for ACK/NAK if self == self.iface.localNode: diff --git a/meshtastic/tests/ch_reset_config.yaml b/meshtastic/tests/ch_reset_config.yaml new file mode 100644 index 00000000..20adf55f --- /dev/null +++ b/meshtastic/tests/ch_reset_config.yaml @@ -0,0 +1,2 @@ +# configuration file for resetting channel config to a known value +channel_url: https://www.meshtastic.org/e/#CgI6ABIPCAE4AUADSAFQHmgBwAYB diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index 251de98f..772072b2 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -18,10 +18,11 @@ onNode, onReceive, tunnelMain, - set_missing_flags_false, + setMissingFlagsFalse, ) from meshtastic import mt_config - +from meshtastic.tests.test_node import initChannels +from ..protobuf import localonly_pb2, config_pb2 from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611 # from ..ble_interface import BLEInterface @@ -408,8 +409,8 @@ def test_main_nodes(capsys): iface = MagicMock(autospec=SerialInterface) - def mock_showNodes(includeSelf, showFields): - print(f"inside mocked showNodes: {includeSelf} {showFields}") + def mock_showNodes(includeSelf, showFields, printFmt): + print(f"inside mocked showNodes: {includeSelf} {showFields} {printFmt}") iface.showNodes.side_effect = mock_showNodes with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo: @@ -1076,6 +1077,14 @@ def test_main_set_with_invalid(mocked_findports, mocked_serial, mocked_open, moc assert err == "" mo.assert_called() +def mockNode(ifce: SerialInterface) -> Node: + n = Node(ifce, 1234567890, noProto=True) + lc = localonly_pb2.LocalConfig() + n.localConfig = lc + lc.lora.CopyFrom(config_pb2.Config.LoRaConfig()) + n.moduleConfig = localonly_pb2.LocalModuleConfig() + n.channels = initChannels() + return n # TODO: write some negative --configure tests @pytest.mark.unit @@ -1088,24 +1097,21 @@ def test_main_configure_with_snake_case(mocked_findports, mocked_serial, mocked_ """Test --configure with valid file""" sys.argv = ["", "--configure", "example_config.yaml"] mt_config.args = sys.argv - serialInterface = SerialInterface(noProto=True) - anode = Node(serialInterface, 1234567890, noProto=True) - serialInterface.localNode = anode + serialInterface.localNode = mockNode(serialInterface) with patch("meshtastic.serial_interface.SerialInterface", return_value=serialInterface) as mo: main() out, err = capsys.readouterr() assert re.search(r"Connected to radio", out, re.MULTILINE) - # should these come back? maybe a flag? - #assert re.search(r"Setting device owner", out, re.MULTILINE) - #assert re.search(r"Setting device owner short", out, re.MULTILINE) - #assert re.search(r"Setting channel url", out, re.MULTILINE) - #assert re.search(r"Fixing altitude", out, re.MULTILINE) - #assert re.search(r"Fixing latitude", out, re.MULTILINE) - #assert re.search(r"Fixing longitude", out, re.MULTILINE) - #assert re.search(r"Set location_share to LocEnabled", out, re.MULTILINE) - assert re.search(r"Writing modified configuration to device", out, re.MULTILINE) + assert re.search(r"Telling open a transaction to edit settings", out, re.MULTILINE) + assert re.search(r"Setting owner properties: Bob TBeam - BOB - True", out, re.MULTILINE) + assert re.search(r"Setting channel url to https://www.meshtastic.org/e/#CgQ6AggNEg8IATgBQANIAVAeaAHABgE", out, re.MULTILINE) + assert re.search(r"Setting fixed device position to lat 35.88888 lon -93.88888 alt 304", out, re.MULTILINE) + assert re.search(r"Set lora.region to US", out, re.MULTILINE) + assert re.search(r"Set position.position_flags to 3", out, re.MULTILINE) + assert re.search(r"Set telemetry.environment_update_interval to 900", out, re.MULTILINE) + assert re.search(r"Committing modified configuration to device", out, re.MULTILINE) assert err == "" mo.assert_called() @@ -1122,21 +1128,20 @@ def test_main_configure_with_camel_case_keys(mocked_findports, mocked_serial, mo mt_config.args = sys.argv serialInterface = SerialInterface(noProto=True) - anode = Node(serialInterface, 1234567890, noProto=True) - serialInterface.localNode = anode + serialInterface.localNode = mockNode(serialInterface) with patch("meshtastic.serial_interface.SerialInterface", return_value=serialInterface) as mo: main() out, err = capsys.readouterr() assert re.search(r"Connected to radio", out, re.MULTILINE) - # should these come back? maybe a flag? - #assert re.search(r"Setting device owner", out, re.MULTILINE) - #assert re.search(r"Setting device owner short", out, re.MULTILINE) - #assert re.search(r"Setting channel url", out, re.MULTILINE) - #assert re.search(r"Fixing altitude", out, re.MULTILINE) - #assert re.search(r"Fixing latitude", out, re.MULTILINE) - #assert re.search(r"Fixing longitude", out, re.MULTILINE) - assert re.search(r"Writing modified configuration to device", out, re.MULTILINE) + assert re.search(r"Connected to radio", out, re.MULTILINE) + assert re.search(r"Telling open a transaction to edit settings", out, re.MULTILINE) + assert re.search(r"Setting owner properties: Bob TBeam - BOB - None", out, re.MULTILINE) + assert re.search(r"Setting channel url to https://www.meshtastic.org/e/#CgQ6AggNEg8IATgBQANIAVAeaAHABgE", out, re.MULTILINE) + assert re.search(r"Setting fixed device position to lat 35.88888 lon -93.88888 alt 304", out, re.MULTILINE) + assert re.search(r"Set lora.region to US", out, re.MULTILINE) + assert re.search(r"Set position.position_flags to 3", out, re.MULTILINE) + assert re.search(r"Committing modified configuration to device", out, re.MULTILINE) assert err == "" mo.assert_called() @@ -1784,23 +1789,17 @@ def test_main_export_config(capsys): with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo: mo.getLongName.return_value = "foo" mo.getShortName.return_value = "oof" + mo.getIsUnmessagable.return_value = True mo.localNode.getURL.return_value = "bar" mo.getCannedMessage.return_value = "foo|bar" mo.getRingtone.return_value = "24:d=32,o=5" - mo.getMyNodeInfo().get.return_value = { - "latitudeI": 1100000000, - "longitudeI": 1200000000, - "altitude": 100, - "batteryLevel": 34, - "latitude": 110.0, - "longitude": 120.0, + mo.getMyNodeInfo.return_value = { + "user": {"hwModel": "HELTEC_V3", "longName": "foo", "shortName": "oof"}, + "position": {"altitude": 100, "latitude": 110.0, "longitude": 120.0}, + "deviceMetrics": {"airUtilTx": 0.06, "batteryLevel": 101}, + "localStats": {"heapFreeBytes": 132796}, } - mo.localNode.radioConfig.preferences = """phone_timeout_secs: 900 -ls_secs: 300 -position_broadcast_smart: true -fixed_position: true -position_flags: 35""" - export_config(mo) + # export_config(mo) out = export_config(mo) err = "" @@ -1911,7 +1910,7 @@ def test_set_missing_flags_false(): ("mqtt", "encryptionEnabled"), } - set_missing_flags_false(config, false_defaults) + setMissingFlagsFalse(config, false_defaults) # Preserved assert config["bluetooth"]["enabled"] is True diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index c5cb6b3f..bf3c2a7f 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -19,6 +19,13 @@ # from ..util import Timeout +def initChannels(maxIdx: int = 8) -> list[Channel]: + """Initialize a set of empty channels, set the first one to PRIMARY""" + channels = [Channel(index=idx, role=0) for idx in range(0, maxIdx)] + channels[0].role = 1 + return channels + + @pytest.mark.unit def test_node(capsys): """Test that we can instantiate a Node""" @@ -354,28 +361,7 @@ def test_setURL_valid_URL_but_no_settings(capsys): def test_getChannelByChannelIndex(): """Test getChannelByChannelIndex()""" anode = Node("foo", "bar") - - channel1 = Channel(index=1, role=1) # primary channel - channel2 = Channel(index=2, role=2) # secondary channel - channel3 = Channel(index=3, role=0) - channel4 = Channel(index=4, role=0) - channel5 = Channel(index=5, role=0) - channel6 = Channel(index=6, role=0) - channel7 = Channel(index=7, role=0) - channel8 = Channel(index=8, role=0) - - channels = [ - channel1, - channel2, - channel3, - channel4, - channel5, - channel6, - channel7, - channel8, - ] - - anode.channels = channels + anode.channels = initChannels() # test primary assert anode.getChannelByChannelIndex(0) is not None diff --git a/meshtastic/tests/test_smoke1.py b/meshtastic/tests/test_smoke1.py index 6a5709e6..084569b2 100644 --- a/meshtastic/tests/test_smoke1.py +++ b/meshtastic/tests/test_smoke1.py @@ -1,88 +1,217 @@ """Meshtastic smoke tests with a single device via USB""" -import os -import platform import re import subprocess import time +from pathlib import Path +import tempfile -# Do not like using hard coded sleeps, but it probably makes -# sense to pause for the radio at appropriate times import pytest from ..util import findPorts +# Do not like using hard coded sleeps, but it makes +# sense to pause for the radio at appropriate times to +# avoid overload of the radio. # seconds to pause after running a meshtastic command PAUSE_AFTER_COMMAND = 2 -PAUSE_AFTER_REBOOT = 7 +PAUSE_AFTER_REBOOT = 10 +WAIT_FOR_REBOOT = -1 + +""" Following 2 switches allow control creation of additional or debug messages during testing + DEBUG contains a string passed to the meshtastic call to log internal behavior + VERBOSE toggles additional print output during the command execution of smoketest functions + TEMPFILE contains the extracted settings of the radio till the tests have finished, so the + radio will not stay with wrong region settings for a longer time +""" +DEBUG: str = '' +VERBOSE: bool = False +# DEBUG: str = '--debug --logTo smoke1.log' +# VERBOSE: bool = True + +TEMPFILE = None +RESET_CHANNEL_CONFIG = "meshtastic/tests/ch_reset_config.yaml" # Path to reset file + +def noPrint(*args): + """Dummy print function""" + pass + + +vprint = print if VERBOSE else noPrint + + +# Helper functions used in executing tests +def communicate(cmd: str, repeatTimes: int = 2) -> tuple[int, str]: + """Communicate to the radio. Repeat request in case serial line is not operational""" + k = 0 + vprint(f'---COM: "{cmd}", r: {repeatTimes}') + while k < repeatTimes: + return_value, out = subprocess.getstatusoutput(f"{cmd} {DEBUG}") + k += 1 + + if return_value == 0 \ + and not re.search("Input/output error", out, re.MULTILINE) \ + and not re.search("MeshInterfaceError: Timed out", out, re.MULTILINE): + break + vprint(f"k: {k} ret: {return_value} out: {out}") + vprint("Repeating command...") + return return_value, out + + +def waitFor(eventOrTime: int, repeatTimes: int = 5) -> None: + """Wait for a dedicated time (positive integer input) or for a reboot. The latter will ensure that the + serial line is back operational so we can safely send the next command.""" + vprint(f"---WAI {eventOrTime}") + if eventOrTime > 0: + time.sleep(eventOrTime) + elif eventOrTime == WAIT_FOR_REBOOT: + k = 0 + while True: + time.sleep(2*PAUSE_AFTER_REBOOT) + return_value, out = communicate("meshtastic --device-metadata") + vprint(f"ret: {return_value} out: {out}") + k += 1 + if return_value == 0 and re.search("firmware_version", out, re.MULTILINE) is not None: + break + if k > repeatTimes: + vprint("Reboot failed") + break + + +def checkQr(f, qrCompare: str) -> None: + """checks binary file containing url""" + f.seek(0) + qrData = f.read() + assert len(qrData) > 20000 # file containing qr does not contain enough data + qrSplit = qrData.splitlines(keepends=True) + vprint(f"checkQr: found lines: {len(qrSplit)}") + assert len(qrSplit) >= 4 + assert re.search(qrCompare, qrSplit[1].decode('utf-8'), re.MULTILINE) + + +def setAndTestUrl(pat: str, skipTest: bool = False) -> None: + """transmits set-url command with pattern "pat" and then checks if it has been set correctly""" + url = f"https://meshtastic.org/e/#{pat}" + return_value, out = communicate(f"meshtastic --seturl {url}") + assert re.match(r"Connected to radio", out) + assert return_value == 0 + # pause for the radio + waitFor(PAUSE_AFTER_COMMAND) + if not skipTest: + return_value, out = communicate("meshtastic --info") + assert re.search(pat, out, re.MULTILINE) + assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) + +# Fixtures +@pytest.fixture +def temporaryCfgFile(scope="module"): + """Return a temp file valid throughout the whole test. + Purpose: store the exported data for later reconfigure""" + global TEMPFILE + if TEMPFILE is None: + TEMPFILE = tempfile.NamedTemporaryFile(mode='w+t', encoding='utf-8', delete=False) + print(f"created file {TEMPFILE.name}") + else: + open(TEMPFILE.name, 'r+t', encoding='utf-8') + yield TEMPFILE + TEMPFILE.close() @pytest.mark.smoke1 def test_smoke1_reboot(): """Test reboot""" - return_value, _ = subprocess.getstatusoutput("meshtastic --reboot") + return_value, _ = communicate("meshtastic --reboot") assert return_value == 0 # pause for the radio to reset (10 seconds for the pause, and a few more seconds to be back up) - time.sleep(18) + waitFor(WAIT_FOR_REBOOT) @pytest.mark.smoke1 def test_smoke1_info(): """Test --info""" - return_value, out = subprocess.getstatusoutput("meshtastic --info") + return_value, out = communicate("meshtastic --info") assert re.match(r"Connected to radio", out) assert re.search(r"^Owner", out, re.MULTILINE) assert re.search(r"^My info", out, re.MULTILINE) assert re.search(r"^Nodes in mesh", out, re.MULTILINE) assert re.search(r"^Preferences", out, re.MULTILINE) assert re.search(r"^Channels", out, re.MULTILINE) - assert re.search(r"^ PRIMARY", out, re.MULTILINE) + assert re.search(r"^\s*Index 0: PRIMARY", out, re.MULTILINE) assert re.search(r"^Primary channel URL", out, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) + + +@pytest.mark.smoke1 +def test_smoke1_export_config(temporaryCfgFile): + """Test exporting current config, then later reimport and check if things are back as before + Store this config in a temporary file to be used later""" + vprint(f"\nGot temp file: {temporaryCfgFile.name}") + return_value, out = communicate(f'meshtastic --export-config {temporaryCfgFile.name}') + temporaryCfgFile.seek(0) + vprint(f"ret: {return_value} out: {out}") + pat = f"Exported configuration to {temporaryCfgFile.name}".replace('\\', '\\\\') + assert re.match(pat, out) + + +@pytest.mark.smoke1 +def test_smoke1_nodes(): + """Test --nodes""" + return_value, out = communicate('meshtastic --nodes --fmt json') + assert re.match(r"Connected to radio", out) + assert re.search(r"N.+User", out, re.MULTILINE) + assert re.search(r'"N": 1, "User":', out, re.MULTILINE) + assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_get_with_invalid_setting(): """Test '--get a_bad_setting'.""" - return_value, out = subprocess.getstatusoutput("meshtastic --get a_bad_setting") - assert re.search(r"Choices in sorted order", out) + return_value, out = communicate("meshtastic --get a_bad_setting") + assert re.search(r"Choices are...", out) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_set_with_invalid_setting(): """Test '--set a_bad_setting'.""" - return_value, out = subprocess.getstatusoutput("meshtastic --set a_bad_setting foo") - assert re.search(r"Choices in sorted order", out) + return_value, out = communicate("meshtastic --set a_bad_setting foo") + assert re.search(r"Choices are...", out) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 -def test_ch_set_with_invalid_settingpatch_find_ports(): +def test_ch_set_with_invalid_setting(): """Test '--ch-set with a_bad_setting'.""" - return_value, out = subprocess.getstatusoutput( + return_value, out = communicate( "meshtastic --ch-set invalid_setting foo --ch-index 0" ) - assert re.search(r"Choices in sorted order", out) + assert re.search(r"Choices are...", out) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_pos_fields(): """Test --pos-fields (with some values POS_ALTITUDE POS_ALT_MSL POS_BATTERY)""" - return_value, out = subprocess.getstatusoutput( - "meshtastic --pos-fields POS_ALTITUDE POS_ALT_MSL POS_BATTERY" + return_value, out = communicate( + "meshtastic --pos-fields ALTITUDE ALTITUDE_MSL HEADING" ) assert re.match(r"Connected to radio", out) - assert re.search(r"^Setting position fields to 35", out, re.MULTILINE) + assert re.search(r"^Setting position fields to 259", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --pos-fields") + waitFor(WAIT_FOR_REBOOT) + return_value, out = communicate("meshtastic --pos-fields") assert re.match(r"Connected to radio", out) - assert re.search(r"POS_ALTITUDE", out, re.MULTILINE) - assert re.search(r"POS_ALT_MSL", out, re.MULTILINE) - assert re.search(r"POS_BATTERY", out, re.MULTILINE) + assert re.search(r"ALTITUDE", out, re.MULTILINE) + assert re.search(r"ALTITUDE_MSL", out, re.MULTILINE) + assert re.search(r"HEADING", out, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 @@ -90,67 +219,32 @@ def test_smoke1_test_with_arg_but_no_hardware(): """Test --test Note: Since only one device is connected, it will not do much. """ - return_value, out = subprocess.getstatusoutput("meshtastic --test") + return_value, out = communicate("meshtastic --test", repeatTimes=1) assert re.search(r"^Warning: Must have at least two devices", out, re.MULTILINE) assert return_value == 1 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_debug(): """Test --debug""" - return_value, out = subprocess.getstatusoutput("meshtastic --info --debug") + return_value, out = communicate("meshtastic --info --debug") assert re.search(r"^Owner", out, re.MULTILINE) assert re.search(r"^DEBUG file", out, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_seriallog_to_file(): """Test --seriallog to a file creates a file""" - filename = "tmpoutput.txt" - if os.path.exists(f"{filename}"): - os.remove(f"{filename}") - return_value, _ = subprocess.getstatusoutput( - f"meshtastic --info --seriallog {filename}" - ) - assert os.path.exists(f"{filename}") - assert return_value == 0 - os.remove(f"{filename}") - - -@pytest.mark.smoke1 -def test_smoke1_qr(): - """Test --qr""" - filename = "tmpqr" - if os.path.exists(f"{filename}"): - os.remove(f"{filename}") - return_value, _ = subprocess.getstatusoutput(f"meshtastic --qr > {filename}") - assert os.path.exists(f"{filename}") - # not really testing that a valid qr code is created, just that the file size - # is reasonably big enough for a qr code - assert os.stat(f"{filename}").st_size > 20000 - assert return_value == 0 - os.remove(f"{filename}") - - -@pytest.mark.smoke1 -def test_smoke1_nodes(): - """Test --nodes""" - return_value, out = subprocess.getstatusoutput("meshtastic --nodes") - assert re.match(r"Connected to radio", out) - if platform.system() != "Windows": - assert re.search(r" User ", out, re.MULTILINE) - assert re.search(r" 1 ", out, re.MULTILINE) - assert return_value == 0 - - -@pytest.mark.smoke1 -def test_smoke1_send_hello(): - """Test --sendtext hello""" - return_value, out = subprocess.getstatusoutput("meshtastic --sendtext hello") - assert re.match(r"Connected to radio", out) - assert re.search(r"^Sending text message hello to \^all", out, re.MULTILINE) - assert return_value == 0 + with tempfile.NamedTemporaryFile('w+t', encoding='utf-8', delete=True, delete_on_close=False) as f: + return_value, _ = communicate(f"meshtastic --info --seriallog {f.name}") + f.seek(0) + data = f.read() + assert len(data) > 2000 + assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 @@ -161,16 +255,17 @@ def test_smoke1_port(): # hopefully there is just one assert len(ports) == 1 port = ports[0] - return_value, out = subprocess.getstatusoutput(f"meshtastic --port {port} --info") + return_value, out = communicate(f"meshtastic --port {port} --info") assert re.match(r"Connected to radio", out) assert re.search(r"^Owner", out, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_set_location_info(): """Test --setlat, --setlon and --setalt""" - return_value, out = subprocess.getstatusoutput( + return_value, out = communicate( "meshtastic --setlat 32.7767 --setlon -96.7970 --setalt 1337" ) assert re.match(r"Connected to radio", out) @@ -179,150 +274,124 @@ def test_smoke1_set_location_info(): assert re.search(r"^Fixing longitude", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out2 = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out2 = communicate("meshtastic --info") assert re.search(r"1337", out2, re.MULTILINE) assert re.search(r"32.7767", out2, re.MULTILINE) assert re.search(r"-96.797", out2, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_set_owner(): """Test --set-owner name""" # make sure the owner is not Joe - return_value, out = subprocess.getstatusoutput("meshtastic --set-owner Bob") + return_value, out = communicate("meshtastic --set-owner Bob") assert re.match(r"Connected to radio", out) assert re.search(r"^Setting device owner to Bob", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --info") assert not re.search(r"Owner: Joe", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --set-owner Joe") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --set-owner Joe") assert re.match(r"Connected to radio", out) assert re.search(r"^Setting device owner to Joe", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --info") assert re.search(r"Owner: Joe", out, re.MULTILINE) assert return_value == 0 - - -@pytest.mark.smoke1 -def test_smoke1_ch_set_modem_config(): - """Test --ch-set modem_config""" - return_value, out = subprocess.getstatusoutput( - "meshtastic --ch-set modem_config MedFast" - ) - assert re.search(r"Warning: Need to specify", out, re.MULTILINE) - assert return_value == 1 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") - assert not re.search(r"MedFast", out, re.MULTILINE) - assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput( - "meshtastic --ch-set modem_config MedFast --ch-index 0" - ) - assert re.match(r"Connected to radio", out) - assert re.search(r"^Set modem_config to MedFast", out, re.MULTILINE) - assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_REBOOT) - return_value, out = subprocess.getstatusoutput("meshtastic --info") - assert re.search(r"MedFast", out, re.MULTILINE) - assert return_value == 0 - + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 -def test_smoke1_ch_values(): +def test_smoke1_ch_modem_presets(): """Test --ch-vlongslow --ch-longslow, --ch-longfast, --ch-mediumslow, --ch-mediumsfast, --ch-shortslow, and --ch-shortfast arguments """ exp = { - "--ch-vlongslow": '{ "psk": "AQ==" }', - "--ch-longslow": "LongSlow", - "--ch-longfast": "LongFast", - "--ch-medslow": "MedSlow", - "--ch-medfast": "MedFast", - "--ch-shortslow": "ShortSlow", - "--ch-shortfast": "ShortFast", + "--ch-vlongslow": 'VERY_LONG_SLOW', + "--ch-longslow": "LONG_SLOW", + "--ch-longfast": "LONG_FAST", + "--ch-medslow": "MEDIUM_SLOW", + "--ch-medfast": "MEDIUM_FAST", + "--ch-shortslow": "SHORT_SLOW", + "--ch-shortfast": "SHORT_FAST", } - + print("\n") for key, val in exp.items(): print(key, val) - return_value, out = subprocess.getstatusoutput(f"meshtastic {key}") + return_value, out = communicate(f"meshtastic {key}") assert re.match(r"Connected to radio", out) - assert re.search(r"Writing modified channels to device", out, re.MULTILINE) assert return_value == 0 # pause for the radio (might reboot) - time.sleep(PAUSE_AFTER_REBOOT) - return_value, out = subprocess.getstatusoutput("meshtastic --info") - assert re.search(val, out, re.MULTILINE) + waitFor(WAIT_FOR_REBOOT) # Radio tends to stall with many LoRa changes + return_value, out = communicate("meshtastic --info") + assert re.search(f'"modemPreset":\\s*"{val}"', out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_ch_set_name(): """Test --ch-set name""" - return_value, out = subprocess.getstatusoutput("meshtastic --info") + return_value, out = communicate("meshtastic --info") assert not re.search(r"MyChannel", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-set name MyChannel") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --ch-set name MyChannel", repeatTimes=1) + assert return_value == 1 assert re.match(r"Connected to radio", out) assert re.search(r"Warning: Need to specify", out, re.MULTILINE) - assert return_value == 1 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput( + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate( "meshtastic --ch-set name MyChannel --ch-index 0" ) assert re.match(r"Connected to radio", out) assert re.search(r"^Set name to MyChannel", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --info") assert re.search(r"MyChannel", out, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_ch_set_downlink_and_uplink(): """Test -ch-set downlink_enabled X and --ch-set uplink_enabled X""" - return_value, out = subprocess.getstatusoutput( - "meshtastic --ch-set downlink_enabled false --ch-set uplink_enabled false" + return_value, out = communicate( + "meshtastic --ch-set downlink_enabled false --ch-set uplink_enabled false", + repeatTimes=1 ) + assert return_value == 1 assert re.match(r"Connected to radio", out) assert re.search(r"Warning: Need to specify", out, re.MULTILINE) - assert return_value == 1 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput( + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate( "meshtastic --ch-set downlink_enabled false --ch-set uplink_enabled false --ch-index 0" ) assert re.match(r"Connected to radio", out) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") - assert not re.search(r"uplinkEnabled", out, re.MULTILINE) - assert not re.search(r"downlinkEnabled", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --info") + assert re.search(r'("uplinkEnabled")\s*:\s*(false)', out, re.MULTILINE) + assert re.search(r'("downlinkEnabled")\s*:\s*(false)', out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput( + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate( "meshtastic --ch-set downlink_enabled true --ch-set uplink_enabled true --ch-index 0" ) assert re.match(r"Connected to radio", out) @@ -330,271 +399,325 @@ def test_smoke1_ch_set_downlink_and_uplink(): assert re.search(r"^Set uplink_enabled to true", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") - assert re.search(r"uplinkEnabled", out, re.MULTILINE) - assert re.search(r"downlinkEnabled", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --info") + assert re.search(r'("uplinkEnabled")\s*:\s*(true)', out, re.MULTILINE) + assert re.search(r'("downlinkEnabled")\s*:\s*(true)', out, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_ch_add_and_ch_del(): """Test --ch-add""" - return_value, out = subprocess.getstatusoutput("meshtastic --ch-add testing") + return_value, out = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") + assert return_value == 0 + waitFor(WAIT_FOR_REBOOT) + + return_value, out = communicate("meshtastic --ch-add testing1") assert re.search(r"Writing modified channels to device", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --info") assert re.match(r"Connected to radio", out) assert re.search(r"SECONDARY", out, re.MULTILINE) - assert re.search(r"testing", out, re.MULTILINE) + assert re.search(r"testing1", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-index 1 --ch-del") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --ch-index 1 --ch-del") assert re.search(r"Deleting channel 1", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_REBOOT) - # make sure the secondar channel is not there - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(WAIT_FOR_REBOOT) + # make sure the secondary channel is not there + return_value, out = communicate("meshtastic --info") assert re.match(r"Connected to radio", out) assert not re.search(r"SECONDARY", out, re.MULTILINE) - assert not re.search(r"testing", out, re.MULTILINE) + assert not re.search(r"testing1", out, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_ch_enable_and_disable(): """Test --ch-enable and --ch-disable""" - return_value, out = subprocess.getstatusoutput("meshtastic --ch-add testing") + return_value, out = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") + assert return_value == 0 + waitFor(WAIT_FOR_REBOOT) + + return_value, out = communicate("meshtastic --ch-add testing1") assert re.search(r"Writing modified channels to device", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --info") assert re.match(r"Connected to radio", out) - assert re.search(r"SECONDARY", out, re.MULTILINE) - assert re.search(r"testing", out, re.MULTILINE) + assert re.search(r'(Index\s*1:\s* SECONDARY).*("name"\s*:\s*"testing1")', out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) # ensure they need to specify a --ch-index - return_value, out = subprocess.getstatusoutput("meshtastic --ch-disable") + return_value, out = communicate("meshtastic --ch-disable", repeatTimes=1) assert return_value == 1 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput( - "meshtastic --ch-disable --ch-index 1" - ) + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --ch-disable --ch-index 1") assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --info") assert re.match(r"Connected to radio", out) - assert re.search(r"DISABLED", out, re.MULTILINE) - assert re.search(r"testing", out, re.MULTILINE) + assert not re.search(r'SECONDARY', out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput( - "meshtastic --ch-enable --ch-index 1" - ) + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --ch-enable --ch-index 1") assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --info") assert re.match(r"Connected to radio", out) - assert re.search(r"SECONDARY", out, re.MULTILINE) - assert re.search(r"testing", out, re.MULTILINE) + assert re.search(r'(Index\s*1:\s* SECONDARY).*("name"\s*:\s*"testing1")', out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-del --ch-index 1") + waitFor(PAUSE_AFTER_COMMAND) + return_value, out = communicate("meshtastic --ch-del --ch-index 1") + assert re.match(r"Connected to radio", out) + assert re.search(r"Deleting\schannel\s1", out) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_ch_del_a_disabled_non_primary_channel(): """Test --ch-del will work on a disabled non-primary channel.""" - return_value, out = subprocess.getstatusoutput("meshtastic --ch-add testing") + return_value, out = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") + assert return_value == 0 + waitFor(WAIT_FOR_REBOOT) + + return_value, out = communicate("meshtastic --ch-add testing1") assert re.search(r"Writing modified channels to device", out, re.MULTILINE) assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + + return_value, out = communicate("meshtastic --info") assert re.match(r"Connected to radio", out) - assert re.search(r"SECONDARY", out, re.MULTILINE) - assert re.search(r"testing", out, re.MULTILINE) + assert re.search(r'(Index\s*1:\s* SECONDARY).*("name"\s*:\s*"testing1")', out, re.MULTILINE) assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) + # ensure they need to specify a --ch-index - return_value, out = subprocess.getstatusoutput("meshtastic --ch-disable") + return_value, out = communicate("meshtastic --ch-disable", repeatTimes=1) assert return_value == 1 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-del --ch-index 1") + waitFor(PAUSE_AFTER_COMMAND) + + return_value, out = communicate("meshtastic --ch-disable --ch-index 1") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(PAUSE_AFTER_COMMAND) + + return_value, out = communicate("meshtastic --info") + assert re.match(r"Connected to radio", out) + assert not re.search(r"SECONDARY", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) + + return_value, out = communicate("meshtastic --ch-del --ch-index 1") + assert re.search(r"Deleting\schannel\s1", out) + assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) + + return_value, out = communicate("meshtastic --info") assert re.match(r"Connected to radio", out) assert not re.search(r"DISABLED", out, re.MULTILINE) assert not re.search(r"SECONDARY", out, re.MULTILINE) - assert not re.search(r"testing", out, re.MULTILINE) + assert not re.search(r"testing1", out, re.MULTILINE) assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_attempt_to_delete_primary_channel(): """Test that we cannot delete the PRIMARY channel.""" - return_value, out = subprocess.getstatusoutput("meshtastic --ch-del --ch-index 0") + return_value, out = communicate("meshtastic --ch-del --ch-index 0", repeatTimes=1) assert re.search(r"Warning: Cannot delete primary channel", out, re.MULTILINE) assert return_value == 1 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_attempt_to_disable_primary_channel(): """Test that we cannot disable the PRIMARY channel.""" - return_value, out = subprocess.getstatusoutput( - "meshtastic --ch-disable --ch-index 0" + return_value, out = communicate( + "meshtastic --ch-disable --ch-index 0", + repeatTimes=1 ) assert re.search(r"Warning: Cannot enable", out, re.MULTILINE) assert return_value == 1 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_attempt_to_enable_primary_channel(): """Test that we cannot enable the PRIMARY channel.""" - return_value, out = subprocess.getstatusoutput( - "meshtastic --ch-enable --ch-index 0" + return_value, out = communicate( + "meshtastic --ch-enable --ch-index 0", + repeatTimes=1 ) assert re.search(r"Warning: Cannot enable", out, re.MULTILINE) assert return_value == 1 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_ensure_ch_del_second_of_three_channels(): """Test that when we delete the 2nd of 3 channels, that it deletes the correct channel.""" - return_value, out = subprocess.getstatusoutput("meshtastic --ch-add testing1") - assert re.match(r"Connected to radio", out) + + # prepare test setting: setup 2 channels and validate they are created + return_value, out = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") + assert return_value == 0 + waitFor(WAIT_FOR_REBOOT) + + return_value, out = communicate("meshtastic --ch-info") + vprint(f"ret: {return_value} out: {out}") + + return_value, out = communicate("meshtastic --ch-add testing1") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") assert re.match(r"Connected to radio", out) - assert re.search(r"SECONDARY", out, re.MULTILINE) - assert re.search(r"testing1", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) + + return_value, out = communicate("meshtastic --ch-add testing2") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-add testing2") assert re.match(r"Connected to radio", out) + waitFor(PAUSE_AFTER_COMMAND) + + waitFor(WAIT_FOR_REBOOT) + return_value, out = communicate("meshtastic --info") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") assert re.match(r"Connected to radio", out) + assert re.search(r"SECONDARY", out, re.MULTILINE) + assert re.search(r"testing1", out, re.MULTILINE) assert re.search(r"testing2", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) + + # validate the first channel is deleted correctly. + # Second channel must move up to index 1 and index 2 must become disabled + return_value, out = communicate("meshtastic --ch-del --ch-index 1") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-del --ch-index 1") assert re.match(r"Connected to radio", out) + vprint(f"ret: {return_value} out: {out}") + waitFor(PAUSE_AFTER_COMMAND) + + return_value, out = communicate("meshtastic --info") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") assert re.match(r"Connected to radio", out) assert re.search(r"testing2", out, re.MULTILINE) - assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-del --ch-index 1") - assert re.match(r"Connected to radio", out) - assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + assert re.search(r"Index 1: SECONDARY", out, re.MULTILINE) + assert not re.search(r"Index 2", out, re.MULTILINE) + assert not re.search(r"Index 3", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_ensure_ch_del_third_of_three_channels(): """Test that when we delete the 3rd of 3 channels, that it deletes the correct channel.""" - return_value, out = subprocess.getstatusoutput("meshtastic --ch-add testing1") - assert re.match(r"Connected to radio", out) + + # prepare test setting: setup 2 channels and validate they are created + return_value, out = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") + assert return_value == 0 + waitFor(WAIT_FOR_REBOOT) + + return_value, out = communicate("meshtastic --ch-info") + vprint(f"ret: {return_value} out: {out}") + + return_value, out = communicate("meshtastic --ch-add testing1") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") assert re.match(r"Connected to radio", out) - assert re.search(r"SECONDARY", out, re.MULTILINE) - assert re.search(r"testing1", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) + + return_value, out = communicate("meshtastic --ch-add testing2") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-add testing2") assert re.match(r"Connected to radio", out) + waitFor(PAUSE_AFTER_COMMAND) + + waitFor(WAIT_FOR_REBOOT) + return_value, out = communicate("meshtastic --info", repeatTimes=2) assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") assert re.match(r"Connected to radio", out) + assert re.search(r"SECONDARY", out, re.MULTILINE) + assert re.search(r"testing1", out, re.MULTILINE) assert re.search(r"testing2", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) + + # validate the second channel is deleted correctly + return_value, out = communicate("meshtastic --ch-del --ch-index 2") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-del --ch-index 2") assert re.match(r"Connected to radio", out) + waitFor(PAUSE_AFTER_COMMAND) + + return_value, out = communicate("meshtastic --info", repeatTimes=2) assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") assert re.match(r"Connected to radio", out) assert re.search(r"testing1", out, re.MULTILINE) + assert re.search(r"Index 1: SECONDARY", out, re.MULTILINE) + assert not re.search(r"Index 2", out, re.MULTILINE) + assert not re.search(r"Index 3", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) + + +@pytest.mark.smoke1 +def test_smoke1_set_primary_channel(): + """Test --seturl with primary channel""" + # prepare test setting: setup 2 channels and validate they are created + return_value, _ = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --ch-del --ch-index 1") - assert re.match(r"Connected to radio", out) + waitFor(WAIT_FOR_REBOOT) + + # set to different url + pat = "CgcSAQE6AggNEg8IATgDQANIAVAbaAHABgE" + setAndTestUrl(pat) + + +@pytest.mark.smoke1 +def test_smoke1_qr(): + """Test --qr, based on setting via URL""" + # prepare test setting: setup 2 channels and validate they are created + return_value, _ = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(WAIT_FOR_REBOOT) + + # set to different url + pat = "CgcSAQE6AggNEg8IATgDQANIAVAbaAHABgE" + setAndTestUrl(pat) + with tempfile.NamedTemporaryFile('w+b', delete=True, delete_on_close=False) as f: + return_value, _ = communicate(f"meshtastic --qr >{f.name}") + assert return_value == 0 + checkQr(f, pat) + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_seturl_default(): """Test --seturl with default value""" + # prepare test setting: setup std channel + return_value, out = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") + assert return_value == 0 + waitFor(WAIT_FOR_REBOOT) + # set some channel value so we no longer have a default channel - return_value, out = subprocess.getstatusoutput( + return_value, out = communicate( "meshtastic --ch-set name foo --ch-index 0" ) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) # ensure we no longer have a default primary channel - return_value, out = subprocess.getstatusoutput("meshtastic --info") - assert not re.search("CgUYAyIBAQ", out, re.MULTILINE) - assert return_value == 0 - url = "https://www.meshtastic.org/d/#CgUYAyIBAQ" - return_value, out = subprocess.getstatusoutput(f"meshtastic --seturl {url}") - assert re.match(r"Connected to radio", out) - assert return_value == 0 - # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput("meshtastic --info") - assert re.search("CgUYAyIBAQ", out, re.MULTILINE) + return_value, out = communicate("meshtastic --info") + assert not re.search("CgI6ABIPCAE4A0ADSAFQG2gBwAYB", out, re.MULTILINE) assert return_value == 0 @@ -603,31 +726,107 @@ def test_smoke1_seturl_invalid_url(): """Test --seturl with invalid url""" # Note: This url is no longer a valid url. url = "https://www.meshtastic.org/c/#GAMiENTxuzogKQdZ8Lz_q89Oab8qB0RlZmF1bHQ=" - return_value, out = subprocess.getstatusoutput(f"meshtastic --seturl {url}") + return_value, out = communicate(f"meshtastic --seturl {url}", repeatTimes=1) assert re.match(r"Connected to radio", out) assert re.search("Warning: There were no settings", out, re.MULTILINE) assert return_value == 1 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) + waitFor(PAUSE_AFTER_COMMAND) + + +@pytest.mark.smoke1 +def test_smoke1_seturl_2chan(): + """Test --seturl with 2 channels""" + # prepare test setting: setup 2 channels and validate they are created + return_value, _ = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") + assert return_value == 0 + waitFor(WAIT_FOR_REBOOT) + + pat = "CgcSAQE6AggNCjASIOKjX3f5UXnz8zkcXi6MxfIsnNof5sUAW4FQQi_IXsLdGgRUZXN0KAEwAToCCBESDwgBOANAA0gBUBtoAcAGAQ" + setAndTestUrl(pat) + # check qr output + with tempfile.NamedTemporaryFile('w+b', delete=True, delete_on_close=False) as f: + return_value, _ = communicate(f"meshtastic --qr-all >{f.name}") + assert return_value == 0 + checkQr(f, pat) + + +@pytest.mark.smoke1 +def test_smoke1_seturl_3_to_2_chan(): + """Test --seturl with 3 channels, then reconfigure 2 channels""" + # prepare test setting: setup 2 channels and validate they are created + return_value, out = communicate(f"meshtastic --configure {RESET_CHANNEL_CONFIG}") + assert return_value == 0 + waitFor(WAIT_FOR_REBOOT) + + pat = "CgcSAQE6AggNCjESIOKjX3f5UXnz8zkcXi6MxfIsnNof5sUAW4FQQi_IXsLdGgV0ZXN0MSgBMAE6AggRCi0SIGyPI2Gbw3v6rl9H" + \ + "Q8SL3LvRx7ScovIdU6pahs_l59CoGgV0ZXN0MigBMAESDwgBOANAA0gBUBtoAcAGAQ" + setAndTestUrl(pat) + # check that we have 3 channels + return_value, out = communicate("meshtastic --info") + assert return_value == 0 + assert re.match(r"Connected to radio", out) + assert re.search(r"SECONDARY", out, re.MULTILINE) + assert re.search(r"test1", out, re.MULTILINE) + assert re.search(r"test2", out, re.MULTILINE) + waitFor(PAUSE_AFTER_COMMAND) + + # now configure 2 channels only + patSet = "CgcSAQE6AggNCjASIOKjX3f5UXnz8zkcXi6MxfIsnNof5sUAW4FQQi_IXsLdGgRUZXN0KAEwAToCCBESDwgBOANAA0gBUBtoAcAGAQ" + setAndTestUrl(patSet, skipTest=True) + + # now test for patComp (url will be diefferent because of not deleted channel 2) + return_value, out = communicate("meshtastic --info") + assert return_value == 0 + assert re.match(r"Connected to radio", out) + assert re.search(r"SECONDARY", out, re.MULTILINE) + assert re.search(r"Test", out, re.MULTILINE) # Test for changed channel + assert re.search(r"test2", out, re.MULTILINE) # this one should remain as before + waitFor(PAUSE_AFTER_COMMAND) + # Note: keep one secondary channel in order to send the hello to it + +@pytest.mark.smoke1 +def test_smoke1_send_hello(): + """Test --sendtext hello, use channel 1 to not bother other participants with testing messages""" + return_value, out = communicate('meshtastic --sendtext "hello from smoke test" --ch-index 1') + assert re.match(r"Connected to radio", out) + assert re.search(r"^Sending text message hello from smoke test to \^all on channelIndex:1", out, re.MULTILINE) + assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_configure(): """Test --configure""" - _, out = subprocess.getstatusoutput(f"meshtastic --configure example_config.yaml") - assert re.match(r"Connected to radio", out) - assert re.search("^Setting device owner to Bob TBeam", out, re.MULTILINE) - assert re.search("^Fixing altitude at 304 meters", out, re.MULTILINE) - assert re.search("^Fixing latitude at 35.8", out, re.MULTILINE) - assert re.search("^Fixing longitude at -93.8", out, re.MULTILINE) - assert re.search("^Setting device position", out, re.MULTILINE) - assert re.search("^Set region to 1", out, re.MULTILINE) - assert re.search("^Set is_always_powered to true", out, re.MULTILINE) - assert re.search("^Set screen_on_secs to 31536000", out, re.MULTILINE) - assert re.search("^Set wait_bluetooth_secs to 31536000", out, re.MULTILINE) - assert re.search("^Writing modified preferences to device", out, re.MULTILINE) - # pause for the radio - time.sleep(PAUSE_AFTER_REBOOT) + cfgPth = Path('example_config.yaml') + if not cfgPth.exists(): + cfgPth = Path.cwd().parent / cfgPth + if not cfgPth.exists(): + pytest.fail(f"Cannot access config: actual path: {Path.cwd()}. Execute tests from base folder.") + + _, out = communicate(f"meshtastic --configure {str(cfgPth)}") + vprint(f"out: {out}") + assert re.search("Connected to radio", out) + assert re.search("^Setting owner properties: Bob TBeam - BOB - True", out, re.MULTILINE) + assert re.search("^Setting channel url to https://www.meshtastic.org/e/#CgQ6AggNEg8IATgBQANIAVAeaAHABgE", out, re.MULTILINE) + assert re.search("^Setting fixed device position to lat 35.88888 lon -93.88888 alt 304", out, re.MULTILINE) + assert re.search("^Set lora.region to US", out, re.MULTILINE) + assert re.search("^Set display.screen_on_secs to 781", out, re.MULTILINE) + assert re.search("^Set power.wait_bluetooth_secs to 344", out, re.MULTILINE) + assert re.search("^Committing modified configuration to device", out, re.MULTILINE) + # pause for the radio + waitFor(WAIT_FOR_REBOOT) + return_value, out = communicate("meshtastic --info", repeatTimes=2) + assert re.search('Bob TBeam', out, re.MULTILINE) + assert re.search('"latitude": 35.8', out, re.MULTILINE) + assert re.search('"longitude": -93.8', out, re.MULTILINE) + assert re.search('"gpsMode": "ENABLED"', out, re.MULTILINE) + assert re.search('"region": "US"', out, re.MULTILINE) + assert re.search('"screenOnSecs": 781', out, re.MULTILINE) + assert re.search('"waitBluetoothSecs": 344', out, re.MULTILINE) + assert re.search("CgQ6AggNEg8IATgBQANIAVAeaAHABgE", out, re.MULTILINE) + assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 @@ -635,45 +834,58 @@ def test_smoke1_set_ham(): """Test --set-ham Note: Do a factory reset after this setting so it is very short-lived. """ - return_value, out = subprocess.getstatusoutput("meshtastic --set-ham KI1234") + return_value, out = communicate("meshtastic --set-ham KI1234") assert re.search(r"Setting Ham ID", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_REBOOT) - return_value, out = subprocess.getstatusoutput("meshtastic --info") + waitFor(WAIT_FOR_REBOOT) + return_value, out = communicate("meshtastic --info") assert re.search(r"Owner: KI1234", out, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_set_wifi_settings(): """Test --set wifi_ssid and --set wifi_password""" - return_value, out = subprocess.getstatusoutput( - 'meshtastic --set wifi_ssid "some_ssid" --set wifi_password "temp1234"' + return_value, out = communicate( + 'meshtastic --set network.wifi_ssid "some_ssid" --set network.wifi_psk "temp1234"' ) assert re.match(r"Connected to radio", out) - assert re.search(r"^Set wifi_ssid to some_ssid", out, re.MULTILINE) - assert re.search(r"^Set wifi_password to temp1234", out, re.MULTILINE) + assert re.search(r"^Set network.wifi_ssid to some_ssid", out, re.MULTILINE) + assert re.search(r"^Set network.wifi_psk to temp1234", out, re.MULTILINE) assert return_value == 0 # pause for the radio - time.sleep(PAUSE_AFTER_COMMAND) - return_value, out = subprocess.getstatusoutput( - "meshtastic --get wifi_ssid --get wifi_password" + waitFor(WAIT_FOR_REBOOT) + return_value, out = communicate( + "meshtastic --get network.wifi_ssid --get network.wifi_psk" ) - assert re.search(r"^wifi_ssid: some_ssid", out, re.MULTILINE) - assert re.search(r"^wifi_password: sekrit", out, re.MULTILINE) + assert re.search(r"^network.wifi_ssid:\s*some_ssid", out, re.MULTILINE) + assert re.search(r"^network.wifi_psk:\s*temp1234", out, re.MULTILINE) assert return_value == 0 + waitFor(PAUSE_AFTER_COMMAND) @pytest.mark.smoke1 def test_smoke1_factory_reset(): """Test factory reset""" - return_value, out = subprocess.getstatusoutput( - "meshtastic --set factory_reset true" - ) - assert re.match(r"Connected to radio", out) - assert re.search(r"^Set factory_reset to true", out, re.MULTILINE) - assert re.search(r"^Writing modified preferences to device", out, re.MULTILINE) + return_value, out = communicate("meshtastic --factory-reset") + assert re.search("Connected to radio", out) + assert re.search(r"(factory reset).+config\sreset", out, re.MULTILINE) assert return_value == 0 # NOTE: The radio may not be responsive after this, may need to do a manual reboot # by pressing the button + waitFor(WAIT_FOR_REBOOT) + +@pytest.mark.smoke1 +def test_smoke1_config_reset(temporaryCfgFile): + """Restore original settings""" + vprint(f"Got temp file: {temporaryCfgFile.name}") + return_value, out = communicate(f"meshtastic --config {temporaryCfgFile.name}") + vprint(f"ret: {return_value} out: {out}") + assert re.search(r"Connected to radio", out) + assert re.search(r"Configuration finished", out) + + +if TEMPFILE is not None: + Path(TEMPFILE).unlink(missing_ok=True)