diff --git a/.gitignore b/.gitignore
index e2ae38d70..60421e4c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,4 +8,4 @@ IoTuring/Configurator/configurations.json*
IoTuring/Configurator/dontmoveconf.itg*
.venv
build
-*.egg-info
\ No newline at end of file
+*.egg-info
diff --git a/IoTuring/Configurator/Configurator.py b/IoTuring/Configurator/Configurator.py
index 3e84957ce..da778e2fd 100644
--- a/IoTuring/Configurator/Configurator.py
+++ b/IoTuring/Configurator/Configurator.py
@@ -74,9 +74,9 @@ def OpenConfigInEditor(self):
editor_command = next(
(e for e in editors if OsD.CommandExists(e)), "")
if editor_command:
- subprocess.run(f'{editor_command} "{config_path}"',
- shell=True, close_fds=True)
- return
+ OsD.RunCommand(f'{editor_command} "{config_path}"',
+ shell=True, close_fds=True, capture_output=False)
+ return
self.Log(self.LOG_WARNING, "No editor found")
diff --git a/IoTuring/Entity/Deployments/Wifi/Wifi.py b/IoTuring/Entity/Deployments/Wifi/Wifi.py
new file mode 100644
index 000000000..a7807e3cd
--- /dev/null
+++ b/IoTuring/Entity/Deployments/Wifi/Wifi.py
@@ -0,0 +1,385 @@
+import re
+from socket import AddressFamily
+
+import psutil
+import locale
+
+from IoTuring.Configurator.MenuPreset import MenuPreset
+from IoTuring.Entity.Entity import Entity
+from IoTuring.Entity.EntityData import EntitySensor
+from IoTuring.Entity.ValueFormat import ValueFormatterOptions
+from IoTuring.MyApp.SystemConsts import OperatingSystemDetection as OsD
+
+VALUEFORMATTEROPTIONS_DBM = ValueFormatterOptions(ValueFormatterOptions.TYPE_RADIOPOWER)
+VALUEFORMATTEROPTIONS_PERCENTAGE = ValueFormatterOptions(
+ ValueFormatterOptions.TYPE_PERCENTAGE
+)
+
+WIFI_CHOICE_STRING = "Name: {:<15}, IP: {:<16}, MAC: {:<11}"
+
+CONFIG_KEY_WIFI = "wifi"
+
+SIGNAL_UNIT = "dBm" # windows also supports "%"
+SHOW_NA = False # don't show not available extraAttributes
+
+KEY_SIGNAL_STRENGTH_PERCENT = "signal_strength_percent"
+KEY_SIGNAL_STRENGTH_DBM = "signal_strength_dbm"
+
+# WINDOWS
+EXTRA_KEY_NAME = "name"
+EXTRA_KEY_DESCRIPTION = "description"
+EXTRA_KEY_PHYSICAL_ADDRESS = "physical_address"
+EXTRA_KEY_STATE = "state"
+EXTRA_KEY_SSID = "ssid"
+EXTRA_KEY_BSSID = "bssid"
+EXTRA_KEY_NETWORK_TYPE = "network_type"
+EXTRA_KEY_RADIO_TYPE = "radio_type"
+EXTRA_KEY_SIGNAL = "signal"
+EXTRA_KEY_PROFILE = "profile"
+EXTRA_KEY_HOSTED_NETWORK_STATUS = "hosted_network_status"
+# LINUX
+# EXTRA_KEY_BSSID = "BSSID" # already in windows
+# EXTRA_KEY_SSID = "ssid" # already in windows
+EXTRA_KEY_FREQUENCY = "Frequency"
+# EXTRA_KEY_RX_BYTES = "RX_bytes"
+# EXTRA_KEY_TX_BYTES = "TX_bytes"
+EXTRA_KEY_SIGNAL = "Signal"
+# EXTRA_KEY_RX_BITRATE = "RX_bitrate"
+# EXTRA_KEY_TX_BITRATE = "TX_bitrate"
+# EXTRA_KEY_BSS_FLAGS = "BSS_flags"
+# EXTRA_KEY_DTIM_PERIOD = "DTIM_period"
+# EXTRA_KEY_BEACON_INTERVAL = "Beacon_interval"
+# MACOS
+EXTRA_KEY_AGRCTLRSSI = "agrCtlRSSI"
+EXTRA_KEY_AGREXTRSSI = "agrExtRSSI"
+# EXTRA_KEY_STATE = 'state' # already in windows
+EXTRA_KEY_OP_MODE = "op mode"
+EXTRA_KEY_LASTTXRATE = "lastTxRate"
+EXTRA_KEY_MAXRATE = "maxRate"
+
+# EXTRA_KEY_BSSID = 'BSSID' # already in windows
+# EXTRA_KEY_SSID = 'SSID' # already in windows
+# EXTRA_KEY_CHANNEL = 'channel' # already in windows
+
+
+class Wifi(Entity):
+ NAME = "Wifi"
+ ALLOW_MULTI_INSTANCE = True
+
+ def Initialize(self):
+ self.platform = OsD.GetOs()
+ self.locale_str, _ = locale.getdefaultlocale()
+ self.language: str = self.locale_str.split("_")[0]
+ self.showNA = SHOW_NA
+
+ # In macos trick the language to be english since the output of airport is always in english
+ if OsD.IsMacos():
+ self.language = "en"
+
+ self.wifiInterface = self.GetFromConfigurations(CONFIG_KEY_WIFI)
+
+ self.commands = {
+ OsD.WINDOWS: ["netsh", "wlan", "show", "interfaces"],
+ OsD.LINUX: ["iw", "dev", self.wifiInterface, "link"],
+ OsD.MACOS: [
+ "/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport",
+ "-I",
+ ],
+ }
+ self.patterns = {
+ OsD.WINDOWS: {
+ #"en": {
+ "Description": r"Description\s+:\s+(.*)",
+ "Physical address": r"Physical address\s+:\s+([\w:]+)",
+ "State": r"State\s+:\s+(.*)",
+ "SSID": r"SSID\s+:\s+(.*)",
+ "BSSID": r"BSSID\s+:\s+([\w:]+)",
+ "Network type": r"Network type\s+:\s+(.*)",
+ "Radio type": r"Radio type\s+:\s+(.*)",
+ "Signal": r"Signal\s+:\s+(\d+%?)",
+ "Profile": r"Profile\s+:\s+(.*)",
+ "Hosted network status": r"Hosted network status\s+:\s+(.*)",
+ },
+ #"de": {
+ # "Schnittstellenname": r"Schnittstellenname:\s+(.+)",
+ # "Beschreibung": r"Beschreibung:\s+(.+)",
+ # "Physische Adresse": r"Physische Adresse:\s+([0-9A-Fa-f-]+)",
+ # "Status": r"Status:\s+(.+)",
+ # "SSID": r"SSID:\s+(.+)",
+ # "BSSID": r"BSSID:\s+([0-9A-Fa-f-]+)",
+ # "Netzwerktyp": r"Netzwerktyp:\s+(.+)",
+ # "Funktyp": r"Funktyp:\s+(.+)",
+ # "Signal": r"Signal:\s+(\d+%?)",
+ # "Profil": r"Profil:\s+(.+)",
+ #},
+ #"sp": {
+ # "Nombre de interfaz": r"Nombre de interfaz:\s+(.+)",
+ # "Descripción": r"Descripción:\s+(.+)",
+ # "Dirección física": r"Dirección física:\s+([0-9A-Fa-f-]+)",
+ # "Estado": r"Estado:\s+(.+)",
+ # "SSID": r"SSID:\s+(.+)",
+ # "BSSID": r"BSSID:\s+([0-9A-Fa-f-]+)",
+ # "Tipo de red": r"Tipo de red:\s+(.+)",
+ # "Señal": r"Señal:\s+(\d+%?)",
+ # "Perfil": r"Perfil:\s+(.+)",
+ #},
+ #},
+ OsD.LINUX: {
+ "BSSID": r'Connected to (\S+) \(on \S+\)',
+ "SSID": r'SSID: (.+)',
+ "Frequency": r'freq: ([\d.]+)',
+ #"RX_bytes": r'RX: (\d+) bytes \(\d+ packets\)',
+ #"TX_bytes": r'TX: (\d+) bytes \(\d+ packets\)',
+ "Signal": r'signal: (-?\d+) dBm',
+ #"RX_bitrate": r'rx bitrate: ([\d.]+) MBit/s',
+ #"TX_bitrate": r'tx bitrate: ([\d.]+) MBit/s',
+ #"BSS_flags": r'bss flags: (.+)',
+ #"DTIM_period": r'dtim period: (\d+)',
+ #"Beacon_interval": r'beacon int: (\d+)'
+ },
+ OsD.MACOS: { # no language differentiation in macos: always english
+ "agrCtlRSSI": r"[^\n][\s]*agrCtlRSSI:\s+(-?\d+)\n",
+ "agrExtRSSI": r"[^\n][\s]*agrExtRSSI:\s+(-?\d+)\n",
+ "state": r"[^\n][\s]*state:\s+(\w+)\n",
+ "op mode": r"[^\n][\s]*op mode:\s+(\w+)\n",
+ "lastTxRate": r"[^\n][\s]*lastTxRate:\s+(\d+)\n",
+ "maxRate": r"[^\n][\s]*maxRate:\s+(\d+)\n",
+ "BSSID": r"[^\n][\s]*BSSID:\s+([\w:]+)\n",
+ "SSID": r"\n[\s]*SSID:\s+([\w\s]+)\n",
+ "channel": r"[^\n][\s]*channel:\s+([\d,]+)\n",
+ },
+ }
+
+ if OsD.IsWindows():
+ self.keySignalStrength = KEY_SIGNAL_STRENGTH_PERCENT
+ self.valueFormatterOptionsSignalStrength = VALUEFORMATTEROPTIONS_PERCENTAGE
+ else:
+ self.keySignalStrength = KEY_SIGNAL_STRENGTH_DBM
+ self.valueFormatterOptionsSignalStrength = VALUEFORMATTEROPTIONS_DBM
+
+ self.RegisterEntitySensor(
+ EntitySensor(
+ self,
+ key=self.keySignalStrength,
+ supportsExtraAttributes=True,
+ valueFormatterOptions=self.valueFormatterOptionsSignalStrength,
+ ),
+ )
+
+ def Update(self):
+ p = self.RunCommand(self.commands[self.platform])
+ if not p.stdout:
+ raise Exception("error in GetWirelessInfo\n", p.stderr)
+ wifiInfo = self.GetWirelessInfo(p.stdout)
+ if not wifiInfo:
+ raise Exception("error while parsing wirelessInfo")
+ # set signal strength
+ if self.platform == OsD.WINDOWS and "Signal" in wifiInfo:
+ if SIGNAL_UNIT == "%":
+ self.SetEntitySensorValue(
+ key=self.keySignalStrength, value=wifiInfo["Signal"]
+ )
+ elif SIGNAL_UNIT == "dBm":
+ self.SetEntitySensorValue(
+ key=self.keySignalStrength,
+ value=self.PercentToDbm(int(wifiInfo["Signal"][:-1])),
+ )
+ elif self.platform == OsD.LINUX and "Signal" in wifiInfo:
+ self.SetEntitySensorValue(
+ key=self.keySignalStrength, value=wifiInfo["Signal"]
+ )
+ elif self.platform == OsD.MACOS and "agrCtlRSSI" in wifiInfo:
+ self.SetEntitySensorValue(
+ key=self.keySignalStrength, value=wifiInfo["agrCtlRSSI"]
+ )
+ else: # if there is no signal level found the interface might not be connected to an access point
+ self.SetEntitySensorValue(key=self.keySignalStrength, value="not connected")
+
+ # Extra attributes
+ for key in self.patterns[self.platform]:
+ extraKey = "EXTRA_KEY_" + key.upper().replace(" ", "_").replace(".", "_")
+ if key in wifiInfo:
+ attributevalue = wifiInfo[key]
+ elif self.showNA:
+ attributevalue = "not available"
+ else:
+ continue
+
+ self.SetEntitySensorExtraAttribute(
+ sensorDataKey=self.keySignalStrength,
+ attributeKey=globals()[extraKey],
+ attributeValue=attributevalue,
+ )
+
+ def GetWirelessInfo(self, stdout):
+ wifi_info = {}
+ for key, pattern in self.patterns[self.platform].items():
+ match = re.search(pattern, stdout, re.IGNORECASE)
+ if match:
+ wifi_info[key] = match.group(1) if match.group(1) else match.group(0)
+ return wifi_info
+
+ @classmethod
+ def ConfigurationPreset(cls) -> MenuPreset:
+ NIC_CHOICES = Wifi.GetWifiNics(getInfo=True)
+ SIGNAL_CHOICES = ["%", "dBm"]
+
+ preset = MenuPreset()
+ preset.AddEntry(
+ name="Interface to check",
+ key=CONFIG_KEY_WIFI,
+ mandatory=True,
+ question_type="select",
+ choices=NIC_CHOICES,
+ )
+ if OsD.IsWindows():
+ preset.AddEntry(
+ name="Signal Quality in dBm or percent",
+ key=SIGNAL_UNIT,
+ mandatory=True,
+ question_type="select",
+ choices=SIGNAL_CHOICES,
+ )
+ return preset
+
+ @staticmethod
+ def PercentToDbm(percentage):
+ return (percentage / 2) - 100
+
+ @staticmethod
+ def GetWifiNics(getInfo=True):
+ interfaces = psutil.net_if_addrs()
+ NIC_CHOICES = []
+
+ def appendNicChoice(interfaceName, nicip4="", nicip6="", nicmac=""):
+ NIC_CHOICES.append(
+ {
+ "name": WIFI_CHOICE_STRING.format(
+ interfaceName,
+ nicip4 if nicip4 else nicip6, # defaults to showing ipv4
+ nicmac,
+ ),
+ "value": interfaceName,
+ }
+ )
+
+ ip4 = ""
+ ip6 = ""
+ mac = ""
+
+ if OsD.IsLinux():
+ for interface in interfaces:
+ p = OsD.RunCommand(["iw", "dev", interface, "link"])
+ if (
+ p.returncode > 0
+ ): # if the returncode is 0 iwconfig succeeded, else continue with next interface
+ continue
+ if not getInfo:
+ appendNicChoice(interface)
+ continue
+ else:
+ nicinfo = interfaces[interface] # TODO Typehint
+ for nicaddr in nicinfo:
+ if nicaddr.family == AddressFamily.AF_INET:
+ ip4 = nicaddr.address
+ continue
+ elif nicaddr.family == AddressFamily.AF_INET6:
+ ip6 = nicaddr.address
+ continue
+ elif nicaddr.family == psutil.AF_LINK:
+ mac = nicaddr.address
+ continue
+ appendNicChoice(interface, ip4, ip6, mac)
+ return NIC_CHOICES
+
+ elif OsD.IsWindows():
+ p = OsD.RunCommand(["netsh", "wlan", "show", "interfaces"])
+ if (
+ p.returncode > 0
+ ): # if the returncode is 0 iwconfig succeeded, else continue with next interface
+ raise Exception("RunCommand netsh returncode > 0")
+ output = p.stdout
+ numInterfacesMatch = re.search(
+ r"There is (\d+) interface(?:s)? on the system", output
+ )
+ numOfInterfaces = int(numInterfacesMatch.group(1))
+ if numOfInterfaces == 0:
+ raise Exception("no wireless interface found")
+ elif numOfInterfaces > 1:
+ raise Exception(
+ "more than one wireless interface not supported, create a github issue with the output of 'netsh wlan show interfaces' atached"
+ )
+ interfaceMatch = re.search(r"Name\s+:\s+(\w+)", output)
+ interfaceName = interfaceMatch.group(1)
+ if not getInfo:
+ appendNicChoice(interfaceName)
+ else:
+ nicinfo = interfaces[interfaceName] # TODO Typehint
+ for nicaddr in nicinfo:
+ if nicaddr.family == AddressFamily.AF_INET:
+ ip4 = nicaddr.address
+ continue
+ elif nicaddr.family == AddressFamily.AF_INET6:
+ ip6 = nicaddr.address
+ continue
+ elif nicaddr.family == psutil.AF_LINK:
+ mac = nicaddr.address
+ continue
+ appendNicChoice(interfaceName, ip4, ip6, mac)
+ return NIC_CHOICES
+
+ elif OsD.IsMacos():
+ for interface in interfaces:
+ p = OsD.RunCommand(["airport", interface])
+ if (
+ p.returncode > 0
+ ): # if the returncode is 0 iwconfig succeeded, else continue with next interface
+ continue
+ nicinfo = interfaces[interface] # TODO Typehint
+ if not getInfo:
+ appendNicChoice(interface)
+ continue
+ else:
+ for nicaddr in nicinfo:
+ if nicaddr.family == AddressFamily.AF_INET:
+ ip4 = nicaddr.address
+ continue
+ elif nicaddr.family == AddressFamily.AF_INET6:
+ ip6 = nicaddr.address
+ continue
+ elif nicaddr.family == psutil.AF_LINK:
+ mac = nicaddr.address
+ continue
+
+ appendNicChoice(interface, ip4, ip6, mac)
+ return NIC_CHOICES
+
+ @classmethod
+ def CheckSystemSupport(cls):
+ if OsD.IsLinux():
+ if not OsD.CommandExists("iw"):
+ raise Exception("iw not found")
+ wifiNics = Wifi.GetWifiNics(getInfo=False)
+ if not wifiNics:
+ raise Exception("no wireless interface found")
+
+ elif OsD.IsWindows():
+ if not OsD.CommandExists("netsh"):
+ raise Exception("netsh not found")
+ elif "English" not in locale.getlocale()[0]:
+ raise Exception(
+ "locale not supported, create a github issue with the output of 'netsh wlan show interfaces' atached"
+ )
+ wifiNics = Wifi.GetWifiNics(getInfo=False)
+ if not wifiNics:
+ raise Exception("no wireless interface found")
+
+ elif OsD.IsMacos():
+ if not OsD.CommandExists("/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport"):
+ raise Exception("airport not found")
+ wifiNics = Wifi.GetWifiNics(getInfo=False)
+ if not wifiNics:
+ raise Exception("no wireless interface found")
+
+ else:
+ raise Exception("OS detection failed")
diff --git a/IoTuring/Entity/Deployments/Wifi/test.py b/IoTuring/Entity/Deployments/Wifi/test.py
new file mode 100644
index 000000000..b6182112c
--- /dev/null
+++ b/IoTuring/Entity/Deployments/Wifi/test.py
@@ -0,0 +1,754 @@
+#!/usr/bin/env python3
+# vim: set fileencoding=utf-8
+
+"""Implementations of wifi functions of Linux."""
+
+import re
+import platform
+import time
+import logging
+from ctypes import *
+from ctypes.wintypes import *
+from comtypes import GUID
+
+"""Constants used in pywifi library define here."""
+
+# Define interface status.
+IFACE_DISCONNECTED = 0
+IFACE_SCANNING = 1
+IFACE_INACTIVE = 2
+IFACE_CONNECTING = 3
+IFACE_CONNECTED = 4
+
+# Define auth algorithms.
+AUTH_ALG_OPEN = 0
+AUTH_ALG_SHARED = 1
+
+# Define auth key mgmt types.
+AKM_TYPE_NONE = 0
+AKM_TYPE_WPA = 1
+AKM_TYPE_WPAPSK = 2
+AKM_TYPE_WPA2 = 3
+AKM_TYPE_WPA2PSK = 4
+AKM_TYPE_UNKNOWN = 5
+
+# Define ciphers.
+CIPHER_TYPE_NONE = 0
+CIPHER_TYPE_WEP = 1
+CIPHER_TYPE_TKIP = 2
+CIPHER_TYPE_CCMP = 3
+CIPHER_TYPE_UNKNOWN = 4
+
+KEY_TYPE_NETWORKKEY = 0
+KEY_TYPE_PASSPHRASE = 1
+
+class Profile():
+
+ def __init__(self):
+
+ self.id = 0
+ self.auth = AUTH_ALG_OPEN
+ self.akm = [AKM_TYPE_NONE]
+ self.cipher = CIPHER_TYPE_NONE
+ self.ssid = None
+ self.bssid = None
+ self.key = None
+
+ def process_akm(self):
+
+ if len(self.akm) > 1:
+ self.akm = self.akm[-1:]
+
+ def __eq__(self, profile):
+
+ if profile.ssid:
+ if profile.ssid != self.ssid:
+ return False
+
+ if profile.bssid:
+ if profile.bssid != self.bssid:
+ return False
+
+ if profile.auth:
+ if profile.auth!= self.auth:
+ return False
+
+ if profile.cipher:
+ if profile.cipher != self.cipher:
+ return False
+
+ if profile.akm:
+ if set(profile.akm).isdisjoint(set(self.akm)):
+ return False
+
+ return True
+
+
+if platform.release().lower() == 'xp':
+ if platform.win32_ver()[2].lower() in ['sp2', 'sp3']:
+ CLIENT_VERSION = 1
+else:
+ CLIENT_VERSION = 2
+
+"""
+Some types does not exist in python2 ctypes.wintypes so we fake them
+using how its defined in python3 ctypes.wintypes.
+"""
+if not "PDWORD" in dir():
+ PDWORD = POINTER(DWORD)
+
+if not "PWCHAR" in dir():
+ PWCHAR= POINTER(WCHAR)
+
+ERROR_SUCCESS = 0
+WLAN_MAX_PHY_TYPE_NUMBER = 8
+DOT11_MAC_ADDRESS = c_ubyte * 6
+
+
+native_wifi = windll.wlanapi
+
+status_dict = [
+ IFACE_INACTIVE,
+ IFACE_CONNECTED,
+ IFACE_CONNECTED,
+ IFACE_DISCONNECTED,
+ IFACE_DISCONNECTED,
+ IFACE_CONNECTING,
+ IFACE_CONNECTING,
+ IFACE_CONNECTING
+]
+
+# opcode = [
+# "wlan_intf_opcode_autoconf_start",
+# "wlan_intf_opcode_autoconf_enabled",
+# "wlan_intf_opcode_background_scan_enabled",
+# "wlan_intf_opcode_media_streaming_mode",
+# "wlan_intf_opcode_radio_state",
+# "wlan_intf_opcode_bss_type",
+# "wlan_intf_opcode_interface_state",
+# "wlan_intf_opcode_current_connection",
+# "wlan_intf_opcode_channel_number",
+# "wlan_intf_opcode_supported_infrastructure_auth_cipher_pairs",
+# "wlan_intf_opcode_supported_adhoc_auth_cipher_pairs",
+# "wlan_intf_opcode_supported_country_or_region_string_list",
+# "wlan_intf_opcode_current_operation_mode",
+# "wlan_intf_opcode_supported_safe_mode",
+# "wlan_intf_opcode_certified_safe_mode",
+# "wlan_intf_opcode_hosted_network_capable",
+# "wlan_intf_opcode_management_frame_protection_capable",
+# "wlan_intf_opcode_secondary_sta_interfaces",
+# "wlan_intf_opcode_secondary_sta_synchronized_connections",
+# "wlan_intf_opcode_realtime_connection_quality",
+# wlan_intf_opcode_autoconf_end = 0x0fffffff,
+# wlan_intf_opcode_msm_start = 0x10000100,
+# wlan_intf_opcode_statistics,
+# wlan_intf_opcode_rssi,
+# wlan_intf_opcode_msm_end = 0x1fffffff,
+# wlan_intf_opcode_security_start = 0x20010000,
+# wlan_intf_opcode_security_end = 0x2fffffff,
+# wlan_intf_opcode_ihv_start = 0x30000000,
+# wlan_intf_opcode_ihv_end = 0x3fffffff]
+
+auth_value_to_str_dict = {
+ AUTH_ALG_OPEN: 'open',
+ AUTH_ALG_SHARED: 'shared'
+}
+
+auth_str_to_value_dict = {
+ 'open': AUTH_ALG_OPEN,
+ 'shared': AUTH_ALG_SHARED
+}
+
+akm_str_to_value_dict = {
+ 'NONE': AKM_TYPE_NONE,
+ 'WPA': AKM_TYPE_WPA,
+ 'WPAPSK': AKM_TYPE_WPAPSK,
+ 'WPA2': AKM_TYPE_WPA2,
+ 'WPA2PSK': AKM_TYPE_WPA2PSK,
+ 'OTHER': AKM_TYPE_UNKNOWN
+}
+
+akm_value_to_str_dict = {
+ AKM_TYPE_NONE: 'NONE',
+ AKM_TYPE_WPA: 'WPA',
+ AKM_TYPE_WPAPSK: 'WPAPSK',
+ AKM_TYPE_WPA2: 'WPA2',
+ AKM_TYPE_WPA2PSK: 'WPA2PSK',
+ AKM_TYPE_UNKNOWN: 'OTHER'
+}
+
+cipher_str_to_value_dict = {
+ 'NONE': CIPHER_TYPE_NONE,
+ 'WEP': CIPHER_TYPE_WEP,
+ 'TKIP': CIPHER_TYPE_TKIP,
+ 'AES': CIPHER_TYPE_CCMP,
+ 'OTHER': CIPHER_TYPE_UNKNOWN
+}
+
+cipher_value_to_str_dict = {
+ CIPHER_TYPE_NONE: 'NONE',
+ CIPHER_TYPE_WEP: 'WEP',
+ CIPHER_TYPE_TKIP: 'TKIP',
+ CIPHER_TYPE_CCMP: 'AES',
+ CIPHER_TYPE_UNKNOWN: 'UNKNOWN'
+}
+
+class WLAN_INTERFACE_INFO(Structure):
+
+ _fields_ = [
+ ("InterfaceGuid", GUID),
+ ("strInterfaceDescription", c_wchar * 256),
+ ("isState", c_uint)
+ ]
+
+
+class WLAN_INTERFACE_INFO_LIST(Structure):
+
+ _fields_ = [
+ ("dwNumberOfItems", DWORD),
+ ("dwIndex", DWORD),
+ ("InterfaceInfo", WLAN_INTERFACE_INFO * 1)
+ ]
+
+
+class DOT11_SSID(Structure):
+
+ _fields_ = [("uSSIDLength", c_ulong),
+ ("ucSSID", c_char * 32)]
+
+
+class WLAN_RATE_SET(Structure):
+
+ _fields_ = [
+ ("uRateSetLength", c_ulong),
+ ("usRateSet", c_ushort * 126)
+ ]
+
+
+class WLAN_RAW_DATA(Structure):
+
+ _fields_ = [
+ ("dwDataSize", DWORD),
+ ("DataBlob", c_byte * 1)
+ ]
+
+
+class WLAN_AVAILABLE_NETWORK(Structure):
+
+ _fields_ = [
+ ("strProfileName", c_wchar * 256),
+ ("dot11Ssid", DOT11_SSID),
+ ("dot11BssType", c_uint),
+ ("uNumberOfBssids", c_ulong),
+ ("bNetworkConnectable", c_bool),
+ ("wlanNotConnectableReason", c_uint),
+ ("uNumberOfPhyTypes", c_ulong * WLAN_MAX_PHY_TYPE_NUMBER),
+ ("dot11PhyTypes", c_uint),
+ ("bMorePhyTypes", c_bool),
+ ("wlanSignalQuality", c_ulong),
+ ("bSecurityEnabled", c_bool),
+ ("dot11DefaultAuthAlgorithm", c_uint),
+ ("dot11DefaultCipherAlgorithm", c_uint),
+ ("dwFlags", DWORD),
+ ("dwReserved", DWORD)
+ ]
+
+
+class WLAN_AVAILABLE_NETWORK_LIST(Structure):
+
+ _fields_ = [
+ ("dwNumberOfItems", DWORD),
+ ("dwIndex", DWORD),
+ ("Network", WLAN_AVAILABLE_NETWORK * 1)
+ ]
+
+
+class WLAN_BSS_ENTRY(Structure):
+
+ _fields_ = [
+ ("dot11Ssid", DOT11_SSID),
+ ("uPhyId", c_ulong),
+ ("dot11Bssid", DOT11_MAC_ADDRESS),
+ ("dot11BssType", c_uint),
+ ("dot11BssPhyType", c_uint),
+ ("lRssi", c_long),
+ ("uLinkQuality", c_ulong),
+ ("bInRegDomain", c_bool),
+ ("usBeaconPeriod", c_ushort),
+ ("ullTimestamp", c_ulonglong),
+ ("ullHostTimestamp", c_ulonglong),
+ ("usCapabilityInformation", c_ushort),
+ ("ulChCenterFrequency", c_ulong),
+ ("wlanRateSet", WLAN_RATE_SET),
+ ("ulIeOffset", c_ulong),
+ ("ulIeSize", c_ulong)
+ ]
+
+
+class WLAN_BSS_LIST(Structure):
+
+ _fields_ = [
+ ("dwTotalSize", DWORD),
+ ("dwNumberOfItems", DWORD),
+ ("wlanBssEntries", WLAN_BSS_ENTRY * 1)
+ ]
+
+
+class NDIS_OBJECT_HEADER(Structure):
+
+ _fields_ = [
+ ("Type", c_ubyte),
+ ("Revision", c_ubyte),
+ ("Size", c_ushort)
+ ]
+
+
+class DOT11_BSSID_LIST(Structure):
+
+ _fields_ = [
+ ("Header", NDIS_OBJECT_HEADER),
+ ("uNumOfEntries", c_ulong),
+ ("uTotalNumOfEntries", c_ulong),
+ ("BSSIDs", DOT11_MAC_ADDRESS * 1)
+ ]
+
+
+class WLAN_CONNECTION_PARAMETERS(Structure):
+
+ _fields_ = [
+ ("wlanConnectionMode", c_uint),
+ ("strProfile", c_wchar_p),
+ ("pDot11Ssid", POINTER(DOT11_SSID)),
+ ("pDesiredBssidList", POINTER(DOT11_BSSID_LIST)),
+ ("dot11BssType", c_uint),
+ ("dwFlags", DWORD)
+ ]
+
+
+class WLAN_PROFILE_INFO(Structure):
+
+ _fields_ = [
+ ("strProfileName", c_wchar * 256),
+ ("dwFlags", DWORD)
+ ]
+
+
+class WLAN_PROFILE_INFO_LIST(Structure):
+
+ _fields_ = [
+ ("dwNumberOfItems", DWORD),
+ ("dwIndex", DWORD),
+ ("ProfileInfo", WLAN_PROFILE_INFO * 1)
+ ]
+
+
+class WifiUtil():
+ """WifiUtil implements the wifi functions in Windows."""
+
+ _nego_version = DWORD()
+ _handle = HANDLE()
+ _ifaces = pointer(WLAN_INTERFACE_INFO_LIST())
+ _logger = logging.getLogger('pywifi')
+
+ # def scan(self, obj):
+ # """Trigger the wifi interface to scan."""
+
+ # self._wlan_scan(self._handle, byref(obj['guid']))
+
+ # def scan_results(self, obj):
+ # """Get the AP list after scanning."""
+
+ # avail_network_list = pointer(WLAN_AVAILABLE_NETWORK_LIST())
+ # self._wlan_get_available_network_list(self._handle,
+ # byref(obj['guid']), byref(avail_network_list))
+ # networks = cast(avail_network_list.contents.Network,
+ # POINTER(WLAN_AVAILABLE_NETWORK))
+
+ # self._logger.debug("Scan found %d networks.",
+ # avail_network_list.contents.dwNumberOfItems)
+
+ # network_list = []
+ # for i in range(avail_network_list.contents.dwNumberOfItems):
+
+ # if networks[i].dot11BssType == 1 and networks[i].bNetworkConnectable :
+
+ # ssid = ''
+ # for j in range(networks[i].dot11Ssid.uSSIDLength):
+
+ # if networks[i].dot11Ssid.ucSSID != b'':
+ # ssid += "%c" % networks[i].dot11Ssid.ucSSID[j]
+
+ # bss_list = pointer(WLAN_BSS_LIST())
+ # self._wlan_get_network_bss_list(self._handle,
+ # byref(obj['guid']), byref(bss_list), networks[i].dot11Ssid, networks[i].bSecurityEnabled)
+ # bsses = cast(bss_list.contents.wlanBssEntries,
+ # POINTER(WLAN_BSS_ENTRY))
+
+ # if networks[i].bSecurityEnabled:
+ # akm = self._get_akm(networks[i].dot11DefaultCipherAlgorithm)
+ # auth_alg = self._get_auth_alg(networks[i].dot11DefaultAuthAlgorithm)
+ # else:
+ # akm = [AKM_TYPE_NONE]
+ # auth_alg = [AUTH_ALG_OPEN]
+
+ # for j in range(bss_list.contents.dwNumberOfItems):
+ # network = Profile()
+
+ # network.ssid = ssid
+
+ # network.bssid = ''
+ # for k in range(6):
+ # network.bssid += "%02x:" % bsses[j].dot11Bssid[k]
+
+ # network.signal = bsses[j].lRssi
+ # network.freq = bsses[j].ulChCenterFrequency
+ # network.auth = auth_alg
+ # network.akm = akm
+ # network_list.append(network)
+
+ # return network_list
+
+ # def connect(self, obj, params):
+ # """Connect to the specified AP."""
+
+ # connect_params = WLAN_CONNECTION_PARAMETERS()
+ # connect_params.wlanConnectionMode = 0 # Profile
+ # connect_params.dot11BssType = 1 # infra
+ # profile_name = create_unicode_buffer(params.ssid)
+
+ # connect_params.strProfile = profile_name.value
+ # ret = self._wlan_connect(
+ # self._handle, obj['guid'], byref(connect_params))
+ # self._logger.debug('connect result: %d', ret)
+
+ # def disconnect(self, obj):
+ # """Disconnect to the specified AP."""
+
+ # self._wlan_disconnect(self._handle, obj['guid'])
+
+ # def add_network_profile(self, obj, params):
+ # """Add an AP profile for connecting to afterward."""
+
+ # reason_code = DWORD()
+
+ # params.process_akm()
+ # profile_data = {}
+ # profile_data['ssid'] = params.ssid
+
+ # if AKM_TYPE_NONE in params.akm:
+ # profile_data['auth'] = auth_value_to_str_dict[params.auth]
+ # profile_data['encrypt'] = "none"
+ # else:
+ # profile_data['auth'] = akm_value_to_str_dict[params.akm[-1]]
+ # profile_data['encrypt'] = cipher_value_to_str_dict[params.cipher]
+
+ # profile_data['key'] = params.key
+
+ # profile_data['protected'] = 'false'
+ # profile_data['profile_name'] = params.ssid
+
+ # xml = """
+ #
+ # {profile_name}
+ #
+ #
+ # {ssid}
+ #
+ #
+ # ESS
+ # manual
+ #
+ #
+ #
+ # {auth}
+ # {encrypt}
+ # false
+ #
+ # """
+
+ # if AKM_TYPE_NONE not in params.akm:
+ # xml += """
+ # passPhrase
+ # {protected}
+ # {key}
+ # """
+
+ # xml += """
+ #
+ # """
+
+ # xml += """
+ # false
+ #
+ #
+ # """
+
+ # xml = xml.format(**profile_data)
+
+ # status = self._wlan_set_profile(self._handle, obj['guid'], xml,
+ # True, byref(reason_code))
+ # if status != ERROR_SUCCESS:
+ # self._logger.debug("Status %d: Add profile failed", status)
+
+ # buf_size = DWORD(64)
+ # buf = create_unicode_buffer(64)
+ # self._wlan_reason_code_to_str(reason_code, buf_size, buf)
+
+ # return params
+
+ # def network_profile_name_list(self, obj):
+ # """Get AP profile names."""
+
+ # profile_list = pointer(WLAN_PROFILE_INFO_LIST())
+ # self._wlan_get_profile_list(self._handle,
+ # byref(obj['guid']),
+ # byref(profile_list))
+ # profiles = cast(profile_list.contents.ProfileInfo,
+ # POINTER(WLAN_PROFILE_INFO))
+
+ # profile_name_list = []
+ # for i in range(profile_list.contents.dwNumberOfItems):
+ # profile_name = ''
+ # for j in range(len(profiles[i].strProfileName)):
+ # profile_name += profiles[i].strProfileName[j]
+ # profile_name_list.append(profile_name)
+
+ # return profile_name_list
+
+ # def network_profiles(self, obj):
+ # """Get AP profiles."""
+
+ # profile_name_list = self.network_profile_name_list(obj)
+
+ # profile_list = []
+ # for profile_name in profile_name_list:
+ # profile = Profile()
+ # flags = DWORD()
+ # access = DWORD()
+ # xml = LPWSTR()
+ # self._wlan_get_profile(self._handle, obj['guid'],
+ # profile_name, byref(xml), byref(flags),
+ # byref(access))
+ # # fill profile info
+ # profile.ssid = re.search(r'(.*)', xml.value).group(1)
+ # auth = re.search(r'(.*)',
+ # xml.value).group(1).upper()
+
+ # profile.akm = []
+ # if auth not in akm_str_to_value_dict:
+ # if auth not in auth_str_to_value_dict:
+ # profile.auth = AUTH_ALG_OPEN
+ # else:
+ # profile.auth = auth_str_to_value_dict[auth]
+ # profile.akm.append(AKM_TYPE_NONE)
+ # else:
+ # profile.auth = AUTH_ALG_OPEN
+ # profile.akm.append(akm_str_to_value_dict[auth])
+
+ # profile_list.append(profile)
+
+ # return profile_list
+
+ # def remove_network_profile(self, obj, params):
+ # """Remove the specified AP profile."""
+
+ # self._logger.debug("delete profile: %s", params.ssid)
+ # str_buf = create_unicode_buffer(params.ssid)
+ # ret = self._wlan_delete_profile(self._handle, obj['guid'], str_buf)
+ # self._logger.debug("delete result %d", ret)
+
+ # def remove_all_network_profiles(self, obj):
+ # """Remove all the AP profiles."""
+
+ # profile_name_list = self.network_profile_name_list(obj)
+
+ # for profile_name in profile_name_list:
+ # self._logger.debug("delete profile: %s", profile_name)
+ # str_buf = create_unicode_buffer(profile_name)
+ # ret = self._wlan_delete_profile(self._handle, obj['guid'], str_buf)
+ # self._logger.debug("delete result %d", ret)
+
+ def status(self, obj):
+ """Get the wifi interface status."""
+
+ data_size = DWORD()
+ data = PDWORD()
+ opcode_value_type = DWORD()
+ self._wlan_query_interface(self._handle, obj['guid'], 6,
+ byref(data_size), byref(data),
+ byref(opcode_value_type))
+
+ return status_dict[data.contents.value]
+
+ def rssi(self, obj):
+ """Get the wifi interface status."""
+
+ data_size = PDWORD()
+ data = DWORD()
+ opcode_value_type = DWORD()
+ self._wlan_query_interface(self._handle, obj['guid'], 23,
+ byref(data_size), byref(data),
+ byref(opcode_value_type))
+
+ return data.contents.value
+
+ def interfaces(self):
+ """Get the wifi interface lists."""
+
+ ifaces = []
+
+ if self._wlan_open_handle(CLIENT_VERSION,
+ byref(self._nego_version),
+ byref(self._handle)) \
+ is not ERROR_SUCCESS:
+ self._logger.error("Open handle failed!")
+
+ if self._wlan_enum_interfaces(self._handle, byref(self._ifaces)) \
+ is not ERROR_SUCCESS:
+ self._logger.error("Enum interface failed!")
+
+ interfaces = cast(self._ifaces.contents.InterfaceInfo,
+ POINTER(WLAN_INTERFACE_INFO))
+ for i in range(0, self._ifaces.contents.dwNumberOfItems):
+ iface = {}
+ iface['guid'] = interfaces[i].InterfaceGuid
+ iface['name'] = interfaces[i].strInterfaceDescription
+ ifaces.append(iface)
+
+ return ifaces
+
+ def _wlan_open_handle(self, client_version, _nego_version, handle):
+
+ func = native_wifi.WlanOpenHandle
+ func.argtypes = [DWORD, c_void_p, POINTER(DWORD), POINTER(HANDLE)]
+ func.restypes = [DWORD]
+ return func(client_version, None, _nego_version, handle)
+
+ def _wlan_close_handle(self, handle):
+
+ func = native_wifi.WlanCloseHandle
+ func.argtypes = [HANDLE, c_void_p]
+ func.restypes = [DWORD]
+ return func(handle, None)
+
+ def _wlan_enum_interfaces(self, handle, ifaces):
+
+ func = native_wifi.WlanEnumInterfaces
+ func.argtypes = [HANDLE, c_void_p, POINTER(
+ POINTER(WLAN_INTERFACE_INFO_LIST))]
+ func.restypes = [DWORD]
+ return func(handle, None, ifaces)
+
+ # def _wlan_get_available_network_list(self, handle,
+ # iface_guid,
+ # network_list):
+
+ # func = native_wifi.WlanGetAvailableNetworkList
+ # func.argtypes = [HANDLE, POINTER(GUID), DWORD, c_void_p, POINTER(
+ # POINTER(WLAN_AVAILABLE_NETWORK_LIST))]
+ # func.restypes = [DWORD]
+ # return func(handle, iface_guid, 2, None, network_list)
+
+ # def _wlan_get_network_bss_list(self, handle, iface_guid, bss_list, ssid = None, security = False):
+
+ # func = native_wifi.WlanGetNetworkBssList
+ # func.argtypes = [HANDLE, POINTER(GUID), POINTER(
+ # DOT11_SSID), c_uint, c_bool, c_void_p, POINTER(POINTER(WLAN_BSS_LIST))]
+ # func.restypes = [DWORD]
+ # return func(handle, iface_guid, ssid, 1, security, None, bss_list)
+
+ # def _wlan_scan(self, handle, iface_guid):
+
+ # func = native_wifi.WlanScan
+ # func.argtypes = [HANDLE, POINTER(GUID), POINTER(
+ # DOT11_SSID), POINTER(WLAN_RAW_DATA), c_void_p]
+ # func.restypes = [DWORD]
+ # return func(handle, iface_guid, None, None, None)
+
+ # def _wlan_connect(self, handle, iface_guid, params):
+
+ # func = native_wifi.WlanConnect
+ # func.argtypes = [HANDLE, POINTER(GUID), POINTER(
+ # WLAN_CONNECTION_PARAMETERS), c_void_p]
+ # func.restypes = [DWORD]
+ # return func(handle, iface_guid, params, None)
+
+ # def _wlan_set_profile(self, handle, iface_guid, xml, overwrite, reason_code):
+
+ # func = native_wifi.WlanSetProfile
+ # func.argtypes = [HANDLE, POINTER(
+ # GUID), DWORD, c_wchar_p, c_wchar_p, c_bool, c_void_p, POINTER(DWORD)]
+ # func.restypes = [DWORD]
+ # return func(handle, iface_guid, 2, xml, None, overwrite, None, reason_code)
+
+ # def _wlan_reason_code_to_str(self, reason_code, buf_size, buf):
+
+ # func = native_wifi.WlanReasonCodeToString
+ # func.argtypes = [DWORD, DWORD, PWCHAR, c_void_p]
+ # func.restypes = [DWORD]
+ # return func(reason_code, buf_size, buf, None)
+
+ # def _wlan_get_profile_list(self, handle, iface_guid, profile_list):
+
+ # func = native_wifi.WlanGetProfileList
+ # func.argtypes = [HANDLE, POINTER(GUID), c_void_p, POINTER(
+ # POINTER(WLAN_PROFILE_INFO_LIST))]
+ # func.restypes = [DWORD]
+ # return func(handle, iface_guid, None, profile_list)
+
+ # def _wlan_get_profile(self, handle, iface_guid, profile_name, xml, flags, access):
+
+ # func = native_wifi.WlanGetProfile
+ # func.argtypes = [HANDLE, POINTER(GUID), c_wchar_p, c_void_p, POINTER(
+ # c_wchar_p), POINTER(DWORD), POINTER(DWORD)]
+ # func.restypes = [DWORD]
+ # return func(handle, iface_guid, profile_name, None, xml, flags, access)
+
+ # def _wlan_delete_profile(self, handle, iface_guid, profile_name):
+
+ # func = native_wifi.WlanDeleteProfile
+ # func.argtypes = [HANDLE, POINTER(GUID), c_wchar_p, c_void_p]
+ # func.restypes = [DWORD]
+ # return func(handle, iface_guid, profile_name, None)
+
+ def _wlan_query_interface(self, handle, iface_guid, opcode, data_size, data, opcode_value_type):
+
+ func = native_wifi.WlanQueryInterface
+ func.argtypes = [HANDLE, POINTER(GUID), DWORD, c_void_p, POINTER(
+ DWORD), POINTER(POINTER(DWORD)), POINTER(DWORD)]
+ func.restypes = [DWORD]
+ return func(handle, iface_guid, opcode, None, data_size, data, opcode_value_type)
+
+ # def _wlan_disconnect(self, handle, iface_guid):
+
+ # func = native_wifi.WlanDisconnect
+ # func.argtypes = [HANDLE, POINTER(GUID), c_void_p]
+ # func.restypes = [DWORD]
+ # return func(handle, iface_guid, None)
+
+ # def _get_auth_alg(self, auth_val):
+
+ # auth_alg = []
+ # if auth_val in [1, 3, 4, 6, 7]:
+ # auth_alg.append(AUTH_ALG_OPEN)
+ # elif auth_val == 2:
+ # auth_alg.append(AUTH_ALG_SHARED)
+
+ # return auth_alg
+
+ # def _get_akm(self, akm_val):
+
+ # akm = []
+ # if akm_val == 2:
+ # akm.append(AKM_TYPE_WPAPSK)
+ # elif akm_val == 4:
+ # akm.append(AKM_TYPE_WPA2PSK)
+
+ # return akm
+
+
+util = WifiUtil()
+interfaces = util.interfaces()
+print(interfaces)
+print(util.status(interfaces[0]))
+print(util.rssi(interfaces[0]))
\ No newline at end of file
diff --git a/IoTuring/Entity/Entity.py b/IoTuring/Entity/Entity.py
index 1f5f258ad..06369058f 100644
--- a/IoTuring/Entity/Entity.py
+++ b/IoTuring/Entity/Entity.py
@@ -187,29 +187,14 @@ def RunCommand(self,
subprocess.CompletedProcess: See subprocess docs
"""
- # different defaults than in subprocess:
- defaults = {
- "capture_output": True,
- "text": True
- }
-
- for param, value in defaults.items():
- if param not in kwargs:
- kwargs[param] = value
-
try:
- if shell == False and isinstance(command, str):
- runcommand = command.split()
- else:
- runcommand = command
if command_name:
command_name = self.NAME + "-" + command_name
else:
command_name = self.NAME
- p = subprocess.run(
- runcommand, shell=shell, **kwargs)
+ p = OsD.RunCommand(command, shell=shell, **kwargs)
self.Log(self.LOG_DEBUG, f"Called {command_name} command: {p}")
@@ -218,11 +203,12 @@ def RunCommand(self,
if p.stderr:
self.Log(error_loglevel,
f"Error during {command_name} command: {p.stderr}")
+
+ return p
except Exception as e:
raise Exception(f"Error during {command_name} command: {str(e)}")
- return p
@classmethod
def CheckSystemSupport(cls):
diff --git a/IoTuring/Entity/ValueFormat/ValueFormatter.py b/IoTuring/Entity/ValueFormat/ValueFormatter.py
index ec943069e..01011d1e5 100644
--- a/IoTuring/Entity/ValueFormat/ValueFormatter.py
+++ b/IoTuring/Entity/ValueFormat/ValueFormatter.py
@@ -13,11 +13,14 @@
# Lists of measure units
BYTE_SIZES = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
+BYTE_PER_SECOND_SIZES = ['Bps' ,'KBps', 'MBps', 'GBps', 'TBps', 'PBps']
+BIT_PER_SECOND_SIZES = ['bps' ,'Kbps', 'Mbps', 'Gbps', 'Tbps', 'Pbps']
TIME_SIZES = ['s', 'm', 'h', 'd']
FREQUENCY_SIZES = ['Hz', 'kHz', 'MHz', 'GHz']
TIME_SIZES_DIVIDERS = [1, 60, 60, 24]
CELSIUS_UNIT = '°C'
ROTATION = ['rpm']
+RADIOPOWER =['dBm']
SPACE_BEFORE_UNIT = ' '
@@ -53,11 +56,17 @@ def _ParseValue(value, options: ValueFormatterOptions | None, includeUnit: bool)
return ValueFormatter.TemperatureCelsiusFormatter(value, options, includeUnit)
elif valueType == ValueFormatterOptions.TYPE_ROTATION:
return ValueFormatter.RoundsPerMinuteFormatter(value, options, includeUnit)
+ elif valueType ==ValueFormatterOptions.TYPE_RADIOPOWER:
+ return ValueFormatter.RadioPowerFormatter(value, options, includeUnit)
elif valueType == ValueFormatterOptions.TYPE_PERCENTAGE:
if includeUnit:
return str(value) + SPACE_BEFORE_UNIT + '%'
else:
return str(value)
+ elif valueType == ValueFormatterOptions.TYPE_BIT_PER_SECOND:
+ return ValueFormatter.BitPerSecondFormatter(value, options, includeUnit)
+ elif valueType == ValueFormatterOptions.TYPE_BYTE_PER_SECOND:
+ return ValueFormatter.BytePerSecondFormatter(value, options, includeUnit)
else:
return str(value)
@@ -168,8 +177,54 @@ def RoundsPerMinuteFormatter(value, options: ValueFormatterOptions, includeUnit:
result = result + SPACE_BEFORE_UNIT + ROTATION[0]
return result
+ @staticmethod
+ def RadioPowerFormatter(value, options: ValueFormatterOptions, includeUnit: bool):
+
+ value = ValueFormatter.roundValue(value, options)
+
+ if includeUnit:
+ return str(value) + SPACE_BEFORE_UNIT + 'dBm'
+ else:
+ return str(value)
+
+ @staticmethod
+ def BytePerSecondFormatter(value, options: ValueFormatterOptions, includeUnit: bool):
+ # Get value in hertz, and adjustable
+ asked_size = options.get_adjust_size()
+
+ if asked_size and asked_size in BYTE_PER_SECOND_SIZES:
+ index = BYTE_PER_SECOND_SIZES.index(asked_size)
+ value = value/pow(1000,index)
+ else:
+ index = 0
+
+ value = ValueFormatter.roundValue(value, options)
+
+ if includeUnit:
+ return str(value) + SPACE_BEFORE_UNIT + BYTE_PER_SECOND_SIZES[index]
+ else:
+ return str(value)
+
+ def BitPerSecondFormatter(value, options: ValueFormatterOptions, includeUnit: bool):
+ # Get value in hertz, and adjustable
+ asked_size = options.get_adjust_size()
+
+ if asked_size and asked_size in BYTE_PER_SECOND_SIZES:
+ index = BIT_PER_SECOND_SIZES.index(asked_size)
+ value = value/pow(1000,index)
+ else:
+ index = 0
+
+ value = ValueFormatter.roundValue(value, options)
+
+ if includeUnit:
+ return str(value) + SPACE_BEFORE_UNIT + BIT_PER_SECOND_SIZES[index]
+ else:
+ return str(value)
+
@staticmethod
def roundValue(value, options: ValueFormatterOptions):
if options.get_decimals() != ValueFormatterOptions.DO_NOT_TOUCH_DECIMALS:
return round(value, options.get_decimals())
- return value
\ No newline at end of file
+ return value
+
diff --git a/IoTuring/Entity/ValueFormat/ValueFormatterOptions.py b/IoTuring/Entity/ValueFormat/ValueFormatterOptions.py
index 3fc676ecb..272f46c3b 100644
--- a/IoTuring/Entity/ValueFormat/ValueFormatterOptions.py
+++ b/IoTuring/Entity/ValueFormat/ValueFormatterOptions.py
@@ -7,6 +7,9 @@ class ValueFormatterOptions():
TYPE_MILLISECONDS = 5
TYPE_TEMPERATURE = 6
TYPE_ROTATION = 7
+ TYPE_RADIOPOWER = 8
+ TYPE_BYTE_PER_SECOND = 9
+ TYPE_BIT_PER_SECOND = 10
DO_NOT_TOUCH_DECIMALS = -1
diff --git a/IoTuring/MyApp/SystemConsts/OperatingSystemDetection.py b/IoTuring/MyApp/SystemConsts/OperatingSystemDetection.py
index d34c2b947..07f6b9d86 100644
--- a/IoTuring/MyApp/SystemConsts/OperatingSystemDetection.py
+++ b/IoTuring/MyApp/SystemConsts/OperatingSystemDetection.py
@@ -1,7 +1,10 @@
+from __future__ import annotations
+
import platform
import os
import psutil
import shutil
+import subprocess
class OperatingSystemDetection():
OS_NAME = platform.system()
@@ -57,6 +60,42 @@ def GetEnv(cls, envvar) -> str:
env_value = ""
return env_value
+ @staticmethod
+ def RunCommand(command: str | list,
+ shell: bool = False,
+ **kwargs) -> subprocess.CompletedProcess:
+ """Safely call a subprocess. Kwargs are other Subprocess options
+
+ Args:
+ command (str | list): The command to call
+ shell (bool, optional): Run in shell. Defaults to False.
+ **kwargs: subprocess args
+
+ Returns:
+ subprocess.CompletedProcess: See subprocess docs
+ """
+
+ # different defaults than in subprocess:
+ defaults = {
+ "capture_output": True,
+ "text": True
+ }
+
+ for param, value in defaults.items():
+ if param not in kwargs:
+ kwargs[param] = value
+
+ if shell == False and isinstance(command, str):
+ runcommand = command.split()
+ else:
+ runcommand = command
+
+ p = subprocess.run(
+ runcommand, shell=shell, **kwargs)
+
+ return p
+
+
@staticmethod
def CommandExists(command) -> bool:
"""Check if a command exists"""
diff --git a/IoTuring/Warehouse/Deployments/HomeAssistantWarehouse/entities.yaml b/IoTuring/Warehouse/Deployments/HomeAssistantWarehouse/entities.yaml
index c5afe11e0..f00f615f0 100644
--- a/IoTuring/Warehouse/Deployments/HomeAssistantWarehouse/entities.yaml
+++ b/IoTuring/Warehouse/Deployments/HomeAssistantWarehouse/entities.yaml
@@ -97,6 +97,8 @@ Volume:
icon: mdi:volume-high
unit_of_measurement: "%"
custom_type: number
+Wifi:
+ icon: mdi:wifi
TerminalPayloadCommand:
name: Terminal Command
icon: mdi:console-line