diff --git a/acquire/acquire.py b/acquire/acquire.py index 6cd607fe..5297ccb9 100644 --- a/acquire/acquire.py +++ b/acquire/acquire.py @@ -29,7 +29,18 @@ from dissect.util.stream import RunlistStream from acquire.collector import Collector, get_full_formatted_report, get_report_summary +from acquire.dynamic.windows.arp import ( + NetAdapter, + format_net_neighbors_list, + get_windows_net_neighbors, + get_windows_network_adapters, +) from acquire.dynamic.windows.named_objects import NamedObjectType +from acquire.dynamic.windows.netstat import ( + NetConnection, + format_net_connections_list, + get_active_connections, +) from acquire.esxi import esxi_memory_context_manager from acquire.gui import GUI from acquire.hashes import ( @@ -377,12 +388,24 @@ def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Ite @register_module("--netstat") @local_module class Netstat(Module): - DESC = "netstat output" - SPEC = [ - ("command", (["powershell.exe", "netstat", "-a", "-n", "-o"], "netstat")), - ] + DESC = "Windows network connections" EXEC_ORDER = ExecutionOrder.BOTTOM + @classmethod + def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None: + net_connections: list[NetConnection] = get_active_connections() + output = format_net_connections_list(net_connections) + + output_base = ( + fsutil.join(collector.base, collector.COMMAND_OUTPUT_BASE) + if collector.base + else collector.COMMAND_OUTPUT_BASE + ) + full_output_path = fsutil.join(output_base, "netstat") + + collector.output.write_bytes(full_output_path, output.encode()) + collector.report.add_command_collected(cls.__name__, ["netstat", "-a", "-n", "-o"]) + @register_module("--win-processes") @local_module @@ -417,18 +440,21 @@ class WinArpCache(Module): EXEC_ORDER = ExecutionOrder.BOTTOM @classmethod - def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]: - if float(target.ntversion) < 6.2: - commands = [ - # < Windows 10 - ("command", (["arp", "-av"], "win7-arp-cache")), - ] - else: - commands = [ - # Windows 10+ (PowerShell) - ("command", (["PowerShell", "Get-NetNeighbor"], "win10-arp-cache")), - ] - return commands + def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None: + network_adapters: list[NetAdapter] = get_windows_network_adapters() + neighbors = get_windows_net_neighbors(network_adapters) + + output = format_net_neighbors_list(neighbors) + + output_base = ( + fsutil.join(collector.base, collector.COMMAND_OUTPUT_BASE) + if collector.base + else collector.COMMAND_OUTPUT_BASE + ) + full_output_path = fsutil.join(output_base, "arp-cache") + + collector.output.write_bytes(full_output_path, output.encode()) + collector.report.add_command_collected(cls.__name__, ["arp-cache"]) @register_module("--win-rdp-sessions") diff --git a/acquire/dynamic/windows/arp.py b/acquire/dynamic/windows/arp.py new file mode 100644 index 00000000..cd383fca --- /dev/null +++ b/acquire/dynamic/windows/arp.py @@ -0,0 +1,236 @@ +from __future__ import annotations + +import ctypes +from json import dumps +from socket import inet_ntop + +from acquire.dynamic.windows.iphlpapi import ( + ADDRESS_FAMILY, + IF_OPER_STATUS, + IF_TYPE, + IP_ADAPTER_ADDRESSES, + LPVOID, + MIB_IPNET_ROW2, + MIB_IPNET_TABLE2, + NL_NEIGHBOR_STATE, + NO_ERROR, + ULONG, + FreeMibTable, + GetAdaptersAddresses, + GetIpNetTable2, +) + + +def format_physical_address(data: bytes, length: int) -> str | None: + if length > 0: + return "-".join(f"{b:02X}" for b in data[:length]) + return None + + +class NetAdapter: + def __init__( + self, + index: int, + name: str, + description: str, + friendly_name: str, + physical_address: str | None, + mtu: int, + type: IF_TYPE, + status: IF_OPER_STATUS, + ): + self.index = index + self.name = name + self.description = description + self.friendly_name = friendly_name + self.physical_address = physical_address + self.mtu = mtu + self.type = type + self.operation_status = status + + @staticmethod + def from_adapter_addresses(addresses: IP_ADAPTER_ADDRESSES) -> NetAdapter: + index = addresses.Index + adapter_name = addresses.AdapterName.decode() + adapter_desc = addresses.Description + adapter_friendly = addresses.FriendlyName + physical_addr = format_physical_address(addresses.PhysicalAddress, addresses.PhysicalAddressLength) + mtu = addresses.Mtu + type = IF_TYPE(addresses.IfType) + status = IF_OPER_STATUS(addresses.OperStatus) + + return NetAdapter( + index=index, + name=adapter_name, + description=adapter_desc, + friendly_name=adapter_friendly, + physical_address=physical_addr, + mtu=mtu, + type=type, + status=status, + ) + + @staticmethod + def header_fields() -> list[str]: + return [ + "Index", + "Adapter Name", + "Description", + "Friendly Name", + "MAC Address", + "MTU", + "Type", + "Operation Status", + ] + + def as_dict(self, indent=0) -> dict: + return { + "index": self.index, + "name": self.name, + "description": self.description, + "friendly_name": self.friendly_name, + "mac": self.physical_address, + "mtu": self.mtu, + "type": self.type.name, + "status": self.operation_status.name, + } + + def __str__(self) -> str: + return ( + f"NetAdapter(index={self.index}, name={self.name}, desc={self.description}" + f", friendly={self.friendly_name}, mac={self.physical_address}, mtu={self.mtu}, type={self.type}" + f", status={self.operation_status.name})" + ) + + +class NetNeighbor: + def __init__( + self, + family: ADDRESS_FAMILY, + address: str, + mac: str | None, + state: NL_NEIGHBOR_STATE, + adapter: NetAdapter | None, + ): + self.family: ADDRESS_FAMILY = family + self.address: str = address + self.mac: str | None = mac + self.state: NL_NEIGHBOR_STATE = state + self.adapter: NetAdapter | None = adapter + + def as_dict(self) -> dict: + return { + "family": self.family.name, + "address": self.address, + "mac": self.mac if self.mac else "", + "state": self.state.name, + "adapter": self.adapter.as_dict(), + } + + def __str__(self) -> str: + return ( + f"NetNeighbor(family={self.family.name}, address={self.address}," + f"mac={self.mac}, state={self.state.name}, adapter={self.adapter})" + ) + + +def get_windows_network_adapters() -> list[NetAdapter]: + adapter_buffer_size = ULONG(0) + GetAdaptersAddresses(ADDRESS_FAMILY.AF_UNSPEC, 0, LPVOID(0), LPVOID(0), ctypes.byref(adapter_buffer_size)) + + if adapter_buffer_size == 0: + return [] + + buffer = ctypes.create_string_buffer(adapter_buffer_size.value) + result = GetAdaptersAddresses(ADDRESS_FAMILY.AF_UNSPEC, 0, LPVOID(0), buffer, ctypes.byref(adapter_buffer_size)) + if result != NO_ERROR: + return [] + + adapters = ctypes.cast(buffer, ctypes.POINTER(IP_ADAPTER_ADDRESSES)) + adapter = adapters.contents + + network_adapters = [] + + while True: + network_adapters.append(NetAdapter.from_adapter_addresses(adapter)) + + if not adapter.Next: + break + + adapter = ctypes.cast(adapter.Next, ctypes.POINTER(IP_ADAPTER_ADDRESSES)).contents + + return network_adapters + + +def get_adapter_by_index(adapters: list[NetAdapter], index: int) -> NetAdapter | None: + for adapter in adapters: + if adapter.index == index: + return adapter + return None + + +def get_windows_net_neighbors(adapters: list[NetAdapter]) -> list[NetNeighbor]: + table_pointer = ctypes.POINTER(MIB_IPNET_TABLE2)() + result = GetIpNetTable2(ADDRESS_FAMILY.AF_UNSPEC, ctypes.byref(table_pointer)) + + if result != NO_ERROR: + return [] + + table = table_pointer.contents + rows = ctypes.cast(table.Table, ctypes.POINTER(MIB_IPNET_ROW2 * table.NumEntries)).contents + + neighbors = [] + + for row in rows: + if row.Address.si_family == ADDRESS_FAMILY.AF_INET: + ipv4 = row.Address.Ipv4 + address = inet_ntop(ADDRESS_FAMILY.AF_INET, ipv4.sin_addr) + elif row.Address.si_family == ADDRESS_FAMILY.AF_INET6: + ipv6 = row.Address.Ipv6 + address = f"[{inet_ntop(ADDRESS_FAMILY.AF_INET6, ipv6.sin6_addr)}]" + else: + # We should not end up here, but let's gracefully continue in hope there is more valid data to parse. + continue + + mac = format_physical_address(row.PhysicalAddress, row.PhysicalAddressLength) + adapter = get_adapter_by_index(adapters, row.InterfaceIndex) + neighbor = NetNeighbor( + family=ADDRESS_FAMILY(row.Address.si_family), + address=address, + mac=mac, + state=NL_NEIGHBOR_STATE(row.State), + adapter=adapter, + ) + neighbors.append(neighbor) + + FreeMibTable(table_pointer) + + return neighbors + + +def format_net_neighbors_csv(net_neighbors: list[NetNeighbor]) -> str: + def formatter(neighbor: NetNeighbor) -> str: + return f",".join( + [str(neighbor.adapter.index), neighbor.address, neighbor.mac if neighbor.mac else "", neighbor.state.name] + ) + + header = ",".join(["interface_index", "ip_address", "mac", "state"]) + rows = "\n".join(formatter(neighbor) for neighbor in net_neighbors) + + return f"{header}\n{rows}" + + +def format_net_neighbors_json(net_neighbors: list[NetNeighbor], indent=0) -> str: + return dumps(net_neighbors, default=lambda neighbor: neighbor.as_dict(), indent=indent if indent > 0 else None) + + +def format_net_neighbors_list(net_neighbors: list[NetNeighbor]) -> str: + def formatter(neighbor: NetNeighbor) -> str: + mac = neighbor.mac if neighbor.mac else "" + return f"{neighbor.adapter.index:<10}{neighbor.address:<60}{mac:<20}{neighbor.state.name:<20}" + + header = f"{'ifIndex':<10}{'IP Address':<60}{'MAC Address':<20}{'State':<20}" + header += "\n" + ("=" * len(header)) + rows = "\n".join(formatter(neighbor) for neighbor in net_neighbors) + + return f"{header}\n{rows}" diff --git a/acquire/dynamic/windows/iphlpapi.py b/acquire/dynamic/windows/iphlpapi.py new file mode 100644 index 00000000..82073a43 --- /dev/null +++ b/acquire/dynamic/windows/iphlpapi.py @@ -0,0 +1,543 @@ +import ctypes +from ctypes.wintypes import ( + BOOL, + BYTE, + DWORD, + LPVOID, + LPWSTR, + PDWORD, + SHORT, + ULONG, + USHORT, +) +from enum import IntEnum +from typing import ClassVar + +IF_MAX_PHYS_ADDRESS_LENGTH = 32 +MAX_ADAPTER_ADDRESS_LENGTH = 8 +MAXLEN_PHYSADDR = 8 +MAX_DHCPV6_DUID_LENGTH = 130 + +NO_ERROR = 0 +ERROR_NOT_SUPPORTED = 50 +ERROR_INSUFFICIENT_BUFFER = 122 +ERROR_NO_DATA = 232 + +BITNESS = [32, 64][ctypes.sizeof(LPVOID) == 8] + + +class TCP_TABLE_CLASS(IntEnum): + BASIC_LISTENER = 0 + BASIC_CONNECTIONS = 1 + BASIC_ALL = 2 + OWNER_PID_LISTENER = 3 + OWNER_PID_CONNECTIONS = 4 + OWNER_PID_ALL = 5 + OWNER_MODULE_LISTENER = 6 + OWNER_MODULE_CONNECTIONS = 7 + OWNER_MODULE_ALL = 8 + + +class UDP_TABLE_CLASS(IntEnum): + BASIC = 0 + OWNER_PID = 1 + OWNER_MODUL = 2 + + +class MIB_IPNET_TYPE(IntEnum): + OTHER = 1 + INVALID = 2 + DYNAMIC = 3 + STATIC = 4 + + +class IF_TYPE(IntEnum): + OTHER = 1 + REGULAR_1822 = 2 + HDH_1822 = 3 + DDN_X25 = 4 + RFC877_X25 = 5 + ETHERNET_CSMACD = 6 + IS088023_CSMACD = 7 + ISO88024_TOKENBUS = 8 + ISO88025_TOKENRING = 9 + ISO88026_MAN = 10 + STARLAN = 11 + PROTEON_10MBIT = 12 + PROTEON_80MBIT = 13 + HYPERCHANNEL = 14 + FDDI = 15 + LAP_B = 16 + SDLC = 17 + DS1 = 18 + E1 = 19 + BASIC_ISDN = 20 + PRIMARY_ISDN = 21 + PROP_POINT2POINT_SERIAL = 22 + PPP = 23 + SOFTWARE_LOOPBACK = 24 + EON = 25 + ETHERNET_3MBIT = 26 + NSIP = 27 + SLIP = 28 + ULTRA = 29 + DS3 = 30 + SIP = 31 + FRAMERELAY = 32 + RS232 = 33 + PARA = 34 + ARCNET = 35 + ARCNET_PLUS = 36 + ATM = 37 + MIO_X25 = 38 + SONET = 39 + X25_PLE = 40 + ISO88022_LLC = 41 + LOCALTALK = 42 + SMDS_DXI = 43 + FRAMERELAY_SERVICE = 44 + V35 = 45 + HSSI = 46 + HIPPI = 47 + MODEM = 48 + AAL5 = 49 + SONET_PATH = 50 + SONET_VT = 51 + SMDS_ICIP = 52 + PROP_VIRTUAL = 53 + PROP_MULTIPLEXOR = 54 + IEEE80212 = 55 + FIBRECHANNEL = 56 + HIPPIINTERFACE = 57 + FRAMERELAY_INTERCONNECT = 58 + AFLANE_8023 = 59 + AFLANE_8025 = 60 + CCTEMUL = 61 + FASTETHER = 62 + ISDN = 63 + V11 = 64 + V36 = 65 + G703_64K = 66 + G703_2MB = 67 + QLLC = 68 + FASTETHER_FX = 69 + CHANNEL = 70 + IEEE80211 = 71 + IBM370PARCHAN = 72 + ESCON = 73 + DLSW = 74 + ISDN_S = 75 + ISDN_U = 76 + LAP_D = 77 + IPSWITCH = 78 + RSRB = 79 + ATM_LOGICAL = 80 + DS0 = 81 + DS0_BUNDLE = 82 + BSC = 83 + ASYNC = 84 + CNR = 85 + ISO88025R_DTR = 86 + EPLRS = 87 + ARAP = 88 + PROP_CNLS = 89 + HOSTPAD = 90 + TERMPAD = 91 + FRAMERELAY_MPI = 92 + X213 = 93 + ADSL = 94 + RADSL = 95 + SDSL = 96 + VDSL = 97 + ISO88025_CRFPRINT = 98 + MYRINET = 99 + VOICE_EM = 100 + VOICE_FXO = 101 + VOICE_FXS = 102 + VOICE_ENCAP = 103 + VOICE_OVERIP = 104 + ATM_DXI = 105 + ATM_FUNI = 106 + ATM_IMA = 107 + PPPMULTILINKBUNDLE = 108 + IPOVER_CDLC = 109 + IPOVER_CLAW = 110 + STACKTOSTACK = 111 + VIRTUALIPADDRESS = 112 + MPC = 113 + IPOVER_ATM = 114 + ISO88025_FIBER = 115 + TDLC = 116 + GIGABITETHERNET = 117 + HDLC = 118 + LAP_F = 119 + V37 = 120 + X25_MLP = 121 + X25_HUNTGROUP = 122 + TRANSPHDLC = 123 + INTERLEAVE = 124 + FAST = 125 + IP = 126 + DOCSCABLE_MACLAYER = 127 + DOCSCABLE_DOWNSTREAM = 128 + DOCSCABLE_UPSTREAM = 129 + A12MPPSWITCH = 130 + TUNNEL = 131 + COFFEE = 132 + CES = 133 + ATM_SUBINTERFACE = 134 + L2_VLAN = 135 + L3_IPVLAN = 136 + L3_IPXVLAN = 137 + DIGITALPOWERLINE = 138 + MEDIAMAILOVERIP = 139 + DTM = 140 + DCN = 141 + IPFORWARD = 142 + MSDSL = 143 + IEEE1394 = 144 + IF_GSN = 145 + DVBRCC_MACLAYER = 146 + DVBRCC_DOWNSTREAM = 147 + DVBRCC_UPSTREAM = 148 + ATM_VIRTUAL = 149 + MPLS_TUNNEL = 150 + SRP = 151 + VOICEOVERATM = 152 + VOICEOVERFRAMERELAY = 153 + IDSL = 154 + COMPOSITELINK = 155 + SS7_SIGLINK = 156 + PROP_WIRELESS_P2P = 157 + FR_FORWARD = 158 + RFC1483 = 159 + USB = 160 + IEEE8023AD_LAG = 161 + BGP_POLICY_ACCOUNTING = 162 + FRF16_MFR_BUNDLE = 163 + H323_GATEKEEPER = 164 + H323_PROXY = 165 + MPLS = 166 + MF_SIGLINK = 167 + HDSL2 = 168 + SHDSL = 169 + DS1_FDL = 170 + POS = 171 + DVB_ASI_IN = 172 + DVB_ASI_OUT = 173 + PLC = 174 + NFAS = 175 + TR008 = 176 + GR303_RDT = 177 + GR303_IDT = 178 + ISUP = 179 + PROP_DOCS_WIRELESS_MACLAYER = 180 + PROP_DOCS_WIRELESS_DOWNSTREAM = 181 + PROP_DOCS_WIRELESS_UPSTREAM = 182 + HIPERLAN2 = 183 + PROP_BWA_P2MP = 184 + SONET_OVERHEAD_CHANNEL = 185 + DIGITAL_WRAPPER_OVERHEAD_CHANNEL = 186 + AAL2 = 187 + RADIO_MAC = 188 + ATM_RADIO = 189 + IMT = 190 + MVL = 191 + REACH_DSL = 192 + FR_DLCI_ENDPT = 193 + ATM_VCI_ENDPT = 194 + OPTICAL_CHANNEL = 195 + OPTICAL_TRANSPORT = 196 + IEEE80216_WMAN = 237 + WWANPP = 243 + WWANPP2 = 244 + IEEE802154 = 259 + + +class ADDRESS_FAMILY(IntEnum): + AF_UNSPEC = 0 + AF_INET = 2 + AF_INET6 = 23 + + +class NL_NEIGHBOR_STATE(IntEnum): + UNREACHABLE = 0 + INCOMPLETE = 1 + PROBE = 2 + DELAY = 3 + STALE = 4 + REACHABLE = 5 + PERMANENT = 6 + MAXIMUM = 7 + + +class IF_OPER_STATUS(IntEnum): + UP = 1 + DOWN = 2 + TESTING = 3 + UNKNOWN = 4 + DORMANT = 5 + NOTPRESENT = 6 + LOWERLAYERDOWN = 7 + + +class NET_IF_CONNECTION_TYPE(IntEnum): + DEDICATED = 1 + PASSIVE = 2 + DEMAND = 3 + MAXIMUM = 4 + + +class TUNNEL_TYPE(IntEnum): + TUNNEL_TYPE_NONE = 0 + TUNNEL_TYPE_OTHER = 1 + TUNNEL_TYPE_DIRECT = 2 + TUNNEL_TYPE_6TO4 = 11 + TUNNEL_TYPE_ISATAP = 13 + TUNNEL_TYPE_TEREDO = 14 + TUNNEL_TYPE_IPHTTPS = 15 + + +class TCP_CONNECTION_OFFLOAD_STATE(IntEnum): + INHOST = 0 + OFFLOADING = 1 + OFFLOADED = 2 + UPLOADING = 3 + MAX = 4 + + +class MIB_TCP_STATE(IntEnum): + CLOSED = 1 + LISTENING = 2 + SYN_SENT = 3 + SYN_RCVD = 4 + ESTABLISHED = 5 + FIN_WAIT1 = 6 + FIN_WAIT2 = 7 + CLOSE_WAIT = 8 + CLOSING = 9 + LAST_ACK = 10 + TIME_WAIT = 11 + DELETE_TCB = 12 + RESERVED = 100 + + +class CONNECTION_PROTOCOL(IntEnum): + TCP4 = 1 + UDP4 = 2 + TCP6 = 3 + UDP6 = 4 + + +class IN_ADDR(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("S_addr", ULONG), + ] + + +class IN6_ADDR(ctypes.Union): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("Byte", BYTE * 16), + ("Word", USHORT * 8), + ] + + +class SOCKADDR_IN(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("sin_family", SHORT), + ("sin_port", USHORT), + ("sin_addr", IN_ADDR), + ("sin_zero", BYTE * 8), + ] + + +class SOCKADDR_IN6(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("sin6_family", SHORT), + ("sin6_port", USHORT), + ("sin6_flowinfo", ULONG), + ("sin6_addr", IN6_ADDR), + ("sin6_scope_id", ULONG), + ] + + +class SOCKADDR_INET(ctypes.Union): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("Ipv4", SOCKADDR_IN), + ("Ipv6", SOCKADDR_IN6), + ("si_family", USHORT), + ] + + +class MIB_IPNET_ROW2(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("Address", SOCKADDR_INET), + ("InterfaceIndex", ULONG), + # Padding when executing under 32-bit Python environment here. + ("InterfaceLuid", LPVOID), + ("PhysicalAddress", ctypes.c_ubyte * IF_MAX_PHYS_ADDRESS_LENGTH), + ("PhysicalAddressLength", ULONG), + ("State", DWORD), + ("Flags", BYTE), + ("ReachabilityTime", ULONG), + ] + + # 32-bit Python does not correctly align the stucture, which results + # in no padding being added after the interface index. This causes + # the size of the object to be 84 instead of the correct 88 bytes. + # To correct for this, we add an extra four bytes of padding after the + # `InterfaceIndex` member. + if BITNESS == 32: + _fields_.insert(2, ("Padding", DWORD)) + + +class MIB_IPNET_TABLE2(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("NumEntries", ULONG), + ("Padding", ULONG), # Padding to fix alignment + ("Table", MIB_IPNET_ROW2 * 1), + ] + + +class IP_ADAPTER_ADDRESSES(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("Length", ULONG), + ("Index", ULONG), + ("Next", LPVOID), + ("AdapterName", ctypes.c_char_p), + ("FirstUnicastAddress", LPVOID), + ("FirstAnycastAddress", LPVOID), + ("FirstMulticastAddress", LPVOID), + ("FirstDnsServerAddress", LPVOID), + ("DnsSuffix", LPWSTR), + ("Description", LPWSTR), + ("FriendlyName", LPWSTR), + ("PhysicalAddress", ctypes.c_ubyte * MAX_ADAPTER_ADDRESS_LENGTH), + ("PhysicalAddressLength", ULONG), + ("Flags", ULONG), + ("Mtu", ULONG), + ("IfType", ULONG), + ("OperStatus", DWORD), + ("Ipv6IfIndex", ULONG), + ("ZoneIndices", ULONG * 16), + ("FirstPrefix", LPVOID), + ("TransmitLinkSpeed", ctypes.c_ulonglong), + ("ReceiveLinkSpeed", ctypes.c_ulonglong), + ("FirstWinsServerAddress", LPVOID), + ("FirstGatewayAddress", LPVOID), + ("Ipv4Metric", ULONG), + ("Ipv6Metric", ULONG), + ("Luid", LPVOID), + ("Dhcpv4Server", BYTE * 16), + ("CompartmentId", DWORD), + ("Padding", DWORD), # Padding to fix alignment + ("NetworkGuid", BYTE * 16), + ("ConnectionType", DWORD), + ("TunnelType", DWORD), + ("Dhcpv6Server", BYTE * 16), + ("Dhcpv6ClientDuid", BYTE * MAX_DHCPV6_DUID_LENGTH), + ("Dhcpv6ClientDuidLength", ULONG), + ("Dhcpv6Iaid", ULONG), + ("FirstDnsSuffix", LPVOID), + ] + + +class MIB_TCPROW_OWNER_PID(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("dwState", DWORD), + ("dwLocalAddr", DWORD), + ("dwLocalPort", DWORD), + ("dwRemoteAddr", DWORD), + ("dwRemotePort", DWORD), + ("dwOwningPid", DWORD), + ] + + +class MIB_TCP6ROW_OWNER_PID(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("ucLocalAddr", ctypes.c_ubyte * 16), + ("dwLocalScopeId", DWORD), + ("dwLocalPort", DWORD), + ("ucRemoteAddr", ctypes.c_ubyte * 16), + ("dwRemoteScopeId", DWORD), + ("dwRemotePort", DWORD), + ("dwState", DWORD), + ("dwOwningPid", DWORD), + ] + + +class MIB_UDPROW_OWNER_PID(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("dwLocalAddr", DWORD), + ("dwLocalPort", DWORD), + ("dwOwningPid", DWORD), + ] + + +class MIB_UDP6ROW_OWNER_PID(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("ucLocalAddr", ctypes.c_ubyte * 16), + ("dwLocalScopeId", DWORD), + ("dwLocalPort", DWORD), + ("dwOwningPid", DWORD), + ] + + +class MIB_TCPTABLE_OWNER_PID(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("dwNumEntries", DWORD), + ("table", MIB_TCPROW_OWNER_PID * 1), + ] + + +class MIB_TCP6TABLE_OWNER_PID(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("dwNumEntries", DWORD), + ("table", MIB_TCP6ROW_OWNER_PID * 1), + ] + + +class MIB_UDPTABLE_OWNER_PID(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("dwNumEntries", DWORD), + ("table", MIB_UDPROW_OWNER_PID * 1), + ] + + +class MIB_UDP6TABLE_OWNER_PID(ctypes.Structure): + _fields_: ClassVar[list[tuple[str, type]]] = [ + ("dwNumEntries", DWORD), + ("table", MIB_UDP6ROW_OWNER_PID * 1), + ] + + +PULONG = ctypes.POINTER(ULONG) +PMIB_IPNET_TABLE2 = ctypes.POINTER(MIB_IPNET_TABLE2) +PMIB_TCPTABLE_OWNER_PID = ctypes.POINTER(MIB_TCPTABLE_OWNER_PID) +PMIB_TCP6TABLE_OWNER_PID = ctypes.POINTER(MIB_TCP6TABLE_OWNER_PID) +PMIB_UDPTABLE_OWNER_PID = ctypes.POINTER(MIB_UDPTABLE_OWNER_PID) +PMIB_UDP6TABLE_OWNER_PID = ctypes.POINTER(MIB_UDP6TABLE_OWNER_PID) + +iphlpapi = ctypes.WinDLL("Iphlpapi.dll") + +# arp calls +GetIpNetTable2 = iphlpapi.GetIpNetTable2 +GetIpNetTable2.argtypes = [ULONG, ctypes.POINTER(PMIB_IPNET_TABLE2)] +GetIpNetTable2.restype = ULONG + +FreeMibTable = iphlpapi.FreeMibTable +FreeMibTable.argtypes = [LPVOID] +FreeMibTable.restype = None + +GetAdaptersAddresses = iphlpapi.GetAdaptersAddresses +GetAdaptersAddresses.argtypes = [ULONG, ULONG, LPVOID, LPVOID, PULONG] +GetAdaptersAddresses.restype = ULONG + +# net connection calls +GetExtendedTcpTable = iphlpapi.GetExtendedTcpTable +GetExtendedTcpTable.argtypes = [LPVOID, PDWORD, BOOL, ULONG, ULONG, ULONG] +GetExtendedTcpTable.restype = DWORD + +GetExtendedUdpTable = iphlpapi.GetExtendedUdpTable +GetExtendedUdpTable.argtypes = [LPVOID, PDWORD, BOOL, ULONG, ULONG, ULONG] +GetExtendedUdpTable.restype = DWORD diff --git a/acquire/dynamic/windows/netstat.py b/acquire/dynamic/windows/netstat.py new file mode 100644 index 00000000..9b3db0d2 --- /dev/null +++ b/acquire/dynamic/windows/netstat.py @@ -0,0 +1,232 @@ +from __future__ import annotations + +import ctypes +from json import dumps +from socket import htons, inet_ntop +from typing import Callable, Union + +from acquire.dynamic.windows.iphlpapi import ( + ADDRESS_FAMILY, + BOOL, + CONNECTION_PROTOCOL, + ERROR_INSUFFICIENT_BUFFER, + LPVOID, + MIB_TCP6ROW_OWNER_PID, + MIB_TCP6TABLE_OWNER_PID, + MIB_TCP_STATE, + MIB_TCPROW_OWNER_PID, + MIB_TCPTABLE_OWNER_PID, + MIB_UDP6ROW_OWNER_PID, + MIB_UDP6TABLE_OWNER_PID, + MIB_UDPROW_OWNER_PID, + MIB_UDPTABLE_OWNER_PID, + NO_ERROR, + PDWORD, + TCP_CONNECTION_OFFLOAD_STATE, + TCP_TABLE_CLASS, + UDP_TABLE_CLASS, + ULONG, + GetExtendedTcpTable, + GetExtendedUdpTable, +) + +NetConnTableClass = Union[TCP_TABLE_CLASS, UDP_TABLE_CLASS] +NetConnTableType = Union[ + MIB_TCPTABLE_OWNER_PID, MIB_TCP6TABLE_OWNER_PID, MIB_UDPTABLE_OWNER_PID, MIB_UDP6TABLE_OWNER_PID +] +NetConnTableRowType = Union[MIB_TCPROW_OWNER_PID, MIB_TCP6ROW_OWNER_PID, MIB_UDPROW_OWNER_PID, MIB_UDP6ROW_OWNER_PID] +NetConnTableResult = Union[ + MIB_TCPTABLE_OWNER_PID, MIB_TCP6TABLE_OWNER_PID, MIB_UDPTABLE_OWNER_PID, MIB_UDP6TABLE_OWNER_PID +] +NetConnTableCallback = Callable[[LPVOID, PDWORD, BOOL, ULONG, ULONG, ULONG], NetConnTableResult] + +NetConnRowParserArgs = Union[MIB_TCPROW_OWNER_PID, MIB_TCP6ROW_OWNER_PID, MIB_UDPROW_OWNER_PID, MIB_UDP6ROW_OWNER_PID] +NetConnRowParser = Callable[[NetConnRowParserArgs], "NetConnection"] + + +class NetConnection: + def __init__( + self, + protocol: CONNECTION_PROTOCOL, + local_addr: str, + local_port: int, + remote_addr: str | None, + remote_port: int | None, + state: TCP_CONNECTION_OFFLOAD_STATE | None, + pid: int, + ) -> None: + self.protocol = protocol + self.local_address = local_addr + self.local_port = local_port + self.remote_address = remote_addr + self.remote_port = remote_port + self.state = state + self.pid = pid + + def as_dict(self) -> dict: + return { + "protocol": self.protocol.name, + "local_address": self.local_address, + "local_port": self.local_port, + "remote_address": self.remote_address, + "remote_port": self.remote_port, + "state": self.state.name if self.state else None, + "pid": self.pid, + } + + def __str__(self) -> str: + state = self.state.name if self.state else None + return ( + f"NetConnection(protocol={self.protocol.name}, lhost={self.local_address}, lport={self.local_port}, " + f"rhost={self.remote_address}, rport={self.remote_port}, state={state}, pid={self.pid})" + ) + + +def get_netconn_table( + get_netconn_table_proc: NetConnTableCallback, + family: ADDRESS_FAMILY, + cls: NetConnTableClass, + table_type: NetConnTableType, +) -> NetConnTableResult | None: + table_size = ULONG(0) + result = get_netconn_table_proc(LPVOID(0), ctypes.byref(table_size), True, family, cls, ULONG(0)) + + if result != ERROR_INSUFFICIENT_BUFFER: + return None + + buffer = ctypes.create_string_buffer(table_size.value) + result = get_netconn_table_proc(buffer, ctypes.byref(table_size), True, family, cls, ULONG(0)) + + if result != NO_ERROR: + return None + + return ctypes.cast(buffer, ctypes.POINTER(table_type)).contents + + +def parse_netconn_rows( + table: NetConnTableType, row_type: NetConnTableRowType, row_parse_callback: NetConnRowParser +) -> list[NetConnection]: + entries = table.dwNumEntries + rows = ctypes.cast(table.table, ctypes.POINTER(row_type * entries)).contents + + connections = [] + + for row in rows: + conn = row_parse_callback(row) + connections.append(conn) + + return connections + + +def tcp4_row_parser(row: MIB_TCPROW_OWNER_PID) -> NetConnection: + return NetConnection( + protocol=CONNECTION_PROTOCOL.TCP4, + local_addr=inet_ntop(ADDRESS_FAMILY.AF_INET, row.dwLocalAddr.to_bytes(4, byteorder="little")), + local_port=htons(row.dwLocalPort), + remote_addr=inet_ntop(ADDRESS_FAMILY.AF_INET, row.dwRemoteAddr.to_bytes(4, byteorder="little")), + remote_port=htons(row.dwRemotePort), + state=MIB_TCP_STATE(row.dwState), + pid=row.dwOwningPid, + ) + + +def udp4_row_parser(row: MIB_UDPROW_OWNER_PID) -> NetConnection: + return NetConnection( + protocol=CONNECTION_PROTOCOL.UDP4, + local_addr=inet_ntop(ADDRESS_FAMILY.AF_INET, row.dwLocalAddr.to_bytes(4, byteorder="little")), + local_port=htons(row.dwLocalPort), + remote_addr=None, + remote_port=None, + state=None, + pid=row.dwOwningPid, + ) + + +def tcp6_row_parser(row: MIB_TCP6ROW_OWNER_PID) -> NetConnection: + return NetConnection( + protocol=CONNECTION_PROTOCOL.TCP6, + local_addr=f"[{inet_ntop(ADDRESS_FAMILY.AF_INET6, row.ucLocalAddr)}]", + local_port=htons(row.dwLocalPort), + remote_addr=f"[{inet_ntop(ADDRESS_FAMILY.AF_INET6, row.ucRemoteAddr)}]", + remote_port=htons(row.dwRemotePort), + state=MIB_TCP_STATE(row.dwState), + pid=row.dwOwningPid, + ) + + +def udp6_row_parser(row: MIB_UDP6ROW_OWNER_PID) -> NetConnection: + return NetConnection( + protocol=CONNECTION_PROTOCOL.UDP6, + local_addr=f"[{inet_ntop(ADDRESS_FAMILY.AF_INET6, row.ucLocalAddr)}]", + local_port=htons(row.dwLocalPort), + remote_addr=None, + remote_port=None, + state=None, + pid=row.dwOwningPid, + ) + + +def get_active_connections() -> list[NetConnection]: + tcp4_table = get_netconn_table( + GetExtendedTcpTable, ADDRESS_FAMILY.AF_INET, TCP_TABLE_CLASS.OWNER_PID_ALL, MIB_TCPTABLE_OWNER_PID + ) + tcp4_conns = parse_netconn_rows(tcp4_table, MIB_TCPROW_OWNER_PID, tcp4_row_parser) + + tcp6_table = get_netconn_table( + GetExtendedTcpTable, ADDRESS_FAMILY.AF_INET6, TCP_TABLE_CLASS.OWNER_PID_ALL, MIB_TCP6TABLE_OWNER_PID + ) + tcp6_conns = parse_netconn_rows(tcp6_table, MIB_TCP6ROW_OWNER_PID, tcp6_row_parser) + + udp4_table = get_netconn_table( + GetExtendedUdpTable, ADDRESS_FAMILY.AF_INET, UDP_TABLE_CLASS.OWNER_PID, MIB_UDPTABLE_OWNER_PID + ) + udp4_conns = parse_netconn_rows(udp4_table, MIB_UDPROW_OWNER_PID, udp4_row_parser) + + udp6_table = get_netconn_table( + GetExtendedUdpTable, ADDRESS_FAMILY.AF_INET6, UDP_TABLE_CLASS.OWNER_PID, MIB_UDP6TABLE_OWNER_PID + ) + udp6_conns = parse_netconn_rows(udp6_table, MIB_UDP6ROW_OWNER_PID, udp6_row_parser) + + return tcp4_conns + tcp6_conns + udp4_conns + udp6_conns + + +def format_net_connections_csv(net_connections: list[NetConnection]) -> str: + def formatter(connection: NetConnection) -> str: + rhost = connection.remote_address if connection.remote_address else "" + rport = str(connection.remote_port) if connection.remote_port else "" + state = connection.state.name if connection.state else "" + return ",".join( + [connection.protocol.name, connection.local_address, str(connection.local_port), rhost, rport, state] + ) + + header = ",".join(["protocol", "local address", "local port", "remote address", "remote port", "state"]) + rows = "\n".join(formatter(connection) for connection in net_connections) + + return f"{header}\n{rows}" + + +def format_net_connections_json(net_connections: list[NetConnection], indent=0) -> str: + return dumps( + net_connections, default=lambda connection: connection.as_dict(), indent=indent if indent > 0 else None + ) + + +def format_net_connections_list(net_connections: list[NetConnection]) -> str: + def formatter(connection: NetConnection) -> str: + rhost = connection.remote_address if connection.remote_address else "" + rport = str(connection.remote_port) if connection.remote_port else "" + state = connection.state.name if connection.state else "" + + lconn = f"{connection.local_address}:{str(connection.local_port)}" + if connection.protocol in [CONNECTION_PROTOCOL.TCP4, CONNECTION_PROTOCOL.TCP6]: + rconn = f"{rhost}:{rport}" + else: + rconn = "*:*" + + return f"{connection.protocol.name:<10}{lconn:<40}{rconn:<40}" f"{state:<20}{str(connection.pid):<10}" + + header = f"{'Proto':<10}{'Local Address':<40}{'Foreign Address':<40}{'State':<20}{'PID':<10}" + header += "\n" + ("=" * len(header)) + rows = "\n".join(formatter(connection) for connection in net_connections) + + return f"{header}\n{rows}"